Newer
Older
"""
Python module for extracting attributes from the Archive Extractor Device.
"""
import datetime
import logging

import numpy as np
import pandas as pd
import PyTango as tango
__version__ = "1.0.1"
##########################################################################
### Install logger for the module ###
##########################################################################
logger = logging.getLogger(__name__)

# Only attach our own stream handler when nothing is configured yet, so that
# a logging setup made by the importing application is left alone.
if not logger.hasHandlers():
    sh = logging.StreamHandler()
    sh.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
    # The handler takes the level the logger has at import time.
    sh.setLevel(logger.level)
    logger.addHandler(sh)
##########################################################################
### Commodity private variables ###
##########################################################################
# Year-first date format; used in this module to build the arguments of the
# ExtractBetweenDates and GetAttDataBetweenDates extractor commands.
_DBDFMT = "%Y-%m-%d %H:%M:%S"
# Day-first date format; used in this module to build the arguments of the
# GetAttDataBetweenDatesCount and GetAttData{Max,Min,Avg}BetweenDates commands.
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"
##########################################################################
### Commodity private functions ###
##########################################################################
# Element-wise version of datetime.datetime.fromtimestamp: turns an array of
# timestamps (in seconds) into an array of datetime.datetime objects.
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
Check if the module is initialized.
Returns
-------
success : boolean
"""
global _extractors
if None in _extractors:
logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__))
##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date strings.

    Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.

    Parameters
    ---------
    datestr : string, datetime.datetime, None
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.
        A datetime.datetime object is returned unchanged.
        None gives the current date and time.

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime
        Parsed date
    """
    if datestr is None:
        return datetime.datetime.now()

    # Public functions document datetime objects as valid dates; passing one
    # to strptime would raise a TypeError, so hand it back as it is.
    if isinstance(datestr, datetime.datetime):
        return datestr

    # This gives all format that will be tried, in order.
    # Stop on first parse success. Raise error if none succeed.
    formats = (
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
        )

    for fmt in formats:
        try:
            return datetime.datetime.strptime(datestr, fmt)
        except ValueError:
            continue

    raise ValueError("Could not parse argument to a date")
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Make sure the attribute is archived in the selected database.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The extractor reports that the attribute is not archived.
    """
    logger.debug("Check that %s is archived."%attribute)

    extractor = _extractors[{'H':0, 'T':1}[db]]
    if extractor.IsArchived(attribute):
        return

    # Same text goes to the log and to the exception
    message = "Attribute '%s' is not archived in DB %s"%(attribute, extractor)
    logger.error(message)
    raise ValueError(message)
##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut an extraction period in time chunks of limited size.

    The number of archived entries on the period is asked to the extractor
    and multiplied by the attribute dimension (max_dim_x) for vectors. If this
    exceeds Nmax, the period is cut in chunks of equal duration. The number of
    chunks is chosen so that a chunk holds at most Nmax atoms when the entries
    are evenly spread over the period.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime
        Start date for extraction.
    dateStop : datetime.datetime
        Stop date for extraction.
    db: str
        Which database to look in, 'H' or 'T'.
    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Get the number of points
    N = _extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
        ])
    logger.debug("On the period, there is %d entries"%N)

    dx = int(info["max_dim_x"])
    if dx > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%dx)
        N = N*dx

    # Small enough, a single chunk covers the whole period
    if N <= Nmax:
        return [dateStart, dateStop]

    # Ceiling division: a floor division would allow almost 2*Nmax atoms in
    # a chunk (e.g. N = 1.9*Nmax would still give a single chunk).
    # Cast to a plain int so the timedelta division below is always defined.
    nchunks = int(-(-N // Nmax))
    dt = (dateStop-dateStart)/nchunks

    # Build the boundaries from the start date; the last one is forced to
    # dateStop so rounding of dt cannot shift the end of the period.
    cdates = [dateStart + i*dt for i in range(nchunks)]
    cdates.append(dateStop)
    logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))

    return cdates
##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.
    cmd : str
        Command to execute on the extractor
    arg : list
        Attribute to pass to the command
    retry : int
        Number of command retry on DevFailed

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries, or if retry is lower than 1.
    """
    for attempt in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            return getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            # Announce a retry only if there is one left
            if attempt < retry-1:
                logger.warning("Retrying...")

    # Reached when every attempt failed, and also when retry < 1 so that
    # no unbound result is ever returned.
    logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
    return None
