From 690712872f78c86fa00e2038ff1bfecd6fec4a17 Mon Sep 17 00:00:00 2001 From: Romain Broucquart <romain.broucquart@synchrotron-soleil.fr> Date: Wed, 16 Nov 2022 17:42:33 +0100 Subject: [PATCH] WIP move from class to module --- core/ArchiveExtractor.py | 373 ++++++++++++++++++++++++++++++++++++++- env_tango.py | 2 +- pyprompt.ipy | 2 +- 3 files changed, 373 insertions(+), 4 deletions(-) diff --git a/core/ArchiveExtractor.py b/core/ArchiveExtractor.py index ba40a98..0f5256d 100755 --- a/core/ArchiveExtractor.py +++ b/core/ArchiveExtractor.py @@ -42,13 +42,19 @@ _ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp) def _check_initialized(): """ - Check if the module is initialized, raise exception RuntimeError if not. + Check if the module is initialized. + + Returns + ------- + success : boolean """ global _extractors if None in _extractors: logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__)) - raise RuntimeError("Module not initialized") + return False + return True +##----------------------------------------------------------------------## def _dateparse(datestr): """ Convenient function to parse date strings. @@ -93,6 +99,124 @@ def _dateparse(datestr): return date +##----------------------------------------------------------------------## +def _check_attribute(attribute, db): + """ + Check that the attribute is in the database + + Parameters + ---------- + attribute : String + Name of the attribute. Full Tango name i.e. "test/dg/panda/current". + + db: str + Which database to look in, 'H' or 'T'. + """ + global _extractors + + logger.debug("Check that %s is archived."%attribute) + if not _extractors[{'H':0, 'T':1}[db]].IsArchived(attribute): + logger.error("Attribute '%s' is not archived in DB %s"%(attribute, _extractors[{'H':0, 'T':1}[db]])) + raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, _extractors[{'H':0, 'T':1}[db]])) + +##----------------------------------------------------------------------## +def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000): + """ + + Parameters + ---------- + attribute : String + Name of the attribute. Full Tango name i.e. "test/dg/panda/current". + + dateStart : datetime.datetime + Start date for extraction. + + dateStop : datetime.datetime + Stop date for extraction. + + db: str + Which database to look in, 'H' or 'T'. + + Nmax: int + Max number of atoms in one chunk. Default 100000. + + Returns + ------- + cdates : list + List of datetime giving the limit of each chunks. + For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included. + """ + info=infoattr(attribute, db=db) + logger.debug("Attribute information \n%s"%info) + + # Get the number of points + N=_extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([ + attribute, + dateStart.strftime(_DBDFMT2), + dateStop.strftime(_DBDFMT2) + ]) + logger.debug("On the period, there is %d entries"%N) + + dx=int(info["max_dim_x"]) + if dx > 1: + logger.debug("Attribute is a vector with max dimension = %s"%dx) + N=N*dx + + # If data chunk is too much, we need to cut it + if N > Nmax: + dt = (dateStop-dateStart)/(N//Nmax) + cdates = [dateStart] + while cdates[-1] < dateStop: + cdates.append(cdates[-1]+dt) + cdates[-1] = dateStop + logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt)) + else: + cdates=[dateStart, dateStop] + + return cdates + +##----------------------------------------------------------------------## +def _cmd_with_retry(dp, cmd, arg, retry=2): + """ + Run a command on tango.DeviceProxy, retrying on DevFailed. + + Parameters + ---------- + dp: tango.DeviceProxy + Device proxy to try command onto. + + cmd : str + Command to executte on the extractor + + arg : list + Attribute to pass to the command + + retry : int + Number of command retry on DevFailed + + Returns + ------- + cmdreturn : + Whatever the command returns. + None if failed after the amount of retries. + """ + + for i in range(retry): + # Make retrieval request + logger.debug("Execute %s (%s)"%(cmd, arg)) + try: + cmdreturn = getattr(dp, cmd)(arg) + except tango.DevFailed as e: + logger.warning("The extractor device returned the following error:") + logger.warning(e) + if i == retry-1: + logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg)) + return None + logger.warning("Retrying...") + continue + break + return cmdreturn + ########################################################################## ### Module private variables ### @@ -106,6 +230,7 @@ _AttrTables = (None, None) ########################################################################## ### Module initialisation functions ### ########################################################################## + def init( HdbExtractorPath="archiving/hdbextractor/2", TdbExtractorPath="archiving/tdbextractor/2", @@ -118,6 +243,7 @@ def init( Tango path to the extractors. """ global _extractors + global _AttrTables logger.debug("Instanciating extractors device proxy...") @@ -134,6 +260,249 @@ def init( logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1]))) +##----------------------------------------------------------------------## +def findattr(pattern, db="H"): + """ + Search for an attribute path using the pattern given. + Case insensitive. + + Parameters: + ----------- + pattern: str + Pattern to search, wildchar * accepted. + example "dg*dcct*current" + + db: str + Which database to look in, 'H' or 'T'. + + Returns: + -------- + results: (str,) + List of string match + """ + if not _check_initialized(): + return + + if not db in ("H", "T"): + raise AttributeError("Attribute db should be 'H' or 'T'") + + global _AttrTables + + keywords=pattern.lower().split('*') + + # Select DB + attr_table = _AttrTables[{'H':0, 'T':1}[db]] + + matches = [attr for attr in attr_table if all(k in attr.lower() for k in keywords)] + + return matches + +##----------------------------------------------------------------------## +def infoattr(attribute, db='H'): + """ + Get informations for an attribute and pack it into a python dict. + + Parameters + ---------- + attribute : String + Name of the attribute. Full Tango name i.e. "test/dg/panda/current". + + db: str + Which database to look in, 'H' or 'T'. + + Returns + ------- + info : dict + Dictionnary of propertyname:propertyvalue + """ + if not _check_initialized(): + return + + if not db in ("H", "T"): + raise AttributeError("Attribute db should be 'H' or 'T'") + + info = dict() + + for func in ("GetAttDefinitionData", "GetAttPropertiesData"): + R=getattr(_extractors[{'H':0, 'T':1}[db]], func)(attribute) + if not R is None: + for i in R: + _s=i.split("::") + info[_s[0]]=_s[1] + else: + logger.warning("Function %s on extractor returned None"%func) + + return info + +##---------------------------------------------------------------------------## +def ExtrBetweenDates( + attribute, + dateStart, + dateStop=None, + db='H', + ): + """ + Query attribute data from an archiver database, get all points between dates. + Use ExtractBetweenDates. + + Parameters + ---------- + attribute : String + Name of the attribute. Full Tango name i.e. "test/dg/panda/current". + + dateStart : datetime.datetime, string + Start date for extraction. If string, it will be parsed. + Example of string format %Y-%m-%d-%H:%M:%S or less precise. + + dateStop : datetime.datetime, string, None + Stop date for extraction. + If string, it will be parsed. + Example of string format %Y-%m-%d-%H:%M:%S or less precise. + If None, it takes the current date and time. + Default is None (now). + + db: str + Which database to look in, 'H' or 'T'. + + Exceptions + ---------- + ValueError + The attribute is not found in the database. + + Returns + ------- + [date, value] : array + date : numpy.ndarray of datetime.datime objects + Dates of the values + value : numpy.ndarray + Archived values + + """ + if not _check_initialized(): + return + + if not db in ("H", "T"): + raise AttributeError("Attribute db should be 'H' or 'T'") + + # Parse date if it is string + if type(dateStart) is str: + dateStart = _dateparse(dateStart) + if dateStop is None: + dateStop = datetime.datetime.now() + if type(dateStop) is str: + dateStop = _dateparse(dateStop) + + # Uncapitalize attribute + attribute = attribute.lower() + + # Check attribute is in database + _check_attribute(attribute, db=db) + + # Get info about the attribute + info=infoattr(attribute, db=db) + logger.debug("Attribute information \n%s"%info) + + # Detect spectrum + attrtype="scalar" + if int(info["max_dim_x"]) > 1: + if int(info["max_dim_y"]) > 0: + logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module."%( + attribute, info["max_dim_x"], info["max_dim_y"])) + attrtype="multi" + else: + logger.info("Attribute %s is a 1D vector, dimension = %s."%( + attribute, info["max_dim_x"])) + attrtype="vector" + + # Cut the time horizon in chunks + cdates = _chunkerize(attribute, dateStart, dateStop, db) + + # Arrays to hold every chunks + value = [] + date = [] + + # For each date chunk + for i_d in range(len(cdates)-1): + + # ============= + # For now we handle multi dimension the same way as scalar, which will get only the first element + if (attrtype=="scalar") or (attrtype=="multi"): + # Inform on retrieval request + logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%( + attribute, + cdates[i_d].strftime(_DBDFMT), + cdates[i_d+1].strftime(_DBDFMT)) + ) + + cmdreturn = _cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "ExtractBetweenDates", [ + attribute, + cdates[i_d].strftime(_DBDFMT), + cdates[i_d+1].strftime(_DBDFMT) + ]) + + # Check command return + if cmdreturn is None: + logger.error("Could not extract this chunk. Check the device extractor") + return None + + # Unpack return + _date, _value = cmdreturn + + # Transform to datetime - value arrays + # NOTE: it is faster than using pandas.to_datetime() + _value = np.asarray(_value, dtype=float) + if len(_date) > 0: + _date = _ArrayTimeStampToDatetime(_date/1000.0) + + value.append(_value) + date.append(_date) + + # ============= + if attrtype=="vector": + logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)"%( + attribute, + cdates[i_d].strftime(_DBDFMT), + cdates[i_d+1].strftime(_DBDFMT) + )) + + [N,], [name,] = _extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDates([ + attribute, + cdates[i_d].strftime(_DBDFMT), + cdates[i_d+1].strftime(_DBDFMT) + ]) + N=int(N) + + # Read the history + logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name)) + attrHist = _extractors[{'H':0, 'T':1}[db]].attribute_history(name, N) + + # Transform to datetime - value arrays + _value = np.empty((N, int(info["max_dim_x"])), dtype=float) + _value[:] = np.nan + _date = np.empty(N, dtype=object) + for i_h in range(N): + _value[i_h,:attrHist[i_h].dim_x]=attrHist[i_h].value + _date[i_h]=attrHist[i_h].time.todatetime() + + # Remove dynamic attribute + logger.debug("Remove dynamic attribute %s."%name) + _extractors[{'H':0, 'T':1}[db]].RemoveDynamicAttribute(name) + + + value.append(_value) + date.append(_date) + + logger.debug("Concatenate chunks") + value = np.concatenate(value) + date = np.concatenate(date) + + logger.debug("Extraction done for %s."%attribute) + if attrtype=="vector": + return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all') + else: + return pd.Series(index=date, data=value) + + class ArchiveExtractor: diff --git a/env_tango.py b/env_tango.py index abbf7de..3e5ef00 100644 --- a/env_tango.py +++ b/env_tango.py @@ -1,4 +1,4 @@ -import PyTango as tango +import tango import re # ============================================================================= diff --git a/pyprompt.ipy b/pyprompt.ipy index 58b649c..15158ec 100644 --- a/pyprompt.ipy +++ b/pyprompt.ipy @@ -7,7 +7,7 @@ # Import usefull packages -import PyTango as tango +import tango import numpy as np import OpUtils as OU print("Imported: tango, np, OU") -- GitLab