Newer
Older
"""
Python module for extracting attributes from the Archive Extractor Device.
"""
import logging
import datetime
import numpy as np
import PyTango as tango
# Version string of this module.
__version__ = "1.0.1"
##########################################################################
""" Commodity variables """
# Date format "YYYY-MM-DD HH:MM:SS".
# In this file it is the format sent to the extractor's ExtractBetweenDates command.
DBDFMT = "%Y-%m-%d %H:%M:%S"
# Date format "DD-MM-YYYY HH:MM:SS".
# In this file it is the format sent to GetAttDataBetweenDatesCount and to the
# GetAttData{Max,Min,Avg}BetweenDates commands.
DBDFMT2 = "%d-%m-%Y %H:%M:%S"
# Element-wise version of datetime.datetime.fromtimestamp, so that a whole array
# of POSIX timestamps (seconds) can be converted to datetime objects in one call.
ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
# Max number of points per extraction chunk.
# NOTE(review): the extraction code reads this as self.Nmax, so it presumably
# belongs to the class body; the class header is not visible here -- confirm.
Nmax = 100000
##########################################################################
def __init__(
        self,
        extractorKind='H', extractorNumber=2,
        extractorPath=None,
        logger='info',
        ):
    """
    Constructor function

    Set up the logger, then open a Tango proxy to the archive extractor
    device and set its client timeout to 3000 ms.

    Parameters
    ----------
    extractorKind: char
        Either 'H' or 'T' for HDB or TDB.

    extractorNumber: int
        Number of the archive extractor instance to use.

    extractorPath: string
        Tango path to the extractor.
        If this argument is given, it takes precedence over extractorKind and extractorNumber.

    logger: logging.Logger, str
        Logger object to use.
        If string, can be a log level. A basic logger with stream handler will be instantiated.
        Default to 'info'.

    Return
    ------
    ArchiveExtractor
    """

    #######################################################
    # Get logger
    # isinstance (rather than an exact type comparison) so that Logger
    # subclasses are accepted as ready-made loggers too.
    if isinstance(logger, logging.Logger):
        self.logger = logger
    else:
        # 'logger' is a level name such as 'info' or 'debug'
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, logger.upper()))

        if not self.logger.hasHandlers():
            # No handlers, create one
            sh = logging.StreamHandler()
            sh.setLevel(self.logger.level)
            sh.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
            self.logger.addHandler(sh)

    #######################################################
    # Select Extractor
    if extractorPath is None:
        # Build the default device path from kind and instance number,
        # e.g. "archiving/HDBExtractor/2"
        extractorPath = "archiving/%sDBExtractor/%d" % (extractorKind, extractorNumber)

    self.extractor = tango.DeviceProxy(extractorPath)
    self.extractor.set_timeout_millis(3000)
    self.logger.debug("Archive Extractor %s used." % self.extractor.name())
##---------------------------------------------------------------------------##
@staticmethod
def dateparse(datestr):
    """
    Convenient function to parse date strings.
    Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.

    Parameters
    ----------
    datestr : string
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime
        Parsed date. Fields missing from a less precise string take the
        strptime defaults (day 1, time 00:00:00).
    """
    # Formats are tried in order, from most to least precise.
    # The first one that parses wins; if none matches, a ValueError is raised.
    formats = (
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
    )

    for fmt in formats:
        try:
            return datetime.datetime.strptime(datestr, fmt)
        except ValueError:
            # Not this format, try the next (less precise) one
            continue

    raise ValueError("Could not parse argument to a date")
