diff --git a/ArchiveExtractor.py b/ArchiveExtractor.py
index 2f3fa644e3d48ee0a1027eb67e8164d3b4f50f20..2f604f471e92c42f13d7cf1c60c5114e38e615b7 100755
--- a/ArchiveExtractor.py
+++ b/ArchiveExtractor.py
@@ -38,6 +38,7 @@ _DBDFMT2 = "%d-%m-%Y %H:%M:%S"
 
 ##########################################################################
 # Vectorized fromtimestamp function
+# NOTE: it is faster than using pandas.to_datetime()
 _ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
 
 def _check_initialized():
@@ -223,6 +224,7 @@ def _cmd_with_retry(dp, cmd, arg, retry=2):
         Whatever the command returns.
         None if failed after the amount of retries.
     """
+    logger.info("Perform Command {} {}".format(cmd, arg))
 
     for i in range(retry):
         # Make retrieval request
@@ -292,6 +294,120 @@ def init(
     _AttrTables = tuple(e.getattnameall() for e in _extractors)
     logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1])))
 
+##########################################################################
+###                    Module access functions                         ###
+##########################################################################
+
+def extract(
+        attr,
+        date1, date2=None,
+        method="nearest",
+        db='H',
+        ):
+    """
+    Access function to perform extraction between date1 and date2.
+    Can extract one or several attributes.
+    date1 and date2 can both be exact dates, or one of the two can be a time interval that will be taken relative to the other.
+
+
+    Parameters:
+    -----------
+    attr: string, list, dict
+        Attribute(s) to extract.
+        If string, extract the given attribute, returning a pandas.Series.
+        If list, extract attributes and return a list of pandas.Series.
+        If a dict, extract attributes and return a dict of pandas.Series with same keys.
+
+    date1, date2: string, datetime.datetime, datetime.timedelta, None
+        Exact date, or duration relative to the other date.
+        If string, it will be parsed.
+        A start date can be given with string format '%Y-%m-%d-%H:%M:%S' or less precise (i.e. '2021-02', '2022-11-03', '2022-05-10-21:00'...).
+        A duration can be given with string format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
+        A datetime.datetime object or datetime.timedelta object will be used as is.
+        date2 can be None. In that case it is replaced by the current time.
+
+    method: str
+        Method of extraction
+            'nearest': Retrieve nearest value of date1, date2 is ignored.
+            'between': Retrieve data between date1 and date2.
+
+    db: str
+        Which database to look in, 'H' or 'T'.
+
+    """
+
+    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
+    # Perform a few sanity checks
+    if not _check_initialized():
+        # Stop here, the function has produced a message if necessary
+        return
+
+    if not db in ("H", "T"):
+        raise ValueError("Attribute 'db' should be 'H' or 'T'")
+
+
+    allowedmethods=("nearest", "between", "minmaxmean")
+    if not method in allowedmethods:
+        raise ValueError("Attribute 'method' should be in {}".format(str(allowedmethods)))
+
+    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
+    # Work with dates
+    if not type(date1) in (datetime.datetime, datetime.timedelta):
+        date1 = _dateparse(date1)
+    if date2 is None:
+        date2 = datetime.datetime.now()
+    else:
+        if not type(date2) in (datetime.datetime, datetime.timedelta):
+            date2 = _dateparse(date2)
+
+    if not datetime.datetime in (type(date1), type(date2)):
+        logger.error("One of date1 date2 should be an exact date.\nGot {} {}".format(date1, date2))
+        raise ValueError("date1 and date2 not valid")
+
+    # Use timedelta relative to the other date. date1 is always before date2
+    if type(date1) is datetime.timedelta:
+        date1 = date2-date1
+    if type(date2) is datetime.timedelta:
+        date2 = date1+date2
+
+    if date1 > date2:
+        logger.error("date1 must precede date2.\nGot {} {}".format(date1, date2))
+        raise ValueError("date1 and date2 not valid")
+
+    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
+    # Perform extraction and return
+
+    if type(attr) is dict:
+        d=dict()
+        for k,v in attr.items():
+            try:
+                d.update({k:_extract_attribute(v, method, date1, date2, db)})
+            except Exception as e:
+                logger.debug(str(e))
+                logger.error("Could not extract {}.".format(v))
+
+        return d
+
+    if type(attr) in (list,tuple):
+        d=[]
+        for v in attr:
+            try:
+                d.append(_extract_attribute(v, method, date1, date2, db))
+            except Exception as e:
+                logger.debug(str(e))
+                logger.error("Could not extract {}.".format(v))
+
+        return d
+
+    try:
+        d=_extract_attribute(attr, method, date1, date2, db)
+    except Exception as e:
+        logger.debug(str(e))
+        logger.error("Could not extract {}.".format(attr))
+        return None
+
+    return d
+
 ##----------------------------------------------------------------------##
 def findattr(pattern, db="H"):
 
@@ -367,65 +483,18 @@ def infoattr(attribute, db='H'):
 
     return info
 
-##---------------------------------------------------------------------------##
-def ExtrBetweenDates(
-        attribute,
-        dateStart,
-        dateStop=None,
-        db='H',
-        ):
-    """
-    Query attribute data from an archiver database, get all points between dates.
-    Use ExtractBetweenDates.
-
-    Parameters
-    ----------
-    attribute : String
-        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
-
-    dateStart : datetime.datetime, string
-        Start date for extraction. If string, it will be parsed.
-        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
-
-    dateStop : datetime.datetime, string, None
-        Stop date for extraction.
-        If string, it will be parsed.
-        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
-        If None, it takes the current date and time.
-        Default is None (now).
-
-    db: str
-        Which database to look in, 'H' or 'T'.
-
-    Exceptions
-    ----------
-    ValueError
-        The attribute is not found in the database.
-
-    Returns
-    -------
-    [date, value] : array
-        date : numpy.ndarray of datetime.datime objects
-            Dates of the values
-        value : numpy.ndarray
-            Archived values
+##########################################################################
+###                     Module core functions                          ###
+##########################################################################
+def _extract_attribute(attribute, method, date1, date2, db):
+    """
+    Check if exists, check scalar or spectrum and dispatch
 
     """