##########################################################################
### Module private variables ###
##########################################################################
# Tuple of extractor device proxies, index 0 for HDB and index 1 for TDB
# (functions of this module select the entry with {'H':0, 'T':1}[db]).
# Entries are None as long as the module is not initialized.
_extractors = (None, None)
# Tuple of attribute name tables, same (HDB, TDB) order as _extractors.
_AttrTables = (None, None)
##########################################################################
### Module initialisation functions ###
##########################################################################
def init(
        HdbExtractorPath="archiving/hdbextractor/2",
        TdbExtractorPath="archiving/tdbextractor/2",
        loglevel="info",
        ):
    """
    Initialize the module.
    Instanciate tango.DeviceProxy for extractors (TDB and HDB)

    Parameters
    ----------
    HdbExtractorPath, TdbExtractorPath: string
        Tango path to the extractors.
    loglevel: string
        loglevel to pass to logging.Logger
        An unknown level is reported in the log and the level is left unchanged.
    """
    # Without this declaration the assignments below would only create
    # local variables and the module would stay uninitialized.
    global _extractors
    global _AttrTables

    # Reset first, so a failure below leaves the module flagged as not initialized
    _extractors = (None, None)
    _AttrTables = (None, None)

    try:
        logger.setLevel(getattr(logging, loglevel.upper()))
    except AttributeError:
        logger.error("Wrong log level specified: {}".format(loglevel.upper()))

    logger.debug("Instanciating extractors device proxy...")
    _extractors = (tango.DeviceProxy(HdbExtractorPath), tango.DeviceProxy(TdbExtractorPath))
    logger.debug("{} and {} instanciated.".format(*_extractors))

    logger.debug("Configuring extractors device proxy...")
    for e in _extractors:
        # set timeout to 3 sec
        e.set_timeout_millis(3000)

    logger.debug("Filling attributes lookup tables...")
    _AttrTables = tuple(e.getattnameall() for e in _extractors)
    logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1])))
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
##----------------------------------------------------------------------##
def findattr(pattern, db="H"):
    """
    Look for attribute paths containing every piece of the given pattern.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"
    db: str
        Which database to look in, 'H' or 'T'.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    if not _check_initialized():
        return None

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # The pattern is cut on the wildchar; every piece must be found in the name
    pieces = pattern.lower().split('*')

    # Select DB
    table = _AttrTables[{'H':0, 'T':1}[db]]

    found = []
    for name in table:
        lowered = name.lower()
        if all(piece in lowered for piece in pieces):
            found.append(name)

    return found
##----------------------------------------------------------------------##
def infoattr(attribute, db='H'):
    """
    Get informations for an attribute and pack it into a python dict.

    The extractor commands GetAttDefinitionData and GetAttPropertiesData are
    called in turn; each returned entry is read as "name::value".

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    AttributeError
        db is not 'H' or 'T'.

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue
        None if the module is not initialized.
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    extractor = _extractors[{'H':0, 'T':1}[db]]

    info = dict()
    for func in ("GetAttDefinitionData", "GetAttPropertiesData"):
        R = getattr(extractor, func)(attribute)
        if R is None:
            logger.warning("Function %s on extractor returned None"%func)
            continue

        for entry in R:
            # Cut on the first "::" only, so a value that itself contains
            # "::" is kept whole.
            key, sep, val = entry.partition("::")
            if not sep:
                # No separator: skip the entry instead of failing on it
                logger.warning("Entry '%s' returned by %s has no '::' separator, ignored"%(entry, func))
                continue
            info[key] = val

    return info
##---------------------------------------------------------------------------##
def ExtrBetweenDates(
        attribute,
        dateStart,
        dateStop=None,
        db='H',
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates for scalar attributes and GetAttDataBetweenDates
    for 1D vector attributes. The period is cut in chunks by _chunkerize and
    the chunks are concatenated.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        If None, it takes the current date and time.
        Default is None (now).

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.
    AttributeError
        db is not 'H' or 'T'.

    Returns
    -------
    data : pandas.Series, pandas.DataFrame, None
        For a scalar attribute, a Series of the archived values indexed by date.
        For a 1D vector attribute, a DataFrame indexed by date with one column
        per element; columns holding only NaN are dropped.
        None if the module is not initialized or if a chunk could not be extracted.
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    extractor = _extractors[{'H':0, 'T':1}[db]]

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    _check_attribute(attribute, db=db)

    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)

    # Get info about the attribute
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module."%(
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype = "multi"
        else:
            logger.info("Attribute %s is a 1D vector, dimension = %s."%(
                attribute, info["max_dim_x"]))
            attrtype = "vector"

    # Cut the time horizon in chunks
    cdates = _chunkerize(attribute, dateStart, dateStop, db)

    # Arrays to hold every chunks
    value = []
    date = []

    # For each date chunk
    for chunkStart, chunkStop in zip(cdates[:-1], cdates[1:]):
        cmdarg = [
            attribute,
            chunkStart.strftime(_DBDFMT),
            chunkStop.strftime(_DBDFMT),
            ]

        if attrtype == "vector":
            logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)"%tuple(cmdarg))

            # The extractor answers with the number of points and the name
            # of a dynamic attribute holding them
            [N,], [name,] = extractor.GetAttDataBetweenDates(cmdarg)
            N = int(N)

            try:
                # Read the history
                logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
                attrHist = extractor.attribute_history(name, N)

                # Transform to datetime - value arrays
                # Rows shorter than max_dim_x are padded with NaN
                _value = np.full((N, int(info["max_dim_x"])), np.nan, dtype=float)
                _date = np.empty(N, dtype=object)
                for i_h in range(N):
                    _value[i_h, :attrHist[i_h].dim_x] = attrHist[i_h].value
                    _date[i_h] = attrHist[i_h].time.todatetime()
            finally:
                # Remove dynamic attribute, even when reading it failed,
                # so it does not pile up on the extractor device
                logger.debug("Remove dynamic attribute %s."%name)
                extractor.RemoveDynamicAttribute(name)

        else:
            # For now we handle multi dimension the same way as scalar,
            # which will get only the first element

            # Inform on retrieval request
            logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%tuple(cmdarg))

            cmdreturn = _cmd_with_retry(extractor, "ExtractBetweenDates", cmdarg)

            # Check command return
            if cmdreturn is None:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Unpack return
            _date, _value = cmdreturn

            # Transform to datetime - value arrays
            # NOTE: it is faster than using pandas.to_datetime()
            _value = np.asarray(_value, dtype=float)
            # The vectorized conversion is skipped on empty chunks.
            # Dates are divided by 1000 before conversion
            # (presumably milliseconds from the extractor - to confirm).
            if len(_date) > 0:
                _date = _ArrayTimeStampToDatetime(_date/1000.0)

        value.append(_value)
        date.append(_date)

    logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    logger.debug("Extraction done for %s."%attribute)
    if attrtype == "vector":
        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')

    return pd.Series(index=date, data=value)
##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
        attribute,
        dateStart,
        dateStop=None,
        timeInterval=datetime.timedelta(seconds=60),
        db='H',
        ):
    """
    Query attribute data from an archiver database and get the min, max and
    mean of the value on successive time intervals between dates.
    Use GetAttDataMaxBetweenDates, GetAttDataMinBetweenDates and
    GetAttDataAvgBetweenDates on each interval.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string, None
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is None, which means now (datetime.datetime.now())

    timeInterval: datetime.timedelta, string
        Time interval used to perform min,max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"

    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.
        timeInterval could not be parsed or is not strictly positive.
    AttributeError
        db is not 'H' or 'T'.

    Returns
    -------
    data : pandas.DataFrame, None
        DataFrame with columns "Min", "Mean" and "Max", one row per interval.
        The index is the start of each interval plus half of timeInterval.
        None if the module is not initialized or the attribute is not a scalar.
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    extractor = _extractors[{'H':0, 'T':1}[db]]

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    _check_attribute(attribute, db=db)

    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)

    # Parse timeInterval if string
    if isinstance(timeInterval, str):
        try:
            mul = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24}[timeInterval[-1]]
        except (KeyError, IndexError):
            # Unknown unit, or empty string
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # A null or negative interval would never reach dateStop in the loop below
    if timeInterval <= datetime.timedelta(0):
        logger.error("timeInterval must be strictly positive")
        raise ValueError("timeInterval must be strictly positive")

    # Look in the selected database, not only in the default one
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Detect spectrum
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None

    # Cut data range in time chunks
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1]+timeInterval)
    cdates[-1] = dateStop
    # dtype=object keeps the addition valid even when there is no chunk
    mdates = np.asarray(cdates[:-1], dtype=object)+timeInterval/2
    logger.debug("Cutting time range to %d chunks of time, %s each."%(len(cdates)-1, timeInterval))

    # Prepare arrays
    value_min = np.empty(len(cdates)-1)
    value_max = np.empty(len(cdates)-1)
    value_mean = np.empty(len(cdates)-1)

    # For each time chunk
    for i_d, (chunkStart, chunkStop) in enumerate(zip(cdates[:-1], cdates[1:])):
        chunkStartStr = chunkStart.strftime(_DBDFMT2)
        chunkStopStr = chunkStop.strftime(_DBDFMT2)

        for func, arr in (
                ("Max", value_max),
                ("Min", value_min),
                ("Avg", value_mean),
                ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)"%(
                func,
                attribute,
                chunkStartStr,
                chunkStopStr)
                )

            _val = getattr(extractor, "GetAttData%sBetweenDates"%func)([
                attribute,
                chunkStartStr,
                chunkStopStr
                ])

            # Store the result, otherwise the arrays keep the arbitrary
            # content left by np.empty
            arr[i_d] = _val

    logger.debug("Extraction done for %s."%attribute)
    return pd.DataFrame(
        index=mdates,
        data={
            "Min":value_min,
            "Mean":value_mean,
            "Max":value_max,
            },)