# NOTE(review): removed web-page scraping artifacts that were pasted into the
# source ("Skip to content", "Snippets Groups Projects", "Core.py 10.7 KiB",
# "Newer Older") — they are not Python and broke the module at import time.


##########################################################################
###                    Module core functions                           ###
##########################################################################

def _extract_attribute(attribute, method, date1, date2, db):
    """
    Check if exists, check scalar or spectrum and dispatch
    """

    # Attribute names are stored lowercase in the database
    attribute = attribute.lower()
    aea._check_attribute(attribute, db)

    # Fetch the attribute configuration from the extractor
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)

    # Classify the attribute from its dimensions:
    # scalar (default), 1D vector, or 2D "multi"
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            attrtype = "multi"
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module." % (
                attribute, info["max_dim_x"], info["max_dim_y"]))
        else:
            attrtype = "vector"
            logger.info("Attribute %s is a 1D vector, dimension = %s." % (
                attribute, info["max_dim_x"]))

    # =============
    # Multi-dimensional attributes go through the scalar path for now,
    # which only yields the first element.
    if attrtype in ("scalar", "multi"):
        # data_type '1' marks booleans; everything else is read as float
        dtype = bool if info["data_type"] == '1' else float
        return _extract_scalar(attribute, method, date1, date2, db, dtype)

    if attrtype == "vector":
        return _extract_vector(attribute, method, date1, date2, db)


##---------------------------------------------------------------------------##
def _extract_scalar(attribute, method, date1, date2, db, dtype):
    """
    Extract a scalar attribute from the archiver and return a pandas.Series.

    Parameters
    ----------
    attribute : str
        Full Tango attribute name (already lowercased by the caller).
    method : str
        "nearest": single point closest to date1.
        "between": all points between date1 and date2.
        "minmaxmean": not implemented for scalars, raises.
    date1, date2 : datetime.datetime
        Extraction time range (date2 is unused for "nearest").
    db : str
        Database selector, 'H' or 'T'.
    dtype : type
        Expected value type (bool or float).

    Returns
    -------
    pandas.Series or None
        Values indexed by datetime, named after the attribute.
        None if the extractor device could not answer.

    Raises
    ------
    NotImplementedError
        If method is "minmaxmean".
    """

    # =====================
    if method == "nearest":
        cmdreturn = aea._cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "GetNearestValue", [
                                                attribute,
                                                date1.strftime(aea._DBDFMT),
                                                ])

        # Unpack return: the device answers "timestamp;value".
        # BUGFIX: a failed command may return None, on which .split raises
        # AttributeError — the former bare `except TypeError` missed it.
        try:
            _date, _value = cmdreturn.split(';')
        except (TypeError, AttributeError):
            logger.error("Could not extract this chunk. Check the device extractor")
            return None

        # Transform by datatype
        if dtype is bool:
            _value = _cast_bool(_value)

        # Fabricate return pandas.Series (timestamp is in milliseconds)
        d=pd.Series(index=[datetime.datetime.fromtimestamp(int(_date)/1000),], data=[_value,], name=attribute)

        return d

    # =====================
    if method == "between":
        # Cut the time horizon in chunks
        cdates = aea._chunkerize(attribute, date1, date2, db)

        # Array to hold data
        data = []

        # For each date chunk
        for i_d in range(len(cdates)-1):
            cmdreturn = aea._cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "ExtractBetweenDates", [
                                                    attribute,
                                                    cdates[i_d].strftime(aea._DBDFMT),
                                                    cdates[i_d+1].strftime(aea._DBDFMT)
                                                    ])

            # Unpack return: expected (dates, values); None raises TypeError
            try:
                _date, _value = cmdreturn
            except TypeError:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Transform to datetime - value arrays
            if dtype is bool:
                _value = _cast_bool(_value)
            else:
                _value = np.asarray(_value, dtype=dtype)

            # Timestamps come as milliseconds since epoch
            if len(_date) > 0:
                _date = aea._ArrayTimeStampToDatetime(_date/1000.0)

            # Fabricate return pandas.Series
            data.append(pd.Series(index=_date, data=_value, name=attribute))

        # Concatenate chunks
        return pd.concat(data)

    # ========================
    if method == "minmaxmean":

        # If we are here, the method is not implemented.
        # BUGFIX: raise the exception class NotImplementedError, not the
        # NotImplemented constant (raising the constant is a TypeError
        # in Python 3 and never surfaced the intended message).
        logger.error("Method {} is not implemented for scalars.".format(method))
        raise NotImplementedError("Method {} is not implemented for scalars.".format(method))