-    if not _check_initialized():
-        return
-
-    if not db in ("H", "T"):
-        raise AttributeError("Attribute db should be 'H' or 'T'")
 
     # Uncapitalize attribute
     attribute = attribute.lower()
-
-    # Check attribute is in database
-    _check_attribute(attribute, db=db)
-
-    # Parse dates
-    dateStart = _dateparse(dateStart)
-    dateStop = _dateparse(dateStop)
+    _check_attribute(attribute, db)
 
     # Get info about the attribute
     info=infoattr(attribute, db=db)
@@ -443,93 +512,143 @@ def ExtrBetweenDates(
             attribute, info["max_dim_x"]))
         attrtype="vector"
 
-    # Cut the time horizon in chunks
-    cdates = _chunkerize(attribute, dateStart, dateStop, db)
+    # =============
+    # For now we handle multi dimension the same way as scalar, which will get only the first element
+    if (attrtype=="scalar") or (attrtype=="multi"):
+        return _extract_scalar(attribute, method, date1, date2, db)
+    if attrtype=="vector":
+        return _extract_vector(attribute, method, date1, date2, db)
 
-    # Arrays to hold every chunks
-    value = []
-    date = []
 
-    # For each date chunk
-    for i_d in range(len(cdates)-1):
+##---------------------------------------------------------------------------##
+def _extract_scalar(attribute, method, date1, date2, db):
 
-        # =============
-        # For now we handle multi dimension the same way as scalar, which will get only the first element
-        if (attrtype=="scalar") or (attrtype=="multi"):
-            # Inform on retrieval request
-            logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
-                attribute,
-                cdates[i_d].strftime(_DBDFMT),
-                cdates[i_d+1].strftime(_DBDFMT))
-                )
+    # =====================
+    if method == "nearest":
+        cmdreturn = _cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "GetNearestValue", [
+            attribute,
+            date1.strftime(_DBDFMT),
+            ])
+
+        # Unpack return
+        try:
+            _date, _value = cmdreturn
+        except TypeError:
+            logger.error("Could not extract this chunk. Check the device extractor")
+            return None
+
+        # Fabricate return pandas.Series
+        d=pd.Series(index=[datetime.datetime.fromtimestamp(_date),], data=[_value,], name=attribute)
+
+        return d
+
+    # =====================
+    if method == "between":
+        # Cut the time horizon in chunks
+        cdates = _chunkerize(attribute, date1, date2, db)
+
+        # Array to hold data
+        data = []
+        # For each date chunk
+        for i_d in range(len(cdates)-1):
 
             cmdreturn = _cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "ExtractBetweenDates", [
                 attribute,
                 cdates[i_d].strftime(_DBDFMT),
                 cdates[i_d+1].strftime(_DBDFMT)
                 ])
 
-            # Check command return
-            if cmdreturn is None:
+
+            # Unpack return
+            try:
+                _date, _value = cmdreturn
+            except TypeError:
                 logger.error("Could not extract this chunk. Check the device extractor")
                 return None
 
