# Amenities.py
# Commodity private variables and functions for the ArchiveExtractor module.
import logging
import datetime
import numpy as np
import ArchiveExtractor as ae
# (Repository web-view residue: "BRONES Romain committed".)
import tango
# Get the module logger.
# It is looked up by the fixed name "ArchiveExtractor" (not __name__), so
# everything logged from this file goes through that logger.
logger = logging.getLogger("ArchiveExtractor")

##########################################################################
###               Commodity private variables                          ###
##########################################################################

# Extractor date format for GetAttDataBetweenDates
# (year-month-day order, second resolution)
_DBDFMT = "%Y-%m-%d %H:%M:%S"

# Extractor date format for GetNearestValue
# (day-month-year order, second resolution).
# This is also the format _chunkerize passes to GetAttDataBetweenDatesCount.
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"

##########################################################################
###               Commodity private functions                          ###
##########################################################################

# Vectorized fromtimestamp function: maps an array of POSIX timestamps to an
# object array of datetime.datetime (local time, as fromtimestamp does).
# NOTE: it is faster than using pandas.to_datetime()
# NOTE: otypes is given explicitly. Without it np.vectorize has to call the
# function on the first element to guess the output type, and therefore
# raises ValueError on empty input (e.g. a period without any data).
_ArrayTimeStampToDatetime = np.vectorize(
    datetime.datetime.fromtimestamp,
    otypes=[object],
    )

# Vectorized string -> bool lookup.
# Each element is looked up with dict.get in the table below, so a string
# that is not one of the four keys gives None for that element.
_ArrayStr2Bool = np.vectorize(
    dict(true=True, t=True, false=False, f=False).get
    )


def _check_initialized():
    """
    Tell whether the module has been initialized.

    The module is considered initialized when ae._Extractors holds no None
    entry. When it is not, an error is logged pointing to the init() function.

    Returns
    -------
    success : boolean
        True when initialized, False otherwise.
    """
    initialized = None not in ae._Extractors
    if not initialized:
        logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__))
    return initialized

##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date or duration strings.
    Exact date format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
    Duration format is 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
    If datestr is None, take the actual date and time.

    Parameters
    ----------
    datestr : string or None
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise
        (down to %Y-%m).
        Duration as a string, format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months).
        A month is counted as 30 days.
        None means "now".

    Exceptions
    ----------
    ValueError
        If the parsing failed (including an empty string).

    Returns
    -------
    date : datetime.datetime or datetime.timedelta
        Parsed date or duration
    """
    logger.debug("Parsing date string '%s'"%datestr)

    # As documented: no date given means the current date and time
    if datestr is None:
        return datetime.datetime.now()

    # Number of minutes in each duration unit
    minutes_per_unit = {'m':1, 'h':60, 'd':60*24, 'M':30*60*24}

    # Determine date/duration by looking at the last char.
    # The slice (instead of an index) keeps an empty string from raising
    # IndexError; it then falls through to the date formats and ends up as
    # a ValueError like any other unparsable input.
    unit = datestr[-1:]
    if unit in minutes_per_unit:
        # Duration
        logger.debug("Assuming a duration")

        try:
            quantity = float(datestr[:-1])
        except ValueError as e:
            logger.error("Failed to parse date string. Given the last character, a duration was assumed.")
            # ValueError (a subclass of Exception) is what the docstring
            # promises for any parsing failure.
            raise ValueError("Could not parse argument to a date") from e

        # Convert all in minutes
        return datetime.timedelta(minutes=quantity*minutes_per_unit[unit])

    # Probably a date string.
    # This gives all format that will be tried, in order, from the most to
    # the least precise. Stop on first parse success.
    formats = (
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
        )

    for fmt in formats:
        try:
            return datetime.datetime.strptime(datestr, fmt)
        except ValueError:
            continue

    # None of the formats matched
    raise ValueError("Could not parse argument to a date")

##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Make sure the attribute is archived in the selected database.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        If the extractor reports that the attribute is not archived.
    """
    logger.debug("Check that %s is archived."%attribute)

    # 'H' and 'T' select respectively the first and second extractor
    index = {'H':0, 'T':1}[db]
    extractor = ae._Extractors[index]

    if extractor.IsArchived(attribute):
        return

    # Same text for the log record and for the exception
    message = "Attribute '%s' is not archived in DB %s"%(attribute, extractor)
    logger.error(message)
    raise ValueError(message)