##---------------------------------------------------------------------------##
def _extract_vector(attribute, method, date1, date2, db):
    """
    Extract a 1D (spectrum) attribute from the archiver.

    Parameters
    ----------
    attribute : str
        Full Tango attribute name (already lowercased by the caller).
    method : str
        Only "nearest" is implemented for vectors; anything else raises.
    date1, date2 : datetime.datetime
        Extraction time range (date2 is unused for "nearest").
    db : str
        Database selector, 'H' or 'T'.

    Returns
    -------
    pandas.Series or None
        Single-entry Series holding the vector value nearest to date1,
        or None if the extractor device could not answer.

    Raises
    ------
    NotImplementedError
        For any method other than "nearest".
    """

    # Get info about the attribute
    info=infoattr(attribute, db=db)

    # =====================
    if method == "nearest":
        # Get nearest does not work with vector.
        # Make a between date with surounding dates.

        # Dynamically grow a +/- dt window around date1 until it
        # contains at least one point.
        cnt=0
        dt=datetime.timedelta(seconds=10)
        while cnt<1:
            logger.debug("Seeking points in {} to {}".format(date1-dt,date1+dt))
            cnt=_extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
                    attribute,
                    (date1-dt).strftime(aea._DBDFMT2),
                    (date1+dt).strftime(aea._DBDFMT2)
                    ])
            dt=dt*1.5
        logger.debug("Found {} points in a +- {} interval".format(cnt,str(dt/1.5)))

        # For vector, we have to use the GetAttxxx commands
        cmdreturn = aea._cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "GetAttDataBetweenDates", [
                                                attribute,
                                                (date1-dt).strftime(aea._DBDFMT),
                                                (date1+dt).strftime(aea._DBDFMT),
                                                ])

        # Unpack return: ([count], [dynamic attribute name]);
        # None raises TypeError
        try:
            [N,], [name,] = cmdreturn
            N=int(N)
        except TypeError:
            logger.error("Could not extract this attribute. Check the device extractor")
            return None

        # Read the history
        logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
        attrHist = _extractors[{'H':0, 'T':1}[db]].attribute_history(name, N)

        # Transform to datetime - value arrays; rows shorter than
        # max_dim_x are left padded with NaN
        _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
        _value[:] = np.nan
        _date = np.empty(N, dtype=object)
        for i_h in range(N):
            _value[i_h,:attrHist[i_h].dim_x]=attrHist[i_h].value
            _date[i_h]=attrHist[i_h].time.todatetime()

        # Seeking nearest entry
        idx=np.argmin(abs(_date-date1))
        logger.debug("Found nearest value at index {}: {}".format(idx, _date[idx]))

        # Fabricate return pandas.Series
        d=pd.Series(index=[_date[idx],], data=[_value[idx],], name=attribute)

        return d

    # If we are here, the method is not implemented.
    # BUGFIX: raise the exception class NotImplementedError, not the
    # NotImplemented constant (raising the constant is a TypeError
    # in Python 3 and never surfaced the intended message).
    logger.error("Method {} is not implemented for vectors.".format(method))
    raise NotImplementedError("Method {} is not implemented for vectors.".format(method))


##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
        attribute,
        dateStart,
        dateStop=None,
        timeInterval=datetime.timedelta(seconds=60),
        db='H',
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is now (datetime.datetime.now())

    timeInterval: datetime.timedelta, string
        Time interval used to perform min, max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database, or timeInterval
        could not be parsed.
    AttributeError
        db is not 'H' or 'T'.

    Returns
    -------
    pandas.DataFrame or None
        DataFrame with columns "Min", "Mean" and "Max", indexed by the
        middle of each timeInterval window. None if the attribute is
        not a scalar. (Docstring fixed: the code has always returned a
        DataFrame, not a [mdates, min, max, mean] array.)

    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    _check_attribute(attribute, db=db)

    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)

    # Parse timeInterval if string, e.g. "30s", "5m", "2h", "1d"
    if isinstance(timeInterval, str):
        try:
            mul = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24}[timeInterval[-1]]
        except KeyError:
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # Get info about the attribute.
    # BUGFIX: pass db so the selected database is queried; every other
    # call site uses infoattr(attribute, db=db).
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Only scalar attributes are supported by the Min/Max/Avg commands
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None

    # Cut data range in time chunks
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1]+timeInterval)
    cdates[-1] = dateStop
    # Window centers, used as the output index
    mdates = np.asarray(cdates[:-1])+timeInterval/2
    logger.debug("Cutting time range to %d chunks of time, %s each."%(len(cdates)-1, timeInterval))

    # Prepare arrays
    value_min = np.empty(len(cdates)-1)
    value_max = np.empty(len(cdates)-1)
    value_mean = np.empty(len(cdates)-1)

    # For each time chunk, ask the extractor for max, min and average
    for i_d in range(len(cdates)-1):
        for func, arr in zip(
                ["Max", "Min", "Avg"],
                [value_max, value_min, value_mean],
                ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)"%(
                func,
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d+1].strftime(_DBDFMT2))
                )

            _val = getattr(_extractors[{'H':0, 'T':1}[db]], "GetAttData%sBetweenDates"%func)([
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d+1].strftime(_DBDFMT2)
                ])

            arr[i_d] = _val

    logger.debug("Extraction done for %s."%attribute)
    return pd.DataFrame(
            index=mdates,
            data={
                "Min":value_min,
                "Mean":value_mean,
                "Max":value_max,
                },)