-            # Unpack return
-            _date, _value = cmdreturn
 
             # Transform to datetime - value arrays
-            # NOTE: it is faster than using pandas.to_datetime()
             _value = np.asarray(_value, dtype=float)
             if len(_date) > 0:
                 _date = _ArrayTimeStampToDatetime(_date/1000.0)
 
-            value.append(_value)
-            date.append(_date)
+            # Fabricate return pandas.Series
+            data.append(pd.Series(index=_date, data=_value,name=attribute))
 
-        # =============
-        if attrtype=="vector":
-            logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)"%(
-                attribute,
-                cdates[i_d].strftime(_DBDFMT),
-                cdates[i_d+1].strftime(_DBDFMT)
-                ))
+        # Concatenate chunks
+        return pd.concat(data)
 
-            [N,], [name,] = _extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDates([
-                attribute,
-                cdates[i_d].strftime(_DBDFMT),
-                cdates[i_d+1].strftime(_DBDFMT)
-                ])
-            N=int(N)
-
-            # Read the history
-            logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
-            attrHist = _extractors[{'H':0, 'T':1}[db]].attribute_history(name, N)
+    # ========================
+    if method == "minmaxmean":
 
-            # Transform to datetime - value arrays
-            _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
-            _value[:] = np.nan
-            _date = np.empty(N, dtype=object)
-            for i_h in range(N):
-                _value[i_h,:attrHist[i_h].dim_x]=attrHist[i_h].value
-                _date[i_h]=attrHist[i_h].time.todatetime()
-
-            # Remove dynamic attribute
-            logger.debug("Remove dynamic attribute %s."%name)
-            _extractors[{'H':0, 'T':1}[db]].RemoveDynamicAttribute(name)
+        # If we are here, the method is not implemented
+        logger.error("Method {} is not implemented for scalars.".format(method))
+        raise NotImplementedError
 
+##---------------------------------------------------------------------------##
+def _extract_vector(attribute, method, date1, date2, db):
 
-            value.append(_value)
-            date.append(_date)
-
-    logger.debug("Concatenate chunks")
-    value = np.concatenate(value)
-    date = np.concatenate(date)
+    # Get info about the attribute
+    info=infoattr(attribute, db=db)
 
-    logger.debug("Extraction done for %s."%attribute)
-    if attrtype=="vector":
-        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')
-    else:
-        return pd.Series(index=date, data=value)
+    # =====================
+    if method == "nearest":
+        # Get nearest does not work with vector.
+        # Make a between date with surrounding dates.
+
+        # Dynamically find surrounding
+        cnt=0
+        dt=datetime.timedelta(seconds=10)
+        while cnt<1:
+            logger.debug("Seeking points in {} to {}".format(date1-dt,date1+dt))
+            cnt=_extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
+                attribute,
+                (date1-dt).strftime(_DBDFMT2),
+                (date1+dt).strftime(_DBDFMT2)
+                ])
+            dt=dt*1.5
+        logger.debug("Found {} points in a +- {} interval".format(cnt,str(dt/1.5)))
+
+        # For vector, we have to use the GetAttxxx commands
+        cmdreturn = _cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "GetAttDataBetweenDates", [
+            attribute,
+            (date1-dt).strftime(_DBDFMT),
+            (date1+dt).strftime(_DBDFMT),
+            ])
+
+        # Unpack return
+        try:
+            [N,], [name,] = cmdreturn
+            N=int(N)
+        except TypeError:
+            logger.error("Could not extract this attribute. Check the device extractor")
+            return None
+
+        # Read the history
+        logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
+        attrHist = _extractors[{'H':0, 'T':1}[db]].attribute_history(name, N)
+
+        # Transform to datetime - value arrays
+        _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
+        _value[:] = np.nan
+        _date = np.empty(N, dtype=object)
+        for i_h in range(N):
+            _value[i_h,:attrHist[i_h].dim_x]=attrHist[i_h].value
+            _date[i_h]=attrHist[i_h].time.todatetime()
+        _extractors[{'H':0, 'T':1}[db]].RemoveDynamicAttribute(name)  # history is copied, release the dynamic attribute
+
+        # Seeking nearest entry
+        idx=np.argmin(abs(_date-date1))
+        logger.debug("Found nearest value at index {}: {}".format(idx, _date[idx]))
+
+        # Fabricate return pandas.Series
+        d=pd.Series(index=[_date[idx],], data=[_value[idx],], name=attribute)
+
+        return d
+
+    # If we are here, the method is not implemented
+    logger.error("Method {} is not implemented for vectors.".format(method))
+    raise NotImplementedError
 
 
 ##---------------------------------------------------------------------------##