Skip to content
Snippets Groups Projects
Commit 69071287 authored by BRONES Romain's avatar BRONES Romain
Browse files

WIP move from class to module

parent 3b5f9e1a
No related branches found
No related tags found
No related merge requests found
...@@ -42,13 +42,19 @@ _ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp) ...@@ -42,13 +42,19 @@ _ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
def _check_initialized(): def _check_initialized():
""" """
Check if the module is initialized, raise exception RuntimeError if not. Check if the module is initialized.
Returns
-------
success : boolean
""" """
global _extractors global _extractors
if None in _extractors: if None in _extractors:
logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__)) logger.error("Module {0} is not initialied. You should run {0}.init().".format(__name__))
raise RuntimeError("Module not initialized") return False
return True
##----------------------------------------------------------------------##
def _dateparse(datestr): def _dateparse(datestr):
""" """
Convenient function to parse date strings. Convenient function to parse date strings.
...@@ -93,6 +99,124 @@ def _dateparse(datestr): ...@@ -93,6 +99,124 @@ def _dateparse(datestr):
return date return date
##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Check that the attribute is in the database

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Raises
    ------
    ValueError
        The attribute is not archived in the selected database.
    """
    global _extractors
    # Select the extractor once for the requested database
    extractor = _extractors[{'H':0, 'T':1}[db]]

    logger.debug("Check that %s is archived."%attribute)
    if not extractor.IsArchived(attribute):
        logger.error("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
        raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut the extraction period into time chunks of at most Nmax atoms each.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime
        Start date for extraction.
    dateStop : datetime.datetime
        Stop date for extraction.
    db: str
        Which database to look in, 'H' or 'T'.
    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Count entries recorded on the requested period
    extractor = _extractors[{'H':0, 'T':1}[db]]
    N = extractor.GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
        ])
    logger.debug("On the period, there is %d entries"%N)

    # A vector attribute stores up to max_dim_x atoms per entry
    width = int(info["max_dim_x"])
    if width > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%width)
        N = N*width

    # Small enough: a single chunk covering the whole period
    if N <= Nmax:
        return [dateStart, dateStop]

    # Otherwise, evenly spaced boundaries so each chunk holds ~Nmax atoms
    step = (dateStop-dateStart)/(N//Nmax)
    boundaries = [dateStart]
    while boundaries[-1] < dateStop:
        boundaries.append(boundaries[-1]+step)
    # Clamp the last boundary exactly on the requested stop date
    boundaries[-1] = dateStop
    logger.debug("Cutting access to %d little chunks of time, %s each."%(len(boundaries)-1, step))
    return boundaries
##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.
    cmd : str
        Command to execute on the extractor.
    arg : list
        Attribute to pass to the command.
    retry : int
        Number of command tries on DevFailed.

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries.
    """
    for i in range(retry):
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            # Success: return immediately; no break/continue juggling needed
            return getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if i == retry-1:
                break
            logger.warning("Retrying...")

    # All attempts failed (also reached when retry < 1, which previously
    # raised UnboundLocalError on the unassigned return variable).
    logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
    return None