##---------------------------------------------------------------------------##
def betweenDates(
        self,
        attribute,
        dateStart,
        dateStop=None,
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.

    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        If None, it takes the current date and time.
        Default is None (now).

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    [date, value] : array
        date : numpy.ndarray of datetime.datetime objects
            Dates of the values
        value : numpy.ndarray
            Archived values

        None is returned instead if a chunk could not be extracted after
        three attempts.
    """

    # Parse date if it is string
    if isinstance(dateStart, str):
        dateStart = self.dateparse(dateStart)
    if dateStop is None:
        dateStop = datetime.datetime.now()
    elif isinstance(dateStop, str):
        dateStop = self.dateparse(dateStop)

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check that the attribute is in the database
    self.logger.debug("Check that %s is archived." % attribute)
    if not self.extractor.IsArchived(attribute):
        message = "Attribute '%s' is not archived in DB %s" % (attribute, self.extractor)
        self.logger.error(message)
        raise ValueError(message)

    # Get the number of points
    # NOTE(review): the count command is given DBDFMT2 dates while the
    # extraction below uses DBDFMT -- kept as found, confirm against the device.
    N = self.extractor.GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(DBDFMT2),
        dateStop.strftime(DBDFMT2)
        ])
    self.logger.debug("On the period, there is %d entries" % N)

    # If data chunk is too much, we need to cut it
    if N > self.Nmax:
        # Ceiling division, so that the average chunk never exceeds Nmax points
        n_chunks = -(-int(N) // self.Nmax)
        dt = (dateStop - dateStart) / n_chunks
        # Build the edges explicitly and pin the last one on dateStop, so
        # timedelta rounding can neither overshoot nor add a spurious chunk.
        cdates = [dateStart + k * dt for k in range(n_chunks)]
        cdates.append(dateStop)
        self.logger.debug("Cutting access to %d little chunks of time, %s each." % (len(cdates) - 1, dt))
    else:
        cdates = [dateStart, dateStop]

    # Arrays to hold every chunks
    value = []
    date = []

    # For each date chunk
    for chunk_start, chunk_stop in zip(cdates[:-1], cdates[1:]):
        request = [
            attribute,
            chunk_start.strftime(DBDFMT),
            chunk_stop.strftime(DBDFMT),
            ]

        # 2 retries on DevFailed
        for _attempt in range(3):
            # Make retrieval request
            self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)" % tuple(request))
            try:
                _date, _value = self.extractor.ExtractBetweenDates(request)
            except tango.DevFailed as e:
                self.logger.warning("The extractor device returned the following error:")
                self.logger.warning(e)
                self.logger.warning("Retrying...")
            else:
                break
        else:
            # Only reached when every attempt failed (no break)
            self.logger.error("Could not extract this chunk. Check the device extractor")
            return None

        # Transform to datetime - value arrays
        _value = np.asarray(_value, dtype=float)
        if len(_date) > 0:
            # Timestamps are divided by 1000 before conversion
            # (presumably milliseconds from the device -- confirm).
            _date = ArrayTimeStampToDatetime(np.asarray(_date, dtype=float) / 1000.0)

        value.append(_value)
        date.append(_date)

    self.logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    self.logger.debug("Extraction done for %s." % attribute)
    return [date, value]
##---------------------------------------------------------------------------##
def betweenDates_MinMaxMean(
        self,
        attribute,
        dateStart,
        dateStop=None,
        # NOTE(review): the original default for timeInterval is not visible
        # in the source; 60 seconds is an assumption -- confirm.
        timeInterval=datetime.timedelta(seconds=60),
        ):
    """
    Query attribute data from an archiver database, get min, max and mean
    of the attribute on successive time windows between dates.
    Use GetAttDataMaxBetweenDates, GetAttDataMinBetweenDates and
    GetAttDataAvgBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.

    dateStop : datetime.datetime, string, None
        Stop date for extraction. If string, it will be parsed.
        If None, it takes the current date and time at call time.
        Default is None (now).

    timeInterval: datetime.timedelta, string
        Time interval used to perform min,max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database, or timeInterval could
        not be parsed or is not strictly positive.

    Returns
    -------
    [mdates, value_min, value_max, value_mean] : array
        mdates : numpy.ndarray of datetime.datetime objects
            Dates of the values, middle of timeInterval windows
        value_min : numpy.ndarray
            Minimum of the value on the interval
        value_max : numpy.ndarray
            Maximum of the value on the interval
        value_mean : numpy.ndarray
            Mean of the value on the interval
    """

    # Parse date if it is string
    if isinstance(dateStart, str):
        dateStart = self.dateparse(dateStart)
    if dateStop is None:
        # Resolved at call time; a datetime.now() default in the signature
        # would be frozen when the function is defined.
        dateStop = datetime.datetime.now()
    elif isinstance(dateStop, str):
        dateStop = self.dateparse(dateStop)

    # Parse timeInterval if string
    if isinstance(timeInterval, str):
        try:
            mul = {'s': 1, 'm': 60, 'h': 60*60, 'd': 60*60*24}[timeInterval[-1]]
        except (KeyError, IndexError):
            # Unknown unit letter, or empty string
            self.logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # A null or negative interval would make the chunking loop below endless
    if timeInterval <= datetime.timedelta(0):
        self.logger.error("timeInterval must be strictly positive")
        raise ValueError("timeInterval must be strictly positive")

    # Check that the attribute is in the database
    self.logger.debug("Check that %s is archived." % attribute)
    if not self.extractor.IsArchived(attribute):
        message = "Attribute '%s' is not archived in DB %s" % (attribute, self.extractor)
        self.logger.error(message)
        raise ValueError(message)

    # Cut data range in time chunks
    # The last window may extend past dateStop: windows all have full length.
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1] + timeInterval)
    n_windows = len(cdates) - 1

    # Middle of each window
    mdates = np.asarray([start + timeInterval/2 for start in cdates[:-1]])
    self.logger.debug("Cutting time range to %d chunks of time, %s each." % (n_windows, timeInterval))

    # Prepare arrays
    value_min = np.empty(n_windows)
    value_max = np.empty(n_windows)
    value_mean = np.empty(n_windows)

    # For each time chunk
    for i_d, (win_start, win_stop) in enumerate(zip(cdates[:-1], cdates[1:])):
        request = [
            attribute,
            win_start.strftime(DBDFMT2),
            win_stop.strftime(DBDFMT2),
            ]
        for func, arr in zip(
                ["Max", "Min", "Avg"],
                [value_max, value_min, value_mean],
                ):
            # Make requests
            self.logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)" % (
                func, request[0], request[1], request[2]))

            _val = getattr(self.extractor, "GetAttData%sBetweenDates" % func)(request)

            # Store the result; without this the output arrays would keep
            # the uninitialised content of np.empty.
            arr[i_d] = _val

    self.logger.debug("Extraction done for %s." % attribute)
    return [mdates, value_min, value_max, value_mean]