"""
Python module for extracting attributes from an Archive Extractor Device.
"""
import logging
import datetime

import numpy as np
import PyTango as tango

__version__ = "1.0.1"

##########################################################################
# Commodity variables

# Extractor date format for GetAttDataBetweenDates
DBDFMT = "%Y-%m-%d %H:%M:%S"

# Extractor date format for GetNearestValue
DBDFMT2 = "%d-%m-%Y %H:%M:%S"

# Vectorized fromtimestamp function
ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)


class ArchiveExtractor:
    """
    Thin client around a Tango archive extractor device.

    It wraps a tango.DeviceProxy and offers helpers to retrieve archived
    attribute values between two dates.
    """

    # Max number of point per extraction chunks
    Nmax = 100000

    ##########################################################################
    def __init__(
            self,
            extractorKind='H', extractorNumber=2,
            extractorPath=None,
            logger='info',
            ):
        """
        Constructor function

        Parameters
        ----------
        extractorKind: char
            Either 'H' or 'T' for HDB or TDB.

        extractorNumber: int
            Number of the archive extractor instance to use.

        extractorPath: string
            Tango path to the extractor.
            If this argument is given, it takes precedence over
            extractorKind and extractorNumber.

        logger: logging.Logger, str
            Logger object to use.
            If string, can be a log level. A basic logger with stream
            handler will be instantiated.
            Default to 'info'.

        Return
        ------
        ArchiveExtractor
        """

        #######################################################
        # Get logger
        if isinstance(logger, logging.Logger):
            self.logger = logger
        else:
            self.logger = logging.getLogger(__name__)
            self.logger.setLevel(getattr(logging, logger.upper()))

        if not self.logger.hasHandlers():
            # No handlers, create one
            sh = logging.StreamHandler()
            sh.setLevel(self.logger.level)
            sh.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
            self.logger.addHandler(sh)

        #######################################################
        # Select Extractor
        if extractorPath is None:
            self.extractor = tango.DeviceProxy(
                "archiving/%sDBExtractor/%d" % (extractorKind, extractorNumber)
            )
        else:
            self.extractor = tango.DeviceProxy(extractorPath)

        self.extractor.set_timeout_millis(3000)
        self.logger.debug("Archive Extractor %s used.", self.extractor.name())

    ##---------------------------------------------------------------------------##
    @staticmethod
    def dateparse(datestr):
        """
        Convenient function to parse date strings.
        Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less
        precise.

        Parameters
        ---------
        datestr : string
            Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.

        Exceptions
        ----------
        ValueError
            If the parsing failed.

        Returns
        -------
        date : datetime.datetime
            Parsed date
        """

        # This gives all format that will be tried, in order.
        # Stop on first parse success. Raise error if none succeed.
        fmt = [
            "%Y-%m-%d-%H:%M:%S",
            "%Y-%m-%d-%H:%M",
            "%Y-%m-%d-%H",
            "%Y-%m-%d",
            "%Y-%m",
        ]

        for f in fmt:
            try:
                return datetime.datetime.strptime(datestr, f)
            except ValueError:
                continue

        raise ValueError("Could not parse argument to a date")

    ##---------------------------------------------------------------------------##
    @staticmethod
    def _time_chunks(start, stop, step):
        """
        Cut the range [start, stop] in consecutive chunks of length step.

        Returns the list of chunk boundaries. The last boundary is always
        forced to stop, so the last chunk may be shorter than step.
        Raises ValueError if step is not strictly positive (the cutting
        loop would never terminate).
        """
        if step <= datetime.timedelta(0):
            raise ValueError("Time step for chunks must be strictly positive")

        bounds = [start]
        while bounds[-1] < stop:
            bounds.append(bounds[-1] + step)
        bounds[-1] = stop
        return bounds

    ##---------------------------------------------------------------------------##
    def _check_archived(self, attribute):
        """
        Ask the extractor whether attribute is archived; log and raise
        ValueError if it is not.
        """
        self.logger.debug("Check that %s is archived.", attribute)
        if not self.extractor.IsArchived(attribute):
            msg = "Attribute '%s' is not archived in DB %s" % (
                attribute, self.extractor)
            self.logger.error(msg)
            raise ValueError(msg)

    ##---------------------------------------------------------------------------##
    def betweenDates(
            self,
            attribute,
            dateStart,
            dateStop=None,
            ):
        """
        Query attribute data from an archiver database, get all points
        between dates.
        Use ExtractBetweenDates.

        Parameters
        ----------
        attribute : String
            Name of the attribute. Full Tango name i.e.
            "test/dg/panda/current".

        dateStart : datetime.datetime, string
            Start date for extraction. If string, it will be parsed.

        dateStop : datetime.datetime, string, None
            Stop date for extraction.
            If string, it will be parsed.
            If None, it takes the current date and time.
            Default is None (now).

        Exceptions
        ----------
        ValueError
            The attribute is not found in the database, or a date string
            could not be parsed.

        Returns
        -------
        [date, value] : array
            date : numpy.ndarray of datetime.datetime objects
                Dates of the values
            value : numpy.ndarray
                Archived values
            None is returned if a chunk could not be extracted after
            three attempts.
        """

        # Parse date if it is string
        if isinstance(dateStart, str):
            dateStart = self.dateparse(dateStart)
        if dateStop is None:
            dateStop = datetime.datetime.now()
        if isinstance(dateStop, str):
            dateStop = self.dateparse(dateStop)

        # Uncapitalize attribute
        attribute = attribute.lower()

        # Check that the attribute is in the database
        self._check_archived(attribute)

        # Get the number of points
        N = self.extractor.GetAttDataBetweenDatesCount([
            attribute,
            dateStart.strftime(DBDFMT2),
            dateStop.strftime(DBDFMT2)
        ])
        self.logger.debug("On the period, there is %d entries", N)

        # If data chunk is too much, we need to cut it
        if N > self.Nmax:
            dt = (dateStop - dateStart) / (int(N) // self.Nmax)
            cdates = self._time_chunks(dateStart, dateStop, dt)
            self.logger.debug(
                "Cutting access to %d little chunks of time, %s each.",
                len(cdates) - 1, dt)
        else:
            cdates = [dateStart, dateStop]

        # Arrays to hold every chunks
        value = []
        date = []

        # For each date chunk
        for chunk_start, chunk_stop in zip(cdates[:-1], cdates[1:]):
            request = [
                attribute,
                chunk_start.strftime(DBDFMT),
                chunk_stop.strftime(DBDFMT),
            ]

            # 2 retries on DevFailed
            for _attempt in range(3):
                # Make retrieval request
                self.logger.info(
                    "Perform ExtractBetweenDates (%s, %s, %s)", *request)

                try:
                    _date, _value = self.extractor.ExtractBetweenDates(request)
                except tango.DevFailed as e:
                    self.logger.warning(
                        "The extractor device returned the following error:")
                    self.logger.warning(e)
                    self.logger.warning("Retrying...")
                    continue
                break
            else:
                # Only reached when every attempt failed; a success on the
                # last attempt leaves the loop through 'break'.
                self.logger.error(
                    "Could not extract this chunk. Check the device extractor")
                return None

            # Transform to datetime - value arrays
            _value = np.asarray(_value, dtype=float)
            if len(_date) > 0:
                # NOTE(review): dividing by 1000 assumes the extractor
                # returns timestamps in milliseconds - confirm.
                _date = ArrayTimeStampToDatetime(_date / 1000.0)

            value.append(_value)
            date.append(_date)

        self.logger.debug("Concatenate chunks")
        value = np.concatenate(value)
        date = np.concatenate(date)

        self.logger.debug("Extraction done for %s.", attribute)
        return [date, value]

    ##---------------------------------------------------------------------------##
    def betweenDates_MinMaxMean(
            self,
            attribute,
            dateStart,
            dateStop=None,
            timeInterval=datetime.timedelta(seconds=60),
            ):
        """
        Query attribute data from an archiver database and get the minimum,
        maximum and mean of the values on successive time windows between
        two dates.
        Use GetAttDataMaxBetweenDates, GetAttDataMinBetweenDates and
        GetAttDataAvgBetweenDates.

        Parameters
        ----------
        attribute : String
            Name of the attribute. Full Tango name i.e.
            "test/dg/panda/current".

        dateStart : datetime.datetime, string
            Start date for extraction. If string, it will be parsed.

        dateStop : datetime.datetime, string, None
            Stop date for extraction. If string, it will be parsed.
            If None, it takes the current date and time, evaluated when
            the method is called.
            Default is None (now).

        timeInterval: datetime.timedelta, string
            Time interval used to perform min,max and mean.
            Can be a string with a number and a unit in "d", "h", "m" or "s"

        Exceptions
        ----------
        ValueError
            The attribute is not found in the database, timeInterval could
            not be parsed, or timeInterval is not strictly positive.

        Returns
        -------
        [mdates, value_min, value_max, value_mean] : array
            mdates : numpy.ndarray of datetime.datetime objects
                Dates of the values, middle of timeInterval windows
            value_min : numpy.ndarray
                Minimum of the value on the interval
            value_max : numpy.ndarray
                Maximum of the value on the interval
            value_mean : numpy.ndarray
                Mean of the value on the interval
        """

        # Parse date if it is string
        if isinstance(dateStart, str):
            dateStart = self.dateparse(dateStart)
        # Resolve "now" at call time, not at class definition time
        if dateStop is None:
            dateStop = datetime.datetime.now()
        if isinstance(dateStop, str):
            dateStop = self.dateparse(dateStop)

        # Parse timeInterval if string
        if isinstance(timeInterval, str):
            try:
                mul = {
                    's': 1, 'm': 60, 'h': 60*60, 'd': 60*60*24
                }[timeInterval[-1]]
                count = int(timeInterval[:-1])
            except (KeyError, IndexError, ValueError):
                self.logger.error("timeInterval could not be parsed")
                raise ValueError("timeInterval could not be parsed")
            timeInterval = datetime.timedelta(seconds=count*mul)

        # Check that the attribute is in the database
        self._check_archived(attribute)

        # Cut data range in time chunks
        cdates = self._time_chunks(dateStart, dateStop, timeInterval)
        half = timeInterval / 2
        mdates = np.asarray([d + half for d in cdates[:-1]])
        nchunks = len(cdates) - 1
        self.logger.debug(
            "Cutting time range to %d chunks of time, %s each.",
            nchunks, timeInterval)

        # Prepare arrays
        value_min = np.empty(nchunks)
        value_max = np.empty(nchunks)
        value_mean = np.empty(nchunks)

        # For each time chunk
        for i_d in range(nchunks):
            request = [
                attribute,
                cdates[i_d].strftime(DBDFMT2),
                cdates[i_d+1].strftime(DBDFMT2),
            ]

            for func, arr in zip(
                    ["Max", "Min", "Avg"],
                    [value_max, value_min, value_mean],
                    ):
                # Make requests
                self.logger.debug(
                    "Perform GetAttData%sBetweenDates (%s, %s, %s)",
                    func, *request)

                command = getattr(
                    self.extractor, "GetAttData%sBetweenDates" % func)
                arr[i_d] = command(request)

        self.logger.debug("Extraction done for %s.", attribute)
        return [mdates, value_min, value_max, value_mean]