From 7da2f36127527dc5c9f5a849b2abc20a5fffc59e Mon Sep 17 00:00:00 2001
From: Romain Broucquart <romain.broucquart@synchrotron-soleil.fr>
Date: Wed, 13 Oct 2021 14:53:47 +0200
Subject: [PATCH] Add MinMaxMean extraction, fix CLI, add core directory

---
 cli_archiveextractor.py | 194 +++++------
 .../ArchiveExtractor.py | 265 +++++-------------
 core/__init__.py        |   0
 3 files changed, 171 insertions(+), 288 deletions(-)
 rename ArchiveExtractor.py => core/ArchiveExtractor.py (53%)
 create mode 100644 core/__init__.py

diff --git a/cli_archiveextractor.py b/cli_archiveextractor.py
index 4c90d5a..5c22c18 100644
--- a/cli_archiveextractor.py
+++ b/cli_archiveextractor.py
@@ -1,135 +1,131 @@
+#!/usr/Local/pyroot/PyTangoRoot/bin/python
+"""
+Command Line Interface to use ArchiveExtractor module
+"""
+import argparse
+import ArchiveExtractor
+
+# Name the logger after the filename
+logger = logging.getLogger("ArchiveExtractor")
-##########################################################################
-""" Command Line Interface """
-if __name__ == "__main__":
+
+# Default stop date
+dateStop = datetime.datetime.now()
-    # Name the logger after the filename
-    logger = logging.getLogger("ArchiveExtractor")
+
+# Default start date
+dateStart = datetime.datetime.now()-datetime.timedelta(days=1)
-    # Default stop date
-    dateStop = datetime.datetime.now()
+
+#######################################################
+# Install argument parser
-    # Default stop date
-    dateStart = datetime.datetime.now()-datetime.timedelta(days=1)
+
+parser = argparse.ArgumentParser(description="Extract attributes from the extractor devices.\nVersion %s"%__version__)
-    #######################################################
-    # Install argument parser
-    import argparse
+
+parser.add_argument("--from", type=dateparse, dest="dateStart",
+        help="Start date for extraction, format '1990-12-13-22:33:45'. "+
+        "It is possible to be less precise and drop seconds, minutes, hours or even day."+
+        " Default is one day ago",
+        default=dateStart)
-    parser = argparse.ArgumentParser(description="Extract attributes from the extractor devices.\nVersion %s"%__version__)
+
+parser.add_argument("--to", type=dateparse, dest="dateStop",
+        help="Stop date for extraction, format '1990-12-13-22:33:45'. It is possible to be less precise and drop seconds, minutes, hours or even day."+
+        " Default is now.",
+        default=dateStop)
-    parser.add_argument("--from", type=dateparse, dest="dateStart",
-        help="Start date for extraction, format '1990-12-13-22:33:45'. "+
-        "It is possible to be less precise and drop, seconds, minutes, hours or even day."+
-        " Default is one day ago",
-        default=dateStart)
+
+parser.add_argument("--DB", choices=["H", "T", "L"],
+        default="T", help="Database to extract from. HDB (H) or TDB (T), default: %(default)s")
-    parser.add_argument("--to", type=dateparse, dest="dateStop",
-        help="Stop date for extraction, format '1990-12-13-22:33:45'. It is possible to be less precise and drop, seconds, minutes, hours or even day."+
-        " Default is now.",
-        default=dateStop)
+
+parser.add_argument("--DBN", type=int, default=2,
+        help="Extractor device number, default: %(default)s")
-    parser.add_argument("--DB", choices=["H", "T", "L"],
-        default="T", help="Database to extract from. HDB (H) or TDB (T), default: %(default)s")
+
+parser.add_argument("--fileout", type=str, default="extracted_%s.npy"%datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
+        help="filename of the extraction destination. Default: %(default)s"),
-    parser.add_argument("--DBN", type=int, default=2,
-        help="Extractor device number, default: %(default)s")
+
+parser.add_argument('--log', type=str, default="INFO",
+        help="Log level. Default: %(default)s.")
-    parser.add_argument("--fileout", type=str, default="extracted_%s.npy"%datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
-        help="filename of the extraction destination. Default: %(default)s"),
-    parser.add_argument('--log', type=str, default="INFO",
-        help="Log level. Default: %(default)s.")
+
+parser.add_argument('--filemode', action="store_true",
+        help="Set attribute to filemode."+
+        " Instead of specifying attributes, put a path to a file containing a list of attributes."+
+        " The file contains one attribute per line.")
+
+parser.add_argument('attributes', type=str, nargs='+',
+        help="List of attributes to extract. Full tango path.")
-    parser.add_argument('--filemode', action="store_true",
-        help="Set attribute to filemode."+
-        " Instead of specifying attributes, put a path to a file containing a list of attributes."+
-        " The file contains one attribute per line.")
+
+args = parser.parse_args()
-    parser.add_argument('attributes', type=str, nargs='+',
-        help="List of attributes to extract. Full tango path.")
-    args = parser.parse_args()
+
+#######################################################
+# Configure logger
+
+# Add a stream handler
+s_handler = logging.StreamHandler()
+s_handler.setFormatter(logging.Formatter("%(levelname)s\t[%(funcName)s] \t%(message)s"))
-    #######################################################
-    # Configure logger
+
+# Set level according to command line attribute
+s_handler.setLevel(level=getattr(logging, args.log.upper()))
+logger.setLevel(level=getattr(logging, args.log.upper()))
+logger.addHandler(s_handler)
-    # Add a stream handler
-    s_handler = logging.StreamHandler()
-    s_handler.setFormatter(logging.Formatter("%(levelname)s\t[%(funcName)s] \t%(message)s"))
+
+logger.debug("Parsed arguments: %s"%args)
-    # Set level according to command line attribute
-    s_handler.setLevel(level=getattr(logging, args.log.upper()))
-    logger.setLevel(level=getattr(logging, args.log.upper()))
-    logger.addHandler(s_handler)
+
+logger.info("Archive Extractor %s"%__version__)
-    logger.debug("Parsed arguments: %s"%args)
+
+#######################################################
+# Filemode or not
+if args.filemode:
+    logger.info("Filemode, opening file %s"%args.attributes[0])
+    # Read the file. Each line is an attribute
+    with open(args.attributes[0], "r") as fp:
+        attributes = fp.readlines()
-    logger.info("Archive Extractor %s"%__version__)
+
+    logger.debug("Read lines : %s"%attributes)
-    #######################################################
-    # Filemode or not
-    if args.filemode:
-        logger.info("Filemode, openning file %s"%args.attributes[0])
-        # Read the file. Each line is an attribute
-        with open(args.attributes[0], "r") as fp:
-            attributes = fp.readlines()
+
+    # Clean end of line
+    for i_a in range(len(attributes)):
+        attributes[i_a] = attributes[i_a].rstrip()
-        logger.debug("Read lines : %s"%attributes)
-
-        # Clean end of line
-        for i_a in range(len(attributes)):
-            attributes[i_a] = attributes[i_a].rstrip()
-
-    else:
-        attributes = args.attributes
+
+else:
+    attributes = args.attributes
 
-    #######################################################
-    # Select Extractor
-    if args.DB == "L":
-        extractor = "archiving/extractor/%d"%(args.DBN)
-    else:
-        extractor = "archiving/%sDBExtractor/%d"%(args.DB, args.DBN)
+
+#######################################################
+# Instantiate Extractor
+if args.DB == "L":
+    AE = ArchiveExtractor.ArchiveExtractor(extractorPath="archiving/extractor/%d"%(args.DBN))
+else:
+    AE = ArchiveExtractor.ArchiveExtractor(args.DB, args.DBN)
 
-    #######################################################
-    # Prepare dictionnary for result
-    results = dict()
+
+#######################################################
+# Prepare dictionary for results
+results = dict()
 
-    #######################################################
-    # Extract from database
+
+#######################################################
+# Extract from database
+for attr in attributes:
+    logger.info("Extracting attribute %s..."%attr)
     logger.info("Extract from %s to %s."%(args.dateStart, args.dateStop))
-    for attr in attributes:
-        logger.info("Extracting attribute %s..."%attr)
-
-        for attempt in range(3):
-            try:
-                datevalue = query_ADB_BetweenDates(attr, args.dateStart, args.dateStop, extractor)
+    for attempt in range(3):
+        try:
+            datevalue = AE.betweenDates(attr, args.dateStart, args.dateStop)
 
-                # Add to result dictionnary
-                results[attr] = datevalue
+            # Add to result dictionary
+            results[attr] = datevalue
 
-            except ValueError as e:
-                logger.debug("ErrorMsg: %s"%e)
-                logger.warning("Failed to extract %s. Skipping..."%attr)
-            except (tango.CommunicationFailed, tango.DevFailed) as e:
-                # retry
-                logger.debug("ErrorMsg: %s"%e)
-                logger.warning("Failed to extract %s. Retry..."%attr)
-                break
+        except ValueError as e:
+            logger.debug("ErrorMsg: %s"%e)
+            logger.warning("Failed to extract %s. Skipping..."%attr)
+        except (tango.CommunicationFailed, tango.DevFailed) as e:
+            # retry
+            logger.debug("ErrorMsg: %s"%e)
+            logger.warning("Failed to extract %s. Retry..."%attr)
+            break
 
-            else:
-                logger.error("The device %s might have crash.\n"%extractor+
-                    "You should check with Jive and probably restart with Astor.\n")
+        else:
+            logger.error("The device %s might have crashed.\n"%AE.extractor.name()+
+                "You should check with Jive and probably restart with Astor.\n")
 
-            # Save all at each step
-            np.save(args.fileout, results)
+    # Save all at each step
+    np.save(args.fileout, results)
 
-    logger.info("Extraction done, saved in file %s"%args.fileout)
+logger.info("Extraction done, saved in file %s"%args.fileout)
-
-else:
-    # Name the logger after the module name
-    logger = logging.getLogger(__name__)
diff --git a/ArchiveExtractor.py b/core/ArchiveExtractor.py
similarity index 53%
rename from ArchiveExtractor.py
rename to core/ArchiveExtractor.py
index 13b6211..ae66b91 100755
--- a/ArchiveExtractor.py
+++ b/core/ArchiveExtractor.py
@@ -1,10 +1,5 @@
-#!/usr/Local/pyroot/PyTangoRoot/bin/python
 """
 Python module for extracting attribute from Arhive Extractor Device.
-
-Includes a Command Line Interface.
-Can be imported as is to use function in user script.
-
 """
 import logging
 import datetime
@@ -33,8 +28,8 @@ class ArchiveExtractor:
     ##########################################################################
     def __init__(
             self,
-            ExtractorKind='H', ExtractorNumber=2,
-            ExtractorPath=None,
+            extractorKind='H', extractorNumber=2,
+            extractorPath=None,
             logger=logging.getLogger("ArchiveExtractor")
             ):
         """
@@ -42,15 +37,15 @@ class ArchiveExtractor:
 
         Parameters
         ----------
-        ExtractorKind: char
+        extractorKind: char
            Either 'H' or 'T' for HDB or TDB.
 
-        ExtractorNumber: int
+        extractorNumber: int
            Number of the archive extractor instance to use.
 
-        ExtractorPath: string
+        extractorPath: string
            Tango path to the extractor.
-           If this argument is given, it takes precedence over ExtractorKind and ExtractorNumber.
+           If this argument is given, it takes precedence over extractorKind and extractorNumber.
 
         logger: logging.Logger
            Logger object to use
@@ -65,12 +60,12 @@ class ArchiveExtractor:
 
         #######################################################
         # Select Extractor
-        if ExtractorPath is None:
+        if extractorPath is None:
             self.extractor = tango.DeviceProxy(
-                    "archiving/%sDBExtractor/%d"%(ExtractorKind, ExtractorNumber)
+                    "archiving/%sDBExtractor/%d"%(extractorKind, extractorNumber)
                     )
         else:
-            self.extractor = tango.DeviceProxy(ExtractorPath)
+            self.extractor = tango.DeviceProxy(extractorPath)
 
         self.extractor.set_timeout_millis(3000)
         self.logger.debug("Archive Extractor %s used."%self.extractor.name())
@@ -121,49 +116,6 @@ class ArchiveExtractor:
 
         return date
 
-
-    ##---------------------------------------------------------------------------##
-    def evalPoints(
-            self,
-            attribute,
-            dateStart,
-            dateStop,
-            ):
-        """
-        Evaluate the number of points for the attribute on the date range.
-        Also checks for its presence.
-
-        Parameters
-        ----------
-        attribute : String
-            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
-
-        dateStart : datetime.datetime
-            Start date for extraction.
-
-        dateStop : datetime.datetime
-            Stop date for extraction.
-            Default is now (datetime.datetime.now())
-
-        Exceptions
-        ----------
-        ValueError
-            The attribute is not found in the database.
-
-        NotImplemented
-            The archive mode returned by the DB is not handled.
-
-
-        Return
-        ------
-        N: int
-            Number of points on the date range.
-
-        """
-        return N
-
-
     ##---------------------------------------------------------------------------##
     def betweenDates(
             self,
@@ -177,7 +129,7 @@
 
         Parameters
         ----------
-        attr : String
+        attribute : String
            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
 
         dateStart : datetime.datetime, string
@@ -214,23 +166,6 @@
             self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
             raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
 
-        # Get its sampling period in seconds
-        req=self.extractor.GetArchivingMode(attribute)
-        self.logger.debug("GetArchivingMode: "+str(req))
-
-        if req[0] == "MODE_P":
-            samplingPeriod = int(req[1])*10**-3
-            self.logger.debug("Attribute is sampled every %g seconds"%samplingPeriod)
-
-        elif req[0] == "MODE_EVT":
-            self.logger.warning("Attribute is archived on event. Chunks of data are sized with an estimated datarate of 0.1Hz")
-            samplingPeriod = 10
-
-        else:
-            self.logger.error("Archive mode not implemented in this script")
-            raise NotImplemented("Archive mode not implemented in this script")
-
-
         # Get the number of points
         N=self.extractor.GetAttDataBetweenDatesCount([
                 attribute,
@@ -241,7 +176,6 @@
 
         # If data chunk is too much, we need to cut it
         if N > self.Nmax:
-            dt = datetime.timedelta(seconds=samplingPeriod)*self.Nmax
             dt = (dateStop-dateStart)/(N//self.Nmax)
             cdates = [dateStart]
             while cdates[-1] < dateStop:
@@ -286,38 +220,35 @@
 
         self.logger.debug("Extraction done for %s."%attribute)
         return [date, value]
+
+
     ##---------------------------------------------------------------------------##
-    def query_ADB_BetweenDates_MinMaxMean(
-            attr,
-            dateStart,
-            dateStop=datetime.datetime.now(),
-            timeinterval=datetime.timedelta(seconds=60),
-            extractor="archiving/TDBExtractor/4"):
+    def betweenDates_MinMaxMean(
+            self,
+            attribute,
+            dateStart,
+            timeInterval=datetime.timedelta(seconds=60),
+            dateStop=datetime.datetime.now(),
+            ):
         """
-        Query attribute data from archiver database.
-        Divide the time range in time intervals.
-        Get min, max and mean value on each time interval.
-        The date stamp is in the middle of the interval.
+        Query attribute data from an archiver database, dividing the time range in intervals.
+        Get the min, max and mean value on each interval; the timestamp is the middle of the interval.
 
         Parameters
        ----------
-        attr : String
+        attribute : String
            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
 
-        dateStart : datetime.datetime
-            Start date for extraction.
+        dateStart : datetime.datetime, string
+            Start date for extraction. If string, it will be parsed.
 
-        dateStop : datetime.datetime
-            Stop date for extraction.
+        dateStop : datetime.datetime, string
+            Stop date for extraction. If string, it will be parsed.
             Default is now (datetime.datetime.now())
 
-        timeinterval : datetime.timedelta
-            Interval time to divide the time range in chunks.
-            Default is 1 minute.
-
-        extractor : String
-            Name of the DB Extractor device.
-            Default is "archiving/TDBExtractor/4"
+        timeInterval: datetime.timedelta, string
+            Time interval used to perform min, max and mean.
+            Can be a string with a number and a unit in "d", "h", "m" or "s"
 
         Exceptions
         ----------
@@ -326,36 +257,46 @@
 
         Returns
         -------
-        [date, value] : array
-            date : numpy.ndarray of datetime.datime objects
-                Dates of the values
-            value : numpy.ndarray
-                Archived values
+        [mdates, value_min, value_max, value_mean] : array
+            mdates : numpy.ndarray of datetime.datetime objects
+                Dates of the values, middle of timeInterval windows
+            value_min : numpy.ndarray
+                Minimum of the value on the interval
+            value_max : numpy.ndarray
+                Maximum of the value on the interval
+            value_mean : numpy.ndarray
+                Mean of the value on the interval
 
         """
-        # TEMP Dev not finished
-        logger.error("Feature not implemented yet.")
-        return
+        # Parse date if it is string
+        if type(dateStart) is str:
+            dateStart = self.dateparse(dateStart)
+        if type(dateStop) is str:
+            dateStop = self.dateparse(dateStop)
 
-        # Device Proxy to DB
-        logger.debug("Instantiate proxy to %s"%extractor)
-        ADB = tango.DeviceProxy(extractor)
+        # Parse timeInterval if string
+        if type(timeInterval) is str:
+            try:
+                mul = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24}[timeInterval[-1]]
+            except KeyError:
+                self.logger.error("timeInterval could not be parsed")
+                raise ValueError("timeInterval could not be parsed")
+            timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)
 
-        # Give the DB extractor 3 seconds timeout
-        ADB.set_timeout_millis(3000)
 
         # Check that the attribute is in the database
-        logger.debug("Check that %s is archived."%attr)
-        if not ADB.IsArchived(attr):
-            logger.error("Attribute '%s' is not archived in DB %s"%(attr, extractor))
-            raise ValueError("Attribute '%s' is not archived in DB %s"%(attr, extractor))
+        self.logger.debug("Check that %s is archived."%attribute)
+        if not self.extractor.IsArchived(attribute):
+            self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor.name()))
+            raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor.name()))
 
         # Cut data range in time chunks
         cdates = [dateStart]
         while cdates[-1] < dateStop:
-            cdates.append(cdates[-1]+timeinterval)
+            cdates.append(cdates[-1]+timeInterval)
         cdates[-1] = dateStop
+        mdates = np.asarray(cdates[:-1])+timeInterval/2
         logger.debug("Cutting time range to %d chunks of time, %s each."%(len(cdates)-1, dt))
 
         # Prepare arrays
@@ -365,81 +306,27 @@
 
         # For each time chunk
         for i_d in range(len(cdates)-1):
-            # Make requests
-            logger.debug("Perform GetAttDataMaxBetweenDates (%s, %s, %s)"%(
-                attr,
-                cdates[i_d].strftime(DBDFMT),
-                cdates[i_d+1].strftime(DBDFMT))
-                )
-
-            ADB.GetAttDataMaxBetweenDates([
-                attr,
-                cdates[i_d].strftime(DBDFMT),
-                cdates[i_d+1].strftime(DBDFMT)
-                ])
-
-
-    ##---------------------------------------------------------------------------##
-    def query_ADB_NearestValue(attr,
-                        dates,
-                        extractor="archiving/TDBExtractor/4"):
-        """
-        Query attribute data from an archiver database, get nearest points from dates.
-        Use GetNearestValue and perform multiple calls.
-        For each date in dates, it read the closest sampled value.
-        Return the real dates of the samples.
-
-        Parameters
-        ----------
-        attr : String
-            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
-
-        dates : numpy.ndarray of datetime.datetime
-            Dates for extraction.
-
-        extractor : String
-            Name of the DB Extractor device.
-            Default is "archiving/TDBExtractor/4"
-
-        Exceptions
-        ----------
-        ValueError
-            The attribute is not found in the database.
-
-        Returns
-        -------
-        [realdate, value] : array
-            realdate : numpy.ndarray of datetime.datime objects
-                Dates of the values
-            value : numpy.ndarray
-                Archived values
-
-        """
-
-        # Device Proxy to DB
-        ADB = tango.DeviceProxy(extractor)
-
-        # Give the DB extractor 3 seconds timeout
-        ADB.set_timeout_millis(3000)
-
-        # Check that the attribute is in the database
-        if not ADB.IsArchived(attr):
-            raise ValueError("Attribute '%s' is not archived in DB %s"%(attr, extractor))
-
-        # Prepare arrays
-        value = np.empty(len(dates), dtype=float)
-        realdate = np.empty(len(dates), dtype=object)
-
-        # Loop on dates
-        for i in range(len(dates)):
-            # Make retrieval
+            for func, arr in zip(
+                    ["Max", "Min", "Avg"],
+                    [value_max, value_min, value_mean],
+                    ):
+                # Make requests
+                self.logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)"%(
+                    func,
+                    attribute,
+                    cdates[i_d].strftime(DBDFMT2),
+                    cdates[i_d+1].strftime(DBDFMT2))
+                    )
 
-            answ = ADB.GetNearestValue([attr, dates[i].strftime(DBDFMT2)])
-            answ = answ.split(";")
+                _val = getattr(self.extractor, "GetAttData%sBetweenDates"%func)([
+                    attribute,
+                    cdates[i_d].strftime(DBDFMT2),
+                    cdates[i_d+1].strftime(DBDFMT2)
+                    ])
 
-            realdate[i] = datetime.datetime.fromtimestamp(int(answ[0])/1000)
-            value[i] = answ[1]
+                arr[i_d] = _val
 
-        return [realdate, value]
+        self.logger.debug("Extraction done for %s."%attribute)
+        return [mdates, value_min, value_max, value_mean]
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..e69de29
-- 
GitLab
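
Note on the CLI rewrite above: the new module-level body of cli_archiveextractor.py references logging, datetime, np, tango, dateparse and __version__, but the patch only imports argparse and ArchiveExtractor, so the script would stop with a NameError at startup. A minimal sketch of the import header it appears to need follows; the core package path is an assumption (the hunk itself keeps "import ArchiveExtractor"), and the source of dateparse and __version__ is not visible in the diff.

# Sketch of a possible import header for cli_archiveextractor.py, assuming the
# module is imported from the new core/ package created by this patch.
import argparse
import datetime
import logging

import numpy as np
import tango

from core import ArchiveExtractor

# Still unresolved by the patch: `dateparse` (used as the argparse type for
# --from/--to) and `__version__` (used in the parser description and a log
# message); they would have to be imported from wherever the module defines them.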
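
For reference, a short usage sketch of the class API as it stands after this patch: betweenDates for raw points and the new betweenDates_MinMaxMean for windowed statistics. The attribute name and the import path are assumptions; the method signatures follow the diff above. Note that the dateStop=datetime.datetime.now() default is evaluated once at import time, so callers who really want "now" should pass it explicitly.

import datetime

from core import ArchiveExtractor  # assumed import path after the move to core/

# TDB extractor, device number 2 (mirrors the CLI defaults --DB T --DBN 2)
ae = ArchiveExtractor.ArchiveExtractor("T", 2)

# Hypothetical attribute, borrowed from the docstrings
attr = "test/dg/panda/current"

# Raw archived points between two dates; string dates are parsed by the class
dates, values = ae.betweenDates(attr, "2021-10-12", "2021-10-13")

# Min, max and mean over 10-minute windows; dateStop is passed explicitly rather
# than relying on the default, which is frozen when the module is imported
mdates, vmin, vmax, vmean = ae.betweenDates_MinMaxMean(
        attr,
        "2021-10-12",
        timeInterval="10m",
        dateStop=datetime.datetime.now(),
        )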