##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut an extraction period in chunks of time, so that each chunk is
    expected to hold no more than Nmax atoms.

    The number of entries on the period is asked to the extractor, then the
    period is cut in equal durations. This assumes the entries are evenly
    spread over the period.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.

    db: str
        Which database to look in, 'H' or 'T'.

    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
        The first element is always dateStart and the last one dateStop.
    """
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Get the number of points
    # NOTE(review): the dates are formatted with _DBDFMT2 (day first) here;
    # confirm this is the format GetAttDataBetweenDatesCount expects.
    N = ae._Extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
            attribute,
            dateStart.strftime(_DBDFMT2),
            dateStop.strftime(_DBDFMT2)
            ])
    logger.debug("On the period, there is %d entries"%N)

    dx = int(info["max_dim_x"])
    if dx > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%dx)

        # Quick fix : max dimension is not always used. Cap it to 2048
        N = N*min(dx, 2048)
        if dx > 2048:
            logger.warning("Attribute vector has max dimension above 2048, assume no more than this limit for chunkerize.")

    period = dateStop - dateStart

    # Nothing to cut: few enough data, or an empty/negative period
    if N <= Nmax or period <= datetime.timedelta(0):
        return [dateStart, dateStop]

    # Data chunk is too much, we need to cut it.
    # Round the number of chunks up (ceil), so that a chunk does not exceed
    # Nmax atoms; rounding down could give chunks of almost 2*Nmax.
    nchunks = int(-(-N // Nmax))
    dt = period/nchunks

    # Each boundary is computed from dateStart, not accumulated, so that
    # rounding does not add up, and the list always has nchunks+1 elements.
    cdates = [dateStart + (period*k)/nchunks for k in range(nchunks)]
    cdates.append(dateStop)
    logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))

    return cdates

##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Only tango.DevFailed triggers a new attempt, any other exception is
    propagated to the caller.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.

    cmd : str
        Command to execute on the extractor

    arg : list
        Argument to pass to the command

    retry : int
        Maximum number of attempts of the command on DevFailed.
        The command is always attempted at least once, even if retry is
        zero or negative.

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries.
    """
    logger.info("Perform Command {} {}".format(cmd, arg))

    # At least one attempt. With retry <= 0 the loop would otherwise not run
    # at all and there would be nothing to return.
    attempts = max(1, retry)

    for attempt in range(attempts):
        # Make retrieval request
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            return getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned a DevFailed")
            logger.debug(e)
            if attempt < attempts-1:
                logger.warning("Retrying...")

    # All attempts ended with a DevFailed
    logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
    return None


def _str2bool(item):
    """
    Convert a single element to boolean, for _cast_bool.

    Strings (and bytes) are stripped and lower-cased, then matched against
    the words true/t/false/f, then tried as an integer literal.
    Anything that is not a string is converted with bool().

    Returns None for a string that could not be understood.
    """
    if isinstance(item, bytes):
        item = item.decode("utf-8", "replace")
    if not isinstance(item, str):
        return bool(item)

    word = item.strip().lower()
    if word in ("true", "t"):
        return True
    if word in ("false", "f"):
        return False

    # Integer written as text, i.e. "0" or "1"
    try:
        return bool(int(word))
    except ValueError:
        return None


def _cast_bool(value):
    """
    Cast a value, or array of values, to boolean.

    Numeric and boolean data are cast directly.
    Strings are compared, ignoring case and surrounding spaces, to the words
    'true', 't', 'false' and 'f'; strings holding an integer are False for
    zero and True otherwise.

    Parameters:
    -----------
    value: string, integer, or array of such
        value to convert.

    Return:
    boolean:
        value or array of boolean.
        If some strings could not be understood, the corresponding elements
        are None and the array is of object type instead of boolean.
    """

    # Force to array
    value = np.asarray(value)

    # cast back to single value
    def castback(v):
        if v.shape == ():
            return v.item()
        return v

    # Numeric or boolean data: simply cast to bool.
    # Strings must not take this path: depending on the NumPy version, the
    # direct cast either raises on words or turns any non-empty string,
    # "false" included, into True.
    if value.dtype.kind not in "USO":
        logger.debug("Direct conversion to boolean")
        return castback(value.astype("bool"))

    logger.debug("Try to convert to boolean")

    # Element by element conversion; works on empty and 0-d arrays too
    converted = [_str2bool(item) for item in value.ravel().tolist()]
    result = np.array(converted, dtype=object).reshape(value.shape)

    if any(item is None for item in converted):
        logger.warning("Some values could not be converted to boolean, they are set to None.")
    else:
        result = result.astype("bool")

    return castback(result)