import logging
import datetime
import numpy as np
import pandas as pd
import ArchiveExtractor as ae
import ArchiveExtractor.Amenities as aea

# Get the module logger
logger = logging.getLogger("ArchiveExtractor")

# Maximum number of vector elements kept per sample (quick fix: wider
# vectors are cropped to this width).
_MAX_VECTOR_DIM = 2048

##########################################################################
###                    Module core functions                           ###
##########################################################################

def _get_extractor(db):
    """
    Return the entry of ae._Extractors matching the database key.

    Parameters
    ----------
    db : str
        'H' selects index 0, 'T' selects index 1.

    Exceptions
    ----------
    KeyError
        db is neither 'H' nor 'T'.
    """
    return ae._Extractors[{'H': 0, 'T': 1}[db]]


def _extract_attribute(attribute, method, date1, date2, db):
    """
    Check that the attribute exists, detect scalar or spectrum, and dispatch.

    Parameters
    ----------
    attribute : str
        Attribute name. It is lower-cased before use.
    method : str
        Extraction method, forwarded to the scalar or vector extractor
        ("nearest" and "between" are implemented there).
    date1, date2 : datetime.datetime
        Dates forwarded to the extractor. Only date1 is used by "nearest".
    db : str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    Whatever _extract_scalar or _extract_vector returns (pandas object,
    or None when the extraction failed).
    """
    # Uncapitalize attribute
    attribute = attribute.lower()
    aea._check_attribute(attribute, db)

    # Get info about the attribute
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s", info)

    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            logger.warning(
                "Attribute %s is a (%s; %s) vector. This is poorly handled by this module.",
                attribute, info["max_dim_x"], info["max_dim_y"])
            attrtype = "multi"
        else:
            logger.info(
                "Attribute %s is a 1D vector, dimension = %s.",
                attribute, info["max_dim_x"])
            attrtype = "vector"

    if attrtype == "vector":
        return _extract_vector(attribute, method, date1, date2, db)

    # For now multi dimension is handled the same way as scalar, which
    # will get only the first element.
    # Quick fix: data_type '1' is treated as boolean, everything else as float.
    dtype = bool if info["data_type"] == '1' else float
    return _extract_scalar(attribute, method, date1, date2, db, dtype)


##---------------------------------------------------------------------------##
def _extract_scalar(attribute, method, date1, date2, db, dtype):
    """
    Extract a scalar attribute.

    Parameters
    ----------
    attribute : str
        Attribute name, already lower-cased and checked.
    method : str
        "nearest": single value returned by GetNearestValue for date1.
        "between": all values between date1 and date2, fetched chunk by
        chunk with ExtractBetweenDates.
    date1, date2 : datetime.datetime
        Date (nearest) or date range (between).
    db : str
        Which database to look in, 'H' or 'T'.
    dtype : type
        bool or float; drives the value conversion.

    Returns
    -------
    pandas.Series named after the attribute, or None if the extractor
    command did not return usable data.

    Exceptions
    ----------
    NotImplementedError
        The method is not implemented for scalars.
    """
    # =====================
    if method == "nearest":
        cmdreturn = aea._cmd_with_retry(_get_extractor(db), "GetNearestValue", [
            attribute,
            date1.strftime(aea._DBDFMT),
        ])

        # Unpack return. A failed command is expected to yield a
        # non-string (e.g. None), which raises AttributeError on .split.
        try:
            _date, _value = cmdreturn.split(';')
        except (TypeError, AttributeError):
            logger.error("Could not extract this chunk. Check the device extractor")
            return None

        # Transform by datatype
        # NOTE(review): non-boolean values are returned as the raw string
        # produced by the extractor; confirm whether a cast is wanted.
        if dtype is bool:
            _value = aea._cast_bool(_value)

        # Fabricate return pandas.Series; the timestamp is in milliseconds
        return pd.Series(
            index=[datetime.datetime.fromtimestamp(int(_date)/1000), ],
            data=[_value, ],
            name=attribute)

    # =====================
    if method == "between":
        extractor = _get_extractor(db)

        # Cut the time horizon in chunks
        cdates = aea._chunkerize(attribute, date1, date2, db)

        # Array to hold data
        data = []

        # For each date chunk
        for chunk_start, chunk_stop in zip(cdates[:-1], cdates[1:]):
            cmdreturn = aea._cmd_with_retry(extractor, "ExtractBetweenDates", [
                attribute,
                chunk_start.strftime(aea._DBDFMT),
                chunk_stop.strftime(aea._DBDFMT),
            ])

            # Unpack return
            try:
                _date, _value = cmdreturn
            except TypeError:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Transform to datetime - value arrays
            if dtype is bool:
                _value = aea._cast_bool(_value)
            else:
                _value = np.asarray(_value, dtype=dtype)

            if len(_date) > 0:
                # Timestamps are in milliseconds
                _date = aea._ArrayTimeStampToDatetime(_date/1000.0)

            # Fabricate return pandas.Series
            data.append(pd.Series(index=_date, data=_value, name=attribute))

        # Concatenate chunks
        return pd.concat(data)

    # ========================
    # "minmaxmean" and any other method are not implemented here.
    message = "Method {} is not implemented for scalars.".format(method)
    logger.error(message)
    raise NotImplementedError(message)


