#!/usr/Local/pyroot/PyTangoRoot/bin/python
"""
Python module for extracting attributes from an Archive Extractor Device.

Includes a command line interface. Can also be imported as-is to use its
functions in user scripts.
"""
import logging
import datetime
import numpy as np
import PyTango as tango

__version__ = "1.0.1"

##########################################################################
""" Convenience variables """

# Extractor date format for GetAttDataBetweenDates
DBDFMT = "%Y-%m-%d %H:%M:%S"

# Extractor date format for GetNearestValue
DBDFMT2 = "%d-%m-%Y %H:%M:%S"

ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)

##---------------------------------------------------------------------------##
def dateparse(datestr):
    """
    Convenience function to parse date strings.
    The global format is %Y-%m-%d-%H:%M:%S and it can be shortened to be
    less precise.

    Parameters
    ----------
    datestr : string
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime
        Parsed date
    """
    logger.info("Parse date '%s'"%datestr)

    fmt = [
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
        ]

    date = None
    for f in fmt:
        logger.debug("Try format '%s'"%f)
        try:
            date = datetime.datetime.strptime(datestr, f)
            break
        except ValueError:
            logger.debug("Parsing failed")

    if date is None:
        logger.error("Could not parse date")
        raise ValueError("Could not parse date '%s'"%datestr)

    return date
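##---------------------------------------------------------------------------##
# Illustrative sketch (not part of the original code): dateparse accepts
# progressively less precise strings. The dates below are arbitrary examples.
#
#   >>> dateparse("2021-06-01-14:30:00")
#   datetime.datetime(2021, 6, 1, 14, 30)
#   >>> dateparse("2021-06")
#   datetime.datetime(2021, 6, 1, 0, 0)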
##---------------------------------------------------------------------------##
def query_ADB_BetweenDates(attr,
                           dateStart,
                           dateStop=None,
                           extractor="archiving/TDBExtractor/4"):
    """
    Query attribute data from an archiver database, get all points between
    two dates. Use ExtractBetweenDates.

    Parameters
    ----------
    attr : String
        Name of the attribute. Full Tango name, e.g. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.
        Default is now (datetime.datetime.now()).

    extractor : String
        Name of the DB Extractor device.
        Default is "archiving/TDBExtractor/4".

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    [date, value] : array
        date : numpy.ndarray of datetime.datetime objects
            Dates of the values
        value : numpy.ndarray
            Archived values
    """
    # Evaluate the default stop date at call time, not at import time
    if dateStop is None:
        dateStop = datetime.datetime.now()

    # Max number of points per extraction chunk
    Nmax = 100000

    # Device proxy to the DB extractor
    logger.debug("Instantiate proxy to %s"%extractor)
    ADB = tango.DeviceProxy(extractor)

    # Give the DB extractor a 3 second timeout
    ADB.set_timeout_millis(3000)

    # Check that the attribute is in the database
    logger.debug("Check that %s is archived."%attr)
    if not ADB.IsArchived(attr):
        logger.error("Attribute '%s' is not archived in DB %s"%(attr, extractor))
        raise ValueError("Attribute '%s' is not archived in DB %s"%(attr, extractor))

    # Get its sampling period in seconds
    samplingPeriod = int(ADB.GetArchivingMode(attr)[1])*10**-3
    logger.debug("Attribute is sampled every %g seconds"%samplingPeriod)

    # Evaluate the number of points
    est_N = (dateStop-dateStart).total_seconds()/samplingPeriod
    logger.debug("Which leads to %d points to extract."%est_N)

    # If the request is too large, cut it into smaller time chunks
    if est_N > Nmax:
        dt = datetime.timedelta(seconds=samplingPeriod)*Nmax

        cdates = [dateStart]
        while cdates[-1] < dateStop:
            cdates.append(cdates[-1]+dt)
        cdates[-1] = dateStop
        logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
    else:
        cdates = [dateStart, dateStop]

    # Arrays to hold every chunk
    value = []
    date = []

    # For each date chunk
    for i_d in range(len(cdates)-1):

        # Make retrieval request
        logger.debug("Perform ExtractBetweenDates (%s, %s, %s)"%(
            attr,
            cdates[i_d].strftime(DBDFMT),
            cdates[i_d+1].strftime(DBDFMT))
            )

        _date, _value = ADB.ExtractBetweenDates([
            attr,
            cdates[i_d].strftime(DBDFMT),
            cdates[i_d+1].strftime(DBDFMT)
            ])

        # Transform to datetime - value arrays
        _value = np.asarray(_value, dtype=float)
        if len(_date) > 0:
            _date = ArrayTimeStampToDatetime(_date/1000.0)

        value.append(_value)
        date.append(_date)

    logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    logger.debug("Extraction done for %s."%attr)
    return [date, value]
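##---------------------------------------------------------------------------##
# Illustrative usage sketch (the attribute name comes from the docstring above;
# the dates and the HDB extractor name are arbitrary assumptions):
#
#   start = datetime.datetime(2021, 6, 1)
#   stop  = datetime.datetime(2021, 6, 2)
#   date, value = query_ADB_BetweenDates("test/dg/panda/current",
#                                        start, stop,
#                                        extractor="archiving/HDBExtractor/2")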
##---------------------------------------------------------------------------##
def query_ADB_NearestValue(attr,
                           dates,
                           extractor="archiving/TDBExtractor/4"):
    """
    Query attribute data from an archiver database, get the nearest points
    to the given dates. Use GetNearestValue and perform multiple calls.
    For each date in dates, read the closest sampled value.
    Return the real dates of the samples.

    Parameters
    ----------
    attr : String
        Name of the attribute. Full Tango name, e.g. "test/dg/panda/current".

    dates : numpy.ndarray of datetime.datetime
        Dates for extraction.

    extractor : String
        Name of the DB Extractor device.
        Default is "archiving/TDBExtractor/4".

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    [realdate, value] : array
        realdate : numpy.ndarray of datetime.datetime objects
            Dates of the values
        value : numpy.ndarray
            Archived values
    """
    # Device proxy to the DB extractor
    ADB = tango.DeviceProxy(extractor)

    # Give the DB extractor a 3 second timeout
    ADB.set_timeout_millis(3000)

    # Check that the attribute is in the database
    if not ADB.IsArchived(attr):
        raise ValueError("Attribute '%s' is not archived in DB %s"%(attr, extractor))

    # Prepare arrays
    value = np.empty(len(dates), dtype=float)
    realdate = np.empty(len(dates), dtype=object)

    # Loop on dates
    for i in range(len(dates)):

        # Make retrieval request
        answ = ADB.GetNearestValue([attr, dates[i].strftime(DBDFMT2)])

        # Parse the "timestamp;value" answer
        answ = answ.split(";")
        realdate[i] = datetime.datetime.fromtimestamp(int(answ[0])/1000)
        value[i] = answ[1]

    return [realdate, value]
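##---------------------------------------------------------------------------##
# Illustrative usage sketch (hourly sample dates are an arbitrary assumption;
# the attribute name comes from the docstring above):
#
#   dates = np.array([datetime.datetime(2021, 6, 1, h) for h in range(24)])
#   realdate, value = query_ADB_NearestValue("test/dg/panda/current", dates)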
##########################################################################
""" Command Line Interface """
if __name__ == "__main__":

    # Name the logger after the filename
    logger = logging.getLogger("ArchiveExtractor")

    # Default stop date
    dateStop = datetime.datetime.now()

    # Default start date
    dateStart = datetime.datetime.now()-datetime.timedelta(days=1)

    #######################################################
    # Install argument parser
    import argparse

    parser = argparse.ArgumentParser(description="Extract attributes from the extractor devices.\nVersion %s"%__version__)

    parser.add_argument("--from", type=dateparse, dest="dateStart",
        help="Start date for extraction, format '1990-12-13-22:33:45'. "+
        "It is possible to be less precise and drop seconds, minutes, hours or even the day."+
        " Default is one day ago.",
        default=dateStart)

    parser.add_argument("--to", type=dateparse, dest="dateStop",
        help="Stop date for extraction, format '1990-12-13-22:33:45'. "+
        "It is possible to be less precise and drop seconds, minutes, hours or even the day."+
        " Default is now.",
        default=dateStop)

    parser.add_argument("--DB", choices=["H", "T"], default="T",
        help="Database to extract from. HDB (H) or TDB (T), default: %(default)s")

    parser.add_argument("--DBN", type=int, default=2,
        help="Extractor device number, default: %(default)s")

    parser.add_argument("--fileout", type=str,
        default="extracted_%s.npy"%datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
        help="Filename of the extraction destination. Default: %(default)s")

    parser.add_argument('--log', type=str, default="INFO",
        help="Log level. Default: %(default)s.")

    parser.add_argument('--filemode', action="store_true",
        help="Set attribute to filemode."+
        " Instead of specifying attributes, put a path to a file containing a list of attributes."+
        " The file contains one attribute per line.")

    parser.add_argument('attributes', type=str, nargs='+',
        help="List of attributes to extract. Full tango path.")

    args = parser.parse_args()

    #######################################################
    # Configure logger

    # Add a stream handler
    s_handler = logging.StreamHandler()
    s_handler.setFormatter(logging.Formatter("%(levelname)s\t[%(funcName)s] \t%(message)s"))

    # Set level according to command line attribute
    s_handler.setLevel(level=getattr(logging, args.log.upper()))
    logger.setLevel(level=getattr(logging, args.log.upper()))
    logger.addHandler(s_handler)

    logger.debug("Parsed arguments: %s"%args)
    logger.info("Archive Extractor %s"%__version__)

    #######################################################
    # Filemode or not
    if args.filemode:
        logger.info("Filemode, opening file %s"%args.attributes[0])

        # Read the file. Each line is an attribute.
        with open(args.attributes[0], "r") as fp:
            attributes = fp.readlines()

        logger.debug("Read lines : %s"%attributes)

        # Clean end of line
        for i_a in range(len(attributes)):
            attributes[i_a] = attributes[i_a].rstrip()
    else:
        attributes = args.attributes

    #######################################################
    # Select Extractor
    extractor = "archiving/%sDBExtractor/%d"%(args.DB, args.DBN)

    #######################################################
    # Prepare dictionary for results
    results = dict()

    #######################################################
    # Extract from database
    logger.info("Extract from %s to %s."%(args.dateStart, args.dateStop))

    for attr in attributes:
        logger.info("Extracting attribute %s..."%attr)

        try:
            datevalue = query_ADB_BetweenDates(attr, args.dateStart, args.dateStop, extractor)

            # Add to result dictionary
            results[attr] = datevalue

        except ValueError:
            logger.warning("Failed to extract %s. Skipping..."%attr)
        except tango.CommunicationFailed:
            logger.warning("Failed to extract %s. Skipping..."%attr)
            logger.error("The device %s might have crashed.\n"
                "You should check with Jive and probably restart it with Astor.\n"%extractor)

        # Save all at each step
        np.save(args.fileout, results)

    logger.info("Extraction done, saved in file %s"%args.fileout)

else:
    # Name the logger after the module name
    logger = logging.getLogger(__name__)
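##########################################################################
# Example invocation (hedged sketch; the script filename, attribute name and
# dates are placeholders, not taken from the original code):
#
#   python ArchiveExtractor.py --from 2021-06-01 --to 2021-06-02 \
#       --DB H --DBN 2 --fileout mydata.npy test/dg/panda/current
#
# The saved .npy file holds a dict of {attribute: [date, value]} and can be
# reloaded with (allow_pickle is required on recent NumPy versions):
#
#   results = np.load("mydata.npy", allow_pickle=True).item()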