Skip to content
Snippets Groups Projects
Amenities.py 7.77 KiB
Newer Older


##########################################################################
###               Commodity private variables                          ###
##########################################################################

# Extractor date format for GetAttDataBetweenDates
_DBDFMT = "%Y-%m-%d %H:%M:%S"

# Extractor date format for GetNearestValue
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"

##########################################################################
###               Commodity private functions                          ###
##########################################################################

# Vectorized fromtimestamp function
# NOTE: it is faster than using pandas.to_datetime()
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)

# Vectorized bool map dictionnary
_ArrayStr2Bool = np.vectorize({
    "true":True, 't':True,
    "false":False, 'f':False,
    }.get)


def _check_initialized():
    """
    Check if the module is initialized.

    Returns
    -------
    success : boolean
    """
    global _extractors
    if None in _extractors:
        logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__))
        return False
    return True

##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date or duration strings.
    Exact date format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
    Duration format is 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
    If datstr is None, take the actual date and time.

    Parameters
    ---------
    datestr : string
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.
        Duration as a string, format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime or datetime.timedelta
        Parsed date or duration
    """
    logger.debug("Parsing date string '%s'"%datestr)

    # Determine date/duration by looking at the last char
    if datestr[-1] in "mhdM":
        # Duration
        logger.debug("Assuming a duration")

        try:
            q=float(datestr[:-1])
        except ValueError as e:
            logger.error("Failed to parse date string. Given the last character, a duration was assumed.")
            raise Exception("Could not parse argument to a date") from e

        # Convert all in minutes
        minutes = q*{'m':1, 'h':60, 'd':60*24, 'm':30*60*24}[datestr[-1]]

        return datetime.timedelta(minutes=minutes)

    else:
        # Probably a date string

        # This gives all format that will be tried, in order.
        # Stop on first parse success. Raise error if none succeed.
        fmt = [
            "%Y-%m-%d-%H:%M:%S",
            "%Y-%m-%d-%H:%M",
            "%Y-%m-%d-%H",
            "%Y-%m-%d",
            "%Y-%m",
            ]

        date = None
        for f in fmt:
            try:
                date = datetime.datetime.strptime(datestr, f)
            except ValueError:
                continue
            else:
                break
        else:
            raise ValueError("Could not parse argument to a date")

        return date

##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Check that the attribute is in the database

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    db: str
        Which database to look in, 'H' or 'T'.
    """
    global _extractors

    logger.debug("Check that %s is archived."%attribute)
    if not _extractors[{'H':0, 'T':1}[db]].IsArchived(attribute):
        logger.error("Attribute '%s' is not archived in DB %s"%(attribute, _extractors[{'H':0, 'T':1}[db]]))
        raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, _extractors[{'H':0, 'T':1}[db]]))

##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.

    db: str
        Which database to look in, 'H' or 'T'.

    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info=infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Get the number of points
    N=_extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
            attribute,
            dateStart.strftime(_DBDFMT2),
            dateStop.strftime(_DBDFMT2)
            ])
    logger.debug("On the period, there is %d entries"%N)

    dx=int(info["max_dim_x"])
    if dx > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%dx)
        N=N*dx

    # If data chunk is too much, we need to cut it
    if N > Nmax:
        dt = (dateStop-dateStart)/(N//Nmax)
        cdates = [dateStart]
        while cdates[-1] < dateStop:
            cdates.append(cdates[-1]+dt)
        cdates[-1] = dateStop
        logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
    else:
        cdates=[dateStart, dateStop]

    return cdates

##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.

    cmd : str
        Command to executte on the extractor

    arg : list
        Attribute to pass to the command

    retry : int
        Number of command retry on DevFailed

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries.
    """
    logger.info("Perform Command {} {}".format(cmd, arg))

    for i in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            cmdreturn = getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if  i == retry-1:
                logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
                return None
            logger.warning("Retrying...")
            continue
        break
    return cmdreturn


def _cast_bool(value):
    """
    Cast a value, or array of values, to boolean.
    Try to assess the input data type. If string, then try to find true or false word inside.

    Parameters:
    -----------
    value: string, integer, or array of such
        value to convert.

    Return:
    boolean:
        value or array of boolean.
    """

    # Force to array
    value = np.asarray(value)

    # cast back to single value
    def castback(v):
        if v.shape == ():
            return v.item()
        return v

    # Simply try to cast to bool first
    try:
        value = value.astype("bool")
        logger.debug("Direct conversion to boolean")
        return castback(value)
    except ValueError:
        # Keep trying to cast
        pass

    logger.debug("Try to convert to boolean")

    value = np.char.strip(np.char.lower(value))
    value = _ArrayStr2Bool(value)

    return castback(value)