##---------------------------------------------------------------------------##
def _read_vector_history(extractor, cmdreturn, max_dim_x):
    """
    Read back the dynamic attribute created by GetAttDataBetweenDates.

    Parameters
    ----------
    extractor : object
        Entry of ae._Extractors the command was sent to.
    cmdreturn : object
        Return of GetAttDataBetweenDates, unpacked as ([N], [name]).
    max_dim_x : int or str
        Declared x dimension of the attribute.

    Returns
    -------
    (dates, values) : tuple of numpy.ndarray, or None if cmdreturn could
    not be unpacked. dates is an object array of length N; values is a
    float array of shape (N, min(max_dim_x, _MAX_VECTOR_DIM)) padded
    with NaN.
    """
    # Unpack return
    try:
        [count, ], [name, ] = cmdreturn
        count = int(count)
    except TypeError:
        logger.error("Could not extract this attribute. Check the device extractor")
        return None

    # Read the history
    logger.debug("Retrieve history of %d values. Dynamic attribute named %s.", count, name)
    history = extractor.attribute_history(name, count)
    extractor.RemoveDynamicAttribute(name)

    # Transform to datetime - value arrays
    width = min(int(max_dim_x), _MAX_VECTOR_DIM)  # Quick fix: Crop dimension
    values = np.full((count, width), np.nan, dtype=float)
    dates = np.empty(count, dtype=object)
    for i_h in range(count):
        sample = history[i_h]
        # Crop the sample as well, so that a vector wider than the
        # destination does not fail on assignment.
        n_kept = min(sample.dim_x, width)
        values[i_h, :n_kept] = sample.value[:n_kept]
        dates[i_h] = sample.time.todatetime()

    return dates, values


def _extract_vector(attribute, method, date1, date2, db):
    """
    Extract a 1D vector (spectrum) attribute.

    Parameters
    ----------
    attribute : str
        Attribute name, already lower-cased and checked.
    method : str
        "nearest": the sample closest to date1.
        "between": all samples between date1 and date2, fetched chunk by
        chunk.
    date1, date2 : datetime.datetime
        Date (nearest) or date range (between).
    db : str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    pandas.DataFrame indexed by date, one column per vector element,
    with all-NaN columns dropped; or None if the extractor command did
    not return usable data.

    Exceptions
    ----------
    NotImplementedError
        The method is not implemented for vectors.
    """
    # Get info about the attribute
    info = ae.infoattr(attribute, db=db)

    # =====================
    if method == "nearest":
        extractor = _get_extractor(db)

        # Get nearest does not work with vector.
        # Make a between date with surrounding dates.

        # Dynamically find surrounding: widen the window until it holds
        # at least one point.
        # NOTE(review): there is no upper bound on the window; an
        # attribute without any data keeps widening until the date
        # arithmetic fails.
        cnt = 0
        dt = datetime.timedelta(seconds=10)
        while cnt < 1:
            logger.debug("Seeking points in {} to {}".format(date1-dt, date1+dt))
            cnt = extractor.GetAttDataBetweenDatesCount([
                attribute,
                (date1-dt).strftime(aea._DBDFMT2),
                (date1+dt).strftime(aea._DBDFMT2),
            ])
            dt = dt*1.5
        logger.debug("Found {} points in a +- {} interval".format(cnt, str(dt/1.5)))

        # For vector, we have to use the GetAttxxx commands
        cmdreturn = aea._cmd_with_retry(extractor, "GetAttDataBetweenDates", [
            attribute,
            (date1-dt).strftime(aea._DBDFMT),
            (date1+dt).strftime(aea._DBDFMT),
        ])

        history = _read_vector_history(extractor, cmdreturn, info["max_dim_x"])
        if history is None:
            return None
        _date, _value = history

        # Seeking nearest entry
        idx = np.argmin(abs(_date-date1))
        logger.debug("Found nearest value at index {}: {}".format(idx, _date[idx]))

        # Fabricate a one-row DataFrame, dropping empty columns.
        # A Series has no axis 1, so dropna(axis=1) cannot be applied to it;
        # the DataFrame layout matches the "between" method.
        return pd.DataFrame(
            index=[_date[idx], ],
            data=[_value[idx], ],
        ).dropna(axis=1, how='all')

    # =====================
    if method == "between":
        extractor = _get_extractor(db)

        # Cut the time horizon in chunks
        cdates = aea._chunkerize(attribute, date1, date2, db)

        # Array to hold data
        data = []

        # For each date chunk
        for chunk_start, chunk_stop in zip(cdates[:-1], cdates[1:]):
            cmdreturn = aea._cmd_with_retry(extractor, "GetAttDataBetweenDates", [
                attribute,
                chunk_start.strftime(aea._DBDFMT),
                chunk_stop.strftime(aea._DBDFMT),
            ])

            history = _read_vector_history(extractor, cmdreturn, info["max_dim_x"])
            if history is None:
                return None
            _date, _value = history

            # Fabricate return pandas.DataFrame
            data.append(pd.DataFrame(index=_date, data=_value))

        # Concatenate chunks, dropping empty columns
        return pd.concat(data).dropna(axis=1, how='all')

    # ========================
    # "minmaxmean" and any other method are not implemented here.
    message = "Method {} is not implemented for vectors.".format(method)
    logger.error(message)
    raise NotImplementedError(message)


##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
        attribute,
        dateStart,
        dateStop=None,
        timeInterval=datetime.timedelta(seconds=60),
        db='H',
        ):
    """
    Query min, max and mean of a scalar attribute on successive time windows.

    The date range is cut in windows of timeInterval and, for each window,
    the extractor commands GetAttDataMaxBetweenDates,
    GetAttDataMinBetweenDates and GetAttDataAvgBetweenDates are called.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is None, which is handed to the date parser as is.

    timeInterval: datetime.timedelta, string
        Time interval used to perform min,max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    AttributeError
        db is neither 'H' nor 'T'.
    ValueError
        timeInterval is a string that could not be parsed.

    Returns
    -------
    pandas.DataFrame
        Indexed by the window dates (window start + timeInterval/2), with
        columns "Min", "Mean" and "Max".
        None is returned if the module is not initialized or if the
        attribute is not a scalar.
    """
    # NOTE(review): _check_initialized and _dateparse are assumed to live in
    # ArchiveExtractor.Amenities next to _check_attribute; confirm.
    if not aea._check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    aea._check_attribute(attribute, db)

    # Parse dates
    dateStart = aea._dateparse(dateStart)
    dateStop = aea._dateparse(dateStop)

    # Parse timeInterval if string: last character is the unit, the rest
    # is the integer count.
    if isinstance(timeInterval, str):
        try:
            mul = {'s': 1, 'm': 60, 'h': 60*60, 'd': 60*60*24}[timeInterval[-1]]
        except KeyError:
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # Get info about the attribute, in the requested database
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s", info)

    # Detect spectrum
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None

    # Cut data range in time chunks; the last boundary is clamped to dateStop
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1]+timeInterval)
    cdates[-1] = dateStop
    mdates = np.asarray(cdates[:-1])+timeInterval/2
    n_chunks = len(cdates)-1
    logger.debug("Cutting time range to %d chunks of time, %s each.", n_chunks, timeInterval)

    # Prepare arrays
    value_min = np.empty(n_chunks)
    value_max = np.empty(n_chunks)
    value_mean = np.empty(n_chunks)

    extractor = _get_extractor(db)

    # For each time chunk
    for i_d in range(n_chunks):
        start = cdates[i_d].strftime(aea._DBDFMT2)
        stop = cdates[i_d+1].strftime(aea._DBDFMT2)
        for func, arr in zip(
                ["Max", "Min", "Avg"],
                [value_max, value_min, value_mean],
                ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)",
                         func, attribute, start, stop)

            command = getattr(extractor, "GetAttData%sBetweenDates" % func)
            arr[i_d] = command([attribute, start, stop])

    logger.debug("Extraction done for %s.", attribute)
    return pd.DataFrame(
        index=mdates,
        data={
            "Min": value_min,
            "Mean": value_mean,
            "Max": value_max,
        },)