Skip to content
Snippets Groups Projects
ArchiveExtractor.py 19.7 KiB
Newer Older
BRONES Romain's avatar
BRONES Romain committed
"""
Python module for extracting attribute from Arhive Extractor Device.
"""
import logging
import datetime
import numpy as np
import PyTango as tango
import pandas as pd
BRONES Romain's avatar
BRONES Romain committed

BRONES Romain's avatar
BRONES Romain committed

System User's avatar
System User committed
##########################################################################
###                 Install logger for the module                      ###
##########################################################################
logger = logging.getLogger(__name__)

if not logger.hasHandlers():
    # No handler yet on this logger: attach a stream handler so that
    # messages are visible even without application-level configuration.
    _handler = logging.StreamHandler()
    _handler.setLevel(logger.level)
    _handler.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
    logger.addHandler(_handler)


##########################################################################
###               Commodity private variables                          ###
##########################################################################
BRONES Romain's avatar
BRONES Romain committed

System User's avatar
System User committed
# Extractor date format for GetAttDataBetweenDates
_DBDFMT = "%Y-%m-%d %H:%M:%S"

# Extractor date format for GetNearestValue
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"

##########################################################################
###               Commodity private functions                          ###
##########################################################################

# Vectorized fromtimestamp function, to convert whole arrays of POSIX
# timestamps to datetime.datetime objects in one call.
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
BRONES Romain's avatar
BRONES Romain committed

def _check_initialized():
    """
    Check if the module is initialized, i.e. init() has instanciated the
    extractor device proxies.

    Returns
    -------
    success : boolean
        True if both extractors are set, False otherwise.
    """
    # Read-only access to the module-level tuple; no `global` needed.
    if None in _extractors:
        logger.error("Module {0} is not initialized. You should run {0}.init().".format(__name__))
        return False
    return True
##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date strings.
    Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.

    If datestr is None, take the actual date and time.

    Parameters
    ----------
    datestr : string, None
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime
        Parsed date
    """
    if datestr is None:
        return datetime.datetime.now()

    # All formats that will be tried, in order, most precise first.
    # Return on first parse success; raise if none succeed.
    fmt = [
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
        ]

    for f in fmt:
        try:
            return datetime.datetime.strptime(datestr, f)
        except ValueError:
            continue

    raise ValueError("Could not parse argument to a date")

##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Check that the attribute is in the database.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    db: str
        Which database to look in, 'H' or 'T'.

    Raises
    ------
    ValueError
        If the attribute is not archived in the selected database.
    """
    extractor = _extractors[{'H':0, 'T':1}[db]]

    logger.debug("Check that %s is archived."%attribute)
    if not extractor.IsArchived(attribute):
        msg = "Attribute '%s' is not archived in DB %s"%(attribute, extractor)
        logger.error(msg)
        raise ValueError(msg)

##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Split a time range so that each chunk holds at most Nmax atoms.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.

    db: str
        Which database to look in, 'H' or 'T'.

    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    extractor = _extractors[{'H':0, 'T':1}[db]]

    # Ask the archiver how many entries lie in the period
    n_atoms = extractor.GetAttDataBetweenDatesCount([
            attribute,
            dateStart.strftime(_DBDFMT2),
            dateStop.strftime(_DBDFMT2)
            ])
    logger.debug("On the period, there is %d entries"%n_atoms)

    # A vector attribute multiplies the atom count by its dimension
    dim_x = int(info["max_dim_x"])
    if dim_x > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%dim_x)
        n_atoms = n_atoms*dim_x

    # Small enough: a single chunk covering the whole range
    if n_atoms <= Nmax:
        return [dateStart, dateStop]

    # Otherwise cut the range in equal time steps
    step = (dateStop-dateStart)/(n_atoms//Nmax)
    boundaries = [dateStart]
    while boundaries[-1] < dateStop:
        boundaries.append(boundaries[-1]+step)
    boundaries[-1] = dateStop
    logger.debug("Cutting access to %d little chunks of time, %s each."%(len(boundaries)-1, step))
    return boundaries

##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.

    cmd : str
        Command to executte on the extractor

    arg : list
        Attribute to pass to the command

    retry : int
        Number of command retry on DevFailed

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries.
    """
    # Initialize so that retry<=0 returns None instead of raising
    # UnboundLocalError (the original left cmdreturn unbound in that case).
    cmdreturn = None

    for i in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            cmdreturn = getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if i == retry-1:
                # Last attempt failed: give up
                logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
                return None
            logger.warning("Retrying...")
            continue
        break
    return cmdreturn

##########################################################################
###                  Module private variables                          ###
##########################################################################
# Tuple of extractor for HDB and TDB.
# Index 0 is the HDB extractor, index 1 the TDB one; both are set by init()
# and stay (None, None) until then (checked by _check_initialized()).
_extractors = (None, None)

# Tuple for attribute tables, same indexing as _extractors; filled by init()
# from getattnameall() on each extractor.
_AttrTables = (None, None)

##########################################################################
###                Module initialisation functions                     ###
##########################################################################
def init(
        HdbExtractorPath="archiving/hdbextractor/2",
        TdbExtractorPath="archiving/tdbextractor/2",
        loglevel="info",
            ):
    """
    Initialize the module.
    Instanciate tango.DeviceProxy for extractors (TDB and HDB)

    Parameters
    ----------
    HdbExtractorPath, TdbExtractorPath: string
        Tango path to the extractors.

    loglevel: string
        loglevel to pass to logging.Logger
    """
    global _extractors
    global _AttrTables
    # Reset state so a failed init leaves the module marked uninitialized
    _extractors = (None, None)
    _AttrTables = (None, None)

    try:
        logger.setLevel(getattr(logging, loglevel.upper()))
    except AttributeError:
        # Unknown level name: keep the current level and report it
        logger.error("Wrong log level specified: {}".format(loglevel.upper()))

    logger.debug("Instanciating extractors device proxy...")

    _extractors = (tango.DeviceProxy(HdbExtractorPath), tango.DeviceProxy(TdbExtractorPath))
    logger.debug("{} and {} instanciated.".format(*_extractors))

    logger.debug("Configuring extractors device proxy...")
    for e in _extractors:
        # set timeout to 3 sec
        e.set_timeout_millis(3000)

    logger.debug("Filling attributes lookup tables...")
    _AttrTables = tuple(e.getattnameall() for e in _extractors)
    logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1])))


##----------------------------------------------------------------------##
def findattr(pattern, db="H"):
    """
    Search for an attribute path using the pattern given.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"

    db: str
        Which database to look in, 'H' or 'T'.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Select DB lookup table
    attr_table = _AttrTables[{'H':0, 'T':1}[db]]

    # The wildchar splits the pattern in keywords that must all appear
    keywords = pattern.lower().split('*')

    return [attr for attr in attr_table
            if all(k in attr.lower() for k in keywords)]

##----------------------------------------------------------------------##
def infoattr(attribute, db='H'):
    """
    Get informations for an attribute and pack it into a python dict.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    db: str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    extractor = _extractors[{'H':0, 'T':1}[db]]
    info = {}

    # Merge both definition and properties data into one dict
    for func in ("GetAttDefinitionData", "GetAttPropertiesData"):
        reply = getattr(extractor, func)(attribute)
        if reply is None:
            logger.warning("Function %s on extractor returned None"%func)
            continue
        for entry in reply:
            # Entries come as "propertyname::propertyvalue" strings
            parts = entry.split("::")
            info[parts[0]] = parts[1]

    return info

##---------------------------------------------------------------------------##
def ExtrBetweenDates(
        attribute,
        dateStart,
        dateStop=None,
        db='H',
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        If None, it takes the current date and time.
        Default is None (now).

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    data : pandas.Series, pandas.DataFrame or None
        Series for a scalar attribute, DataFrame for a 1D vector attribute,
        both indexed by datetime.datetime objects.
        None if a chunk extraction failed.
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    _check_attribute(attribute, db=db)

    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)

    # Extractor device used for every request below
    extractor = _extractors[{'H':0, 'T':1}[db]]

    # Get info about the attribute
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module."%(
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype = "multi"
        else:
            logger.info("Attribute %s is a 1D vector, dimension = %s."%(
                attribute, info["max_dim_x"]))
            attrtype = "vector"

    # Cut the time horizon in chunks
    cdates = _chunkerize(attribute, dateStart, dateStop, db)

    # Arrays to hold every chunks
    value = []
    date = []

    # For each date chunk
    for i_d in range(len(cdates)-1):

        # =============
        # For now we handle multi dimension the same way as scalar, which will get only the first element
        if (attrtype=="scalar") or (attrtype=="multi"):
            # Inform on retrieval request
            logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT))
                )

            cmdreturn = _cmd_with_retry(extractor, "ExtractBetweenDates", [
                                                    attribute,
                                                    cdates[i_d].strftime(_DBDFMT),
                                                    cdates[i_d+1].strftime(_DBDFMT)
                                                    ])

            # Check command return
            if cmdreturn is None:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Unpack return
            _date, _value = cmdreturn

            # Transform to datetime - value arrays
            # NOTE: it is faster than using pandas.to_datetime()
            _value = np.asarray(_value, dtype=float)
            if len(_date) > 0:
                # Timestamps are given in milliseconds
                _date = _ArrayTimeStampToDatetime(_date/1000.0)

            value.append(_value)
            date.append(_date)

        # =============
        if attrtype=="vector":
            logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)"%(
                                                    attribute,
                                                    cdates[i_d].strftime(_DBDFMT),
                                                    cdates[i_d+1].strftime(_DBDFMT)
                                                    ))

            # The extractor creates a dynamic attribute holding the history
            [N,], [name,] = extractor.GetAttDataBetweenDates([
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT)
                ])
            N = int(N)

            # Read the history
            logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
            attrHist = extractor.attribute_history(name, N)

            # Transform to datetime - value arrays; pad short rows with NaN
            _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
            _value[:] = np.nan
            _date = np.empty(N, dtype=object)
            for i_h in range(N):
                _value[i_h, :attrHist[i_h].dim_x] = attrHist[i_h].value
                _date[i_h] = attrHist[i_h].time.todatetime()

            # Remove dynamic attribute
            logger.debug("Remove dynamic attribute %s."%name)
            extractor.RemoveDynamicAttribute(name)

            value.append(_value)
            date.append(_date)

    logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    logger.debug("Extraction done for %s."%attribute)
    if attrtype=="vector":
        # Drop columns that never held a value over the whole period
        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')
    else:
        return pd.Series(index=date, data=value)


BRONES Romain's avatar
BRONES Romain committed
##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
        attribute,
        dateStart,
        dateStop=None,
        timeInterval=datetime.timedelta(seconds=60),
        db='H',
        ):
    """
    Query attribute data from an archiver database, get min, max and mean
    on each time interval between dates.
    Use GetAttDataMaxBetweenDates, GetAttDataMinBetweenDates and
    GetAttDataAvgBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is now (datetime.datetime.now())

    timeInterval: datetime.timedelta, string
        Time interval used to perform min,max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database,
        or timeInterval could not be parsed.

    Returns
    -------
    data : pandas.DataFrame or None
        Columns "Min", "Mean", "Max", indexed by the middle datetime of
        each timeInterval window.
        None if the attribute is not a scalar.
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    _check_attribute(attribute, db=db)

    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)

    # Parse timeInterval if string
    if type(timeInterval) is str:
        try:
            mul = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24}[timeInterval[-1]]
        except KeyError:
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # Get info about the attribute
    # BUGFIX: pass db so the info is read from the requested database,
    # consistently with ExtrBetweenDates (previously always queried HDB).
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Detect spectrum: min/max/mean only make sense on scalars
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None

    # Cut data range in time chunks
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1]+timeInterval)
    cdates[-1] = dateStop
    # Window centers used as result index
    mdates = np.asarray(cdates[:-1])+timeInterval/2
    logger.debug("Cutting time range to %d chunks of time, %s each."%(len(cdates)-1, timeInterval))

    # Extractor device used for every request below
    extractor = _extractors[{'H':0, 'T':1}[db]]

    # Prepare arrays
    value_min = np.empty(len(cdates)-1)
    value_max = np.empty(len(cdates)-1)
    value_mean = np.empty(len(cdates)-1)

    # For each time chunk
    for i_d in range(len(cdates)-1):
        for func, arr in zip(
                ["Max", "Min", "Avg"],
                [value_max, value_min, value_mean],
                ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)"%(
                func,
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d+1].strftime(_DBDFMT2))
                )
            _val = getattr(extractor, "GetAttData%sBetweenDates"%func)([
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d+1].strftime(_DBDFMT2)
                ])

            arr[i_d] = _val

    logger.debug("Extraction done for %s."%attribute)
    return pd.DataFrame(
            index=mdates,
            data={
                "Min":value_min,
                "Mean":value_mean,
                "Max":value_max,
                },)