########################################################################## ##########################################################################
### Module private variables ### ### Module private variables ###
...@@ -106,6 +230,7 @@ _AttrTables = (None, None) ...@@ -106,6 +230,7 @@ _AttrTables = (None, None)
########################################################################## ##########################################################################
### Module initialisation functions ### ### Module initialisation functions ###
########################################################################## ##########################################################################
def init( def init(
HdbExtractorPath="archiving/hdbextractor/2", HdbExtractorPath="archiving/hdbextractor/2",
TdbExtractorPath="archiving/tdbextractor/2", TdbExtractorPath="archiving/tdbextractor/2",
...@@ -118,6 +243,7 @@ def init( ...@@ -118,6 +243,7 @@ def init(
Tango path to the extractors. Tango path to the extractors.
""" """
global _extractors global _extractors
global _AttrTables
logger.debug("Instanciating extractors device proxy...") logger.debug("Instanciating extractors device proxy...")
...@@ -134,6 +260,249 @@ def init( ...@@ -134,6 +260,249 @@ def init(
logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1]))) logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1])))
##----------------------------------------------------------------------##
def findattr(pattern, db="H"):
    """
    Search for an attribute path using the pattern given.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"
    db: str
        Which database to look in, 'H' or 'T'.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    if not _check_initialized():
        return

    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    global _AttrTables
    # Select the attribute table of the requested database
    table = _AttrTables[{'H':0, 'T':1}[db]]

    # Split the pattern on wildchars; a match must contain every fragment
    fragments = pattern.lower().split('*')

    found = []
    for attr in table:
        lowered = attr.lower()
        if all(frag in lowered for frag in fragments):
            found.append(attr)
    return found
##----------------------------------------------------------------------##
def infoattr(attribute, db='H'):
    """
    Get informations for an attribute and pack it into a python dict.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue
    """
    if not _check_initialized():
        return

    # Idiomatic membership test (was: `not db in ...`)
    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Hoist the extractor lookup out of the loop
    extractor = _extractors[{'H':0, 'T':1}[db]]
    info = dict()

    for func in ("GetAttDefinitionData", "GetAttPropertiesData"):
        R = getattr(extractor, func)(attribute)
        # Idiomatic None test (was: `not R is None`)
        if R is not None:
            for i in R:
                # Each entry is formatted "propertyname::propertyvalue"
                _s = i.split("::")
                info[_s[0]] = _s[1]
        else:
            logger.warning("Function %s on extractor returned None"%func)
    return info
##---------------------------------------------------------------------------##
def ExtrBetweenDates(
        attribute,
        dateStart,
        dateStop=None,
        db='H',
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        If None, it takes the current date and time.
        Default is None (now).
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    [date, value] : array
        date : numpy.ndarray of datetime.datime objects
            Dates of the values
        value : numpy.ndarray
            Archived values
    """
    if not _check_initialized():
        return

    if not db in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Parse date if it is string
    if type(dateStart) is str:
        dateStart = _dateparse(dateStart)
    if dateStop is None:
        dateStop = datetime.datetime.now()
    if type(dateStop) is str:
        dateStop = _dateparse(dateStop)

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database (raises ValueError if not archived)
    _check_attribute(attribute, db=db)

    # Get info about the attribute
    info=infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Detect spectrum: classify from the archived max dimensions
    attrtype="scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            # 2D attribute: falls through to the scalar extraction path below
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module."%(
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype="multi"
        else:
            logger.info("Attribute %s is a 1D vector, dimension = %s."%(
                attribute, info["max_dim_x"]))
            attrtype="vector"

    # Cut the time horizon in chunks (bounded number of atoms per request)
    cdates = _chunkerize(attribute, dateStart, dateStop, db)

    # Arrays to hold every chunks
    value = []
    date = []

    # For each date chunk
    for i_d in range(len(cdates)-1):

        # =============
        # For now we handle multi dimension the same way as scalar, which will get only the first element
        if (attrtype=="scalar") or (attrtype=="multi"):
            # Inform on retrieval request
            logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT))
                )

            cmdreturn = _cmd_with_retry(_extractors[{'H':0, 'T':1}[db]], "ExtractBetweenDates", [
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT)
                ])

            # Check command return (None means all retries failed)
            if cmdreturn is None:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Unpack return
            _date, _value = cmdreturn

            # Transform to datetime - value arrays
            # NOTE: it is faster than using pandas.to_datetime()
            _value = np.asarray(_value, dtype=float)
            # presumably timestamps come back in milliseconds; /1000 converts
            # to seconds for datetime.fromtimestamp — TODO confirm with extractor doc
            if len(_date) > 0:
                _date = _ArrayTimeStampToDatetime(_date/1000.0)

            value.append(_value)
            date.append(_date)

        # =============
        if attrtype=="vector":
            logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)"%(
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT)
                ))

            # The extractor exposes the result as a dynamic attribute:
            # it returns the element count and the dynamic attribute name.
            [N,], [name,] = _extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDates([
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d+1].strftime(_DBDFMT)
                ])
            N=int(N)

            # Read the history
            logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
            attrHist = _extractors[{'H':0, 'T':1}[db]].attribute_history(name, N)

            # Transform to datetime - value arrays
            # Pre-fill with NaN so entries shorter than max_dim_x stay NaN-padded
            _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
            _value[:] = np.nan
            _date = np.empty(N, dtype=object)
            for i_h in range(N):
                _value[i_h,:attrHist[i_h].dim_x]=attrHist[i_h].value
                _date[i_h]=attrHist[i_h].time.todatetime()

            # Remove dynamic attribute (cleanup on the extractor device)
            logger.debug("Remove dynamic attribute %s."%name)
            _extractors[{'H':0, 'T':1}[db]].RemoveDynamicAttribute(name)

            value.append(_value)
            date.append(_date)

    logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    logger.debug("Extraction done for %s."%attribute)
    # Vectors come back as a DataFrame with all-NaN columns dropped;
    # scalars as a Series indexed by date.
    if attrtype=="vector":
        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')
    else:
        return pd.Series(index=date, data=value)
class ArchiveExtractor: class ArchiveExtractor:
......
import PyTango as tango import tango
import re import re
# ============================================================================= # =============================================================================
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
# Import usefull packages # Import usefull packages
import PyTango as tango import tango
import numpy as np import numpy as np
import OpUtils as OU import OpUtils as OU
print("Imported: tango, np, OU") print("Imported: tango, np, OU")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment