Skip to content
Snippets Groups Projects
Commit 778e45cb authored by Operateur's avatar Operateur
Browse files

Major changes

* Remove one layer of package, split in files
* Now use a unique entry point function for extraction
* Add support for multiexport (list, dict) and timedelta
* Add deploy script with a quick test
parents e15172f7 c86e3c17
Branches
Tags 2.0
No related merge requests found
"""
Python module for extracting attribute from Archive Extractor Device.
"""
import logging
import datetime
import numpy as np
import PyTango as tango
import pandas as pd
__version__ = "1.0.1"
##########################################################################
### Install logger for the module ###
##########################################################################
logger = logging.getLogger(__name__)
# NOTE(review): the commented-out line below calls logger.upper(), which does
# not exist on Logger objects; the intent was probably a loglevel string.
#logger.setLevel(getattr(logging, logger.upper()))
if not logger.hasHandlers():
    # No handlers, create one
    sh = logging.StreamHandler()
    # Mirror the logger's current level on the handler
    sh.setLevel(logger.level)
    sh.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
    logger.addHandler(sh)
##########################################################################
### Commodity private variables ###
##########################################################################
# Extractor date format for GetAttDataBetweenDates
_DBDFMT = "%Y-%m-%d %H:%M:%S"
# Extractor date format for GetNearestValue
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"
##########################################################################
### Commodity private functions ###
##########################################################################
# Vectorized fromtimestamp function
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
def _check_initialized():
    """
    Check if the module is initialized, i.e. that init() has instanciated
    both extractor device proxies.

    Returns
    -------
    success : boolean
        True if both extractors are available, False otherwise
        (an error is logged in that case).
    """
    # Read-only access: the `global` statement was unnecessary.
    if None in _extractors:
        # BUGFIX: error message said "initialied"
        logger.error("Module {0} is not initialized. You should run {0}.init().".format(__name__))
        return False
    return True
##----------------------------------------------------------------------##
def _dateparse(datestr):
"""
Convenient function to parse date strings.
Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
If datstr is None, take the actual date and time.
Parameters
---------
datestr : string
Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.
Exceptions
----------
ValueError
If the parsing failed.
Returns
-------
date : datetime.datetime
Parsed date
"""
if datestr is None:
return datetime.datetime.now()
# This gives all format that will be tried, in order.
# Stop on first parse success. Raise error if none succeed.
fmt = [
"%Y-%m-%d-%H:%M:%S",
"%Y-%m-%d-%H:%M",
"%Y-%m-%d-%H",
"%Y-%m-%d",
"%Y-%m",
]
date = None
for f in fmt:
try:
date = datetime.datetime.strptime(datestr, f)
except ValueError:
continue
else:
break
else:
raise ValueError("Could not parse argument to a date")
return date
##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Verify that the given attribute is archived in the selected database.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Raises
    ------
    ValueError
        If the attribute is not archived.
    """
    extractor = _extractors[{'H': 0, 'T': 1}[db]]
    logger.debug("Check that %s is archived." % attribute)
    if not extractor.IsArchived(attribute):
        message = "Attribute '%s' is not archived in DB %s" % (attribute, extractor)
        logger.error(message)
        raise ValueError(message)
##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut the date range into chunks holding approximately at most Nmax
    atomic values of the attribute each.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime
        Start date for extraction.
    dateStop : datetime.datetime
        Stop date for extraction.
    db: str
        Which database to look in, 'H' or 'T'.
    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)
    # Get the number of points
    N = _extractors[{'H': 0, 'T': 1}[db]].GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
    ])
    logger.debug("On the period, there is %d entries" % N)
    dx = int(info["max_dim_x"])
    if dx > 1:
        # Spectrum attribute: each timestamp carries up to dx atoms
        logger.debug("Attribute is a vector with max dimension = %s" % dx)
        N = N * dx
    # If data chunk is too much, we need to cut it
    if N > Nmax:
        # Evenly sized time slices; the last boundary is clamped to dateStop
        dt = (dateStop - dateStart) / (N // Nmax)
        cdates = [dateStart]
        while cdates[-1] < dateStop:
            cdates.append(cdates[-1] + dt)
        cdates[-1] = dateStop
        logger.debug("Cutting access to %d little chunks of time, %s each." % (len(cdates) - 1, dt))
    else:
        # Small enough: a single chunk covering the whole period
        cdates = [dateStart, dateStop]
    return cdates
##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.
    cmd : str
        Command to execute on the extractor
    arg : list
        Attribute to pass to the command
    retry : int
        Number of command retry on DevFailed

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries (or if retry < 1).
    """
    # BUGFIX: initialize the return value so a non-positive retry count
    # cannot raise UnboundLocalError on the final return.
    cmdreturn = None
    for i in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)" % (cmd, arg))
        try:
            cmdreturn = getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if i == retry - 1:
                # Last attempt failed: give up
                logger.error("Could not execute command %s (%s). Check the device extractor" % (cmd, arg))
                return None
            logger.warning("Retrying...")
            continue
        # Success: stop retrying
        break
    return cmdreturn
##########################################################################
### Module private variables ###
##########################################################################
# Tuple of extractor for HDB and TDB
# Index 0 -> HDB extractor, index 1 -> TDB extractor; None until init() runs.
_extractors = (None, None)
# Tuple for attribute tables, same index convention as _extractors.
_AttrTables = (None, None)
##########################################################################
### Module initialisation functions ###
##########################################################################
def init(
    HdbExtractorPath="archiving/hdbextractor/2",
    TdbExtractorPath="archiving/tdbextractor/2",
    loglevel="info",
):
    """
    Initialize the module.
    Instanciate tango.DeviceProxy for extractors (TDB and HDB)

    Parameters
    ----------
    HdbExtractorPath, TdbExtractorPath: string
        Tango path to the extractors.
    loglevel: string
        loglevel to pass to logging.Logger
    """
    global _extractors
    global _AttrTables
    try:
        logger.setLevel(getattr(logging, loglevel.upper()))
    except AttributeError:
        # Unknown level name: keep the previous level and report
        logger.error("Wrong log level specified: {}".format(loglevel.upper()))
    logger.debug("Instanciating extractors device proxy...")
    _extractors = (tango.DeviceProxy(HdbExtractorPath), tango.DeviceProxy(TdbExtractorPath))
    logger.debug("{} and {} instanciated.".format(*_extractors))
    logger.debug("Configuring extractors device proxy...")
    for e in _extractors:
        # set timeout to 3 sec
        e.set_timeout_millis(3000)
    logger.debug("Filling attributes lookup tables...")
    _AttrTables = tuple(e.getattnameall() for e in _extractors)
    logger.debug("HDB: {} TDB: {} attributes counted".format(len(_AttrTables[0]), len(_AttrTables[1])))
##----------------------------------------------------------------------##
def findattr(pattern, db="H"):
    """
    Search for an attribute path using the pattern given.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"
    db: str
        Which database to look in, 'H' or 'T'.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    if not _check_initialized():
        return
    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    # Select DB, then keep entries containing every fragment of the pattern
    table = _AttrTables[{'H': 0, 'T': 1}[db]]
    tokens = pattern.lower().split('*')
    return [name for name in table if all(tok in name.lower() for tok in tokens)]
##----------------------------------------------------------------------##
def infoattr(attribute, db='H'):
    """
    Get informations for an attribute and pack it into a python dict.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue
    """
    if not _check_initialized():
        return
    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    extractor = _extractors[{'H': 0, 'T': 1}[db]]
    info = dict()
    # Merge definition data and properties data into a single dict
    for command in ("GetAttDefinitionData", "GetAttPropertiesData"):
        reply = getattr(extractor, command)(attribute)
        if reply is None:
            logger.warning("Function %s on extractor returned None" % command)
            continue
        for entry in reply:
            # Entries come back as "name::value" strings
            fields = entry.split("::")
            info[fields[0]] = fields[1]
    return info
##---------------------------------------------------------------------------##
def ExtrBetweenDates(
    attribute,
    dateStart,
    dateStop=None,
    db='H',
):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        If None, it takes the current date and time.
        Default is None (now).
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    data : pandas.Series or pandas.DataFrame
        Archived values indexed by date. A Series for scalar attributes,
        a DataFrame (one column per element) for 1D vector attributes.
        None if a chunk could not be extracted.
    """
    if not _check_initialized():
        return
    if not db in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    # Uncapitalize attribute
    attribute = attribute.lower()
    # Check attribute is in database
    _check_attribute(attribute, db=db)
    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)
    # Get info about the attribute
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)
    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module." % (
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype = "multi"
        else:
            logger.info("Attribute %s is a 1D vector, dimension = %s." % (
                attribute, info["max_dim_x"]))
            attrtype = "vector"
    # Cut the time horizon in chunks
    cdates = _chunkerize(attribute, dateStart, dateStop, db)
    # Arrays to hold every chunks
    value = []
    date = []
    # For each date chunk
    for i_d in range(len(cdates) - 1):
        # =============
        # For now we handle multi dimension the same way as scalar, which will get only the first element
        if (attrtype == "scalar") or (attrtype == "multi"):
            # Inform on retrieval request
            logger.info("Perform ExtractBetweenDates (%s, %s, %s)" % (
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d + 1].strftime(_DBDFMT))
            )
            cmdreturn = _cmd_with_retry(_extractors[{'H': 0, 'T': 1}[db]], "ExtractBetweenDates", [
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d + 1].strftime(_DBDFMT)
            ])
            # Check command return
            if cmdreturn is None:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None
            # Unpack return
            _date, _value = cmdreturn
            # Transform to datetime - value arrays
            # NOTE: it is faster than using pandas.to_datetime()
            _value = np.asarray(_value, dtype=float)
            if len(_date) > 0:
                # Extractor timestamps are in milliseconds
                _date = _ArrayTimeStampToDatetime(_date / 1000.0)
            value.append(_value)
            date.append(_date)
        # =============
        if attrtype == "vector":
            logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)" % (
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d + 1].strftime(_DBDFMT)
            ))
            # The extractor exposes the result as a dynamic attribute holding N points
            [N, ], [name, ] = _extractors[{'H': 0, 'T': 1}[db]].GetAttDataBetweenDates([
                attribute,
                cdates[i_d].strftime(_DBDFMT),
                cdates[i_d + 1].strftime(_DBDFMT)
            ])
            N = int(N)
            # Read the history
            logger.debug("Retrieve history of %d values. Dynamic attribute named %s." % (N, name))
            attrHist = _extractors[{'H': 0, 'T': 1}[db]].attribute_history(name, N)
            # Transform to datetime - value arrays
            # Rows shorter than max_dim_x stay NaN-padded
            _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
            _value[:] = np.nan
            _date = np.empty(N, dtype=object)
            for i_h in range(N):
                _value[i_h, :attrHist[i_h].dim_x] = attrHist[i_h].value
                _date[i_h] = attrHist[i_h].time.todatetime()
            # Remove dynamic attribute
            logger.debug("Remove dynamic attribute %s." % name)
            _extractors[{'H': 0, 'T': 1}[db]].RemoveDynamicAttribute(name)
            value.append(_value)
            date.append(_date)
    logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)
    logger.debug("Extraction done for %s." % attribute)
    if attrtype == "vector":
        # Columns that were never filled (all NaN) are dropped
        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')
    else:
        return pd.Series(index=date, data=value)
##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
    attribute,
    dateStart,
    dateStop=None,
    timeInterval=datetime.timedelta(seconds=60),
    db='H',
):
    """
    Query attribute data from an archiver database, get the min, max and
    mean over each time interval between dates.
    Use GetAttData{Min,Max,Avg}BetweenDates.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
    dateStop : datetime.datetime, string
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is now (datetime.datetime.now())
    timeInterval: datetime.timedelta, string
        Time interval used to perform min,max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    data : pandas.DataFrame
        Columns "Min", "Mean", "Max", indexed by the middle of each
        timeInterval window. None if the attribute is not a scalar.
    """
    if not _check_initialized():
        return
    if not db in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    # Uncapitalize attribute
    attribute = attribute.lower()
    # Check attribute is in database
    _check_attribute(attribute, db=db)
    # Parse dates
    dateStart = _dateparse(dateStart)
    dateStop = _dateparse(dateStop)
    # Parse timeInterval if string
    if type(timeInterval) is str:
        try:
            mul = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 60 * 60 * 24}[timeInterval[-1]]
        except KeyError:
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1]) * mul)
    # Get info about the attribute
    # BUGFIX: db was not forwarded here, so the metadata always came from
    # HDB even when extracting from TDB.
    info = infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)
    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None
    # Cut data range in time chunks
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1] + timeInterval)
    cdates[-1] = dateStop
    # Midpoints of the intervals, used as index of the returned DataFrame
    mdates = np.asarray(cdates[:-1]) + timeInterval / 2
    logger.debug("Cutting time range to %d chunks of time, %s each." % (len(cdates) - 1, timeInterval))
    # Prepare arrays
    value_min = np.empty(len(cdates) - 1)
    value_max = np.empty(len(cdates) - 1)
    value_mean = np.empty(len(cdates) - 1)
    # For each time chunk
    for i_d in range(len(cdates) - 1):
        for func, arr in zip(
            ["Max", "Min", "Avg"],
            [value_max, value_min, value_mean],
        ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)" % (
                func,
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d + 1].strftime(_DBDFMT2))
            )
            _val = getattr(_extractors[{'H': 0, 'T': 1}[db]], "GetAttData%sBetweenDates" % func)([
                attribute,
                cdates[i_d].strftime(_DBDFMT2),
                cdates[i_d + 1].strftime(_DBDFMT2)
            ])
            arr[i_d] = _val
    logger.debug("Extraction done for %s." % attribute)
    return pd.DataFrame(
        index=mdates,
        data={
            "Min": value_min,
            "Mean": value_mean,
            "Max": value_max,
        },)
import logging
import datetime
import traceback
import pandas as pd
import numpy as np
import tango
import ArchiveExtractor as ae
import ArchiveExtractor.Amenities as aea
import ArchiveExtractor.Core as aec
##########################################################################
### Install logger for the module ###
##########################################################################
logger = logging.getLogger("ArchiveExtractor")
if not logger.hasHandlers():
    # No handlers, create one
    sh = logging.StreamHandler()
    # Mirror the logger's current level on the handler
    sh.setLevel(logger.level)
    sh.setFormatter(logging.Formatter("{name}-{levelname:8}: {message}", style='{'))
    logger.addHandler(sh)
##########################################################################
### Module initialisation functions ###
##########################################################################
def init(
    HdbExtractorPath="archiving/hdbextractor/2",
    TdbExtractorPath="archiving/tdbextractor/2",
    loglevel="info",
):
    """
    Initialize the module.
    Instanciate tango.DeviceProxy for extractors (TDB and HDB)

    Parameters:
    -----------
    HdbExtractorPath, TdbExtractorPath: string
        Tango path to the extractors.
    loglevel: string
        loglevel to pass to logging.Logger
    """
    # Reset package state first so a failed init leaves it visibly uninitialized
    ae._Extractors = (None, None)
    ae._AttrTables = (None, None)
    try:
        logger.setLevel(getattr(logging, loglevel.upper()))
    except AttributeError:
        # Unknown level name: keep the previous level and report
        logger.error("Wrong log level specified: {}".format(loglevel.upper()))
    logger.debug("Instanciating extractors device proxy...")
    ae._Extractors = (tango.DeviceProxy(HdbExtractorPath), tango.DeviceProxy(TdbExtractorPath))
    logger.debug("{} and {} instanciated.".format(*ae._Extractors))
    logger.debug("Configuring extractors device proxy...")
    for e in ae._Extractors:
        # set timeout to 3 sec
        e.set_timeout_millis(3000)
    logger.debug("Filling attributes lookup tables...")
    ae._AttrTables = tuple(e.getattnameall() for e in ae._Extractors)
    logger.debug("HDB: {} TDB: {} attributes counted".format(len(ae._AttrTables[0]), len(ae._AttrTables[1])))
##########################################################################
### Module access functions ###
##########################################################################
def extract(
    attr,
    date1, date2=None,
    method="nearest",
    db='H',
):
    """
    Access function to perform extraction between date1 and date2.
    Can extract one or several attributes.
    date1 and date2 can be both exact date, or one of two can be a time interval that will be taken relative to the other.

    Parameters:
    -----------
    attr: string, list, dict
        Attribute(s) to extract.
        If string, extract the given attribute, returning a pandas.Series.
        If list, extract attributes and return a list of pandas.Series.
        If a dict, extract attributes and return a dict of pandas.Series with same keys.
    date1, date2: string, datetime.datetime, datetime.timedelta, None
        Exact date, or duration relative to date2.
        If string, it will be parsed.
        A start date can be given with string format '%Y-%m-%d-%H:%M:%S' or less precise (ie '2021-02', '2022-11-03' '2022-05-10-21:00'.i..).
        A duration can be given with string format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
        A datetime.datetime object or datetime.timedelta object will be used as is.
        date2 can be None. In that case it is replaced by the current time.
    method: str
        Method of extraction
        'nearest': Retrieve nearest value of date1, date2 is ignored.
        'between': Retrive data between date1 and date2.
    db: str
        Which database to look in, 'H' or 'T'.
    """
    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
    # Perform a few sanity checks
    if not aea._check_initialized():
        # Stop here, the function has produced a message if necessary
        return
    if db not in ("H", "T"):
        raise ValueError("Attribute 'db' should be 'H' or 'T'")
    allowedmethods = ("nearest", "between", "minmaxmean")
    if method not in allowedmethods:
        raise ValueError("Attribute 'method' should be in {}".format(str(allowedmethods)))
    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
    # Work with dates
    if not type(date1) in (datetime.datetime, datetime.timedelta):
        date1 = aea._dateparse(date1)
    if date2 is None:
        date2 = datetime.datetime.now()
    else:
        if not type(date2) in (datetime.datetime, datetime.timedelta):
            date2 = aea._dateparse(date2)
    # At least one of the two must be an absolute date
    if not datetime.datetime in (type(date1), type(date2)):
        logger.error("One of date1 date2 should be an exact date.\nGot {} {}".format(date1, date2))
        raise ValueError("date1 and date2 not valid")
    # Use timedelta relative to the other date. date1 is always before date2
    if type(date1) is datetime.timedelta:
        date1 = date2 - date1
    if type(date2) is datetime.timedelta:
        date2 = date1 + date2
    if date1 > date2:
        logger.error("date1 must precede date2.\nGot {} {}".format(date1, date2))
        raise ValueError("date1 and date2 not valid")

    def _attempt(target):
        # One guarded extraction; returns (ok, result) so callers can skip failures.
        try:
            return True, aec._extract_attribute(target, method, date1, date2, db)
        except Exception as e:
            logger.debug("Exception in _extract_attribute(): " + str(e))
            # BUGFIX: traceback.print_tb() writes to stderr and returns None,
            # so logger.debug() used to log "None"; format_tb() returns the
            # text so it actually reaches the log.
            logger.debug("".join(traceback.format_tb(e.__traceback__)))
            logger.error("Could not extract {}.".format(target))
            return False, None

    ## _-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_-_
    # Perform extraction and return; failed attributes are logged and skipped.
    if type(attr) is dict:
        d = dict()
        for k, v in attr.items():
            ok, res = _attempt(v)
            if ok:
                d[k] = res
        return d
    if type(attr) in (list, tuple):
        d = []
        for v in attr:
            ok, res = _attempt(v)
            if ok:
                d.append(res)
        return d
    ok, res = _attempt(attr)
    return res if ok else None
##----------------------------------------------------------------------##
def findattr(pattern, db="H"):
    """
    Search for an attribute path using the pattern given.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"
    db: str
        Which database to look in, 'H' or 'T'.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    if not aea._check_initialized():
        return
    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    # Select DB, then keep entries containing every fragment of the pattern
    table = ae._AttrTables[{'H': 0, 'T': 1}[db]]
    tokens = pattern.lower().split('*')
    return [name for name in table if all(tok in name.lower() for tok in tokens)]
##----------------------------------------------------------------------##
def infoattr(attribute, db='H'):
    """
    Get informations for an attribute and pack it into a python dict.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue
    """
    if not aea._check_initialized():
        return
    if db not in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")
    extractor = ae._Extractors[{'H': 0, 'T': 1}[db]]
    info = dict()
    # Merge definition data and properties data into a single dict
    for command in ("GetAttDefinitionData", "GetAttPropertiesData"):
        reply = getattr(extractor, command)(attribute)
        if reply is None:
            logger.warning("Function %s on extractor returned None" % command)
            continue
        for entry in reply:
            # Entries come back as "name::value" strings
            fields = entry.split("::")
            info[fields[0]] = fields[1]
    return info
import logging
import datetime
import numpy as np
import ArchiveExtractor as ae
# Get the module logger
logger = logging.getLogger("ArchiveExtractor")
##########################################################################
### Commodity private variables ###
##########################################################################
# Extractor date format for GetAttDataBetweenDates
_DBDFMT = "%Y-%m-%d %H:%M:%S"
# Extractor date format for GetNearestValue
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"
##########################################################################
### Commodity private functions ###
##########################################################################
# Vectorized fromtimestamp function
# NOTE: it is faster than using pandas.to_datetime()
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
# Vectorized bool map dictionnary
# NOTE(review): dict.get returns None for any string other than
# true/t/false/f, so unknown words map to None rather than raising --
# confirm callers only feed those words (see _cast_bool).
_ArrayStr2Bool = np.vectorize({
    "true": True, 't': True,
    "false": False, 'f': False,
}.get)
def _check_initialized():
    """
    Check if the module is initialized, i.e. that init() has instanciated
    both extractor device proxies.

    Returns
    -------
    success : boolean
        True if both extractors are available, False otherwise
        (an error is logged in that case).
    """
    if None in ae._Extractors:
        # BUGFIX: error message said "initialied"
        logger.error("Module {0} is not initialized. You should run {0}.init().".format(__name__))
        return False
    return True
##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date or duration strings.
    Exact date format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
    Duration format is 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
    If datestr is None, take the actual date and time.

    Parameters
    ---------
    datestr : string
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.
        Duration as a string, format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime or datetime.timedelta
        Parsed date or duration
    """
    # BUGFIX: the docstring promised None -> now, but None crashed on
    # datestr[-1]; handle it as documented (same as the legacy module).
    if datestr is None:
        return datetime.datetime.now()
    logger.debug("Parsing date string '%s'" % datestr)
    # Determine date/duration by looking at the last char
    if datestr[-1] in "mhdM":
        # Duration
        logger.debug("Assuming a duration")
        try:
            q = float(datestr[:-1])
        except ValueError as e:
            logger.error("Failed to parse date string. Given the last character, a duration was assumed.")
            # Raise ValueError (matches the documented contract; previously a
            # bare Exception)
            raise ValueError("Could not parse argument to a date") from e
        # Convert all in minutes
        # BUGFIX: the months entry was keyed 'm' (duplicate of minutes), so
        # 'M' raised KeyError and 'm' meant months. 'M' is months (30 days),
        # 'm' is minutes.
        minutes = q * {'m': 1, 'h': 60, 'd': 60 * 24, 'M': 30 * 60 * 24}[datestr[-1]]
        return datetime.timedelta(minutes=minutes)
    else:
        # Probably a date string
        # This gives all format that will be tried, in order.
        # Stop on first parse success. Raise error if none succeed.
        fmt = [
            "%Y-%m-%d-%H:%M:%S",
            "%Y-%m-%d-%H:%M",
            "%Y-%m-%d-%H",
            "%Y-%m-%d",
            "%Y-%m",
        ]
        date = None
        for f in fmt:
            try:
                date = datetime.datetime.strptime(datestr, f)
            except ValueError:
                continue
            else:
                break
        else:
            raise ValueError("Could not parse argument to a date")
        return date
##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Verify that the given attribute is archived in the selected database.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    db: str
        Which database to look in, 'H' or 'T'.

    Raises
    ------
    ValueError
        If the attribute is not archived.
    """
    extractor = ae._Extractors[{'H': 0, 'T': 1}[db]]
    logger.debug("Check that %s is archived." % attribute)
    if not extractor.IsArchived(attribute):
        message = "Attribute '%s' is not archived in DB %s" % (attribute, extractor)
        logger.error(message)
        raise ValueError(message)
##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut the date range into chunks holding approximately at most Nmax
    atomic values of the attribute each.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime
        Start date for extraction.
    dateStop : datetime.datetime
        Stop date for extraction.
    db: str
        Which database to look in, 'H' or 'T'.
    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)
    # Get the number of points
    N = ae._Extractors[{'H': 0, 'T': 1}[db]].GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
    ])
    logger.debug("On the period, there is %d entries" % N)
    dx = int(info["max_dim_x"])
    if dx > 1:
        # Spectrum attribute: each timestamp carries up to dx atoms
        logger.debug("Attribute is a vector with max dimension = %s" % dx)
        N = N * dx
    # If data chunk is too much, we need to cut it
    if N > Nmax:
        # Evenly sized time slices; the last boundary is clamped to dateStop
        dt = (dateStop - dateStart) / (N // Nmax)
        cdates = [dateStart]
        while cdates[-1] < dateStop:
            cdates.append(cdates[-1] + dt)
        cdates[-1] = dateStop
        logger.debug("Cutting access to %d little chunks of time, %s each." % (len(cdates) - 1, dt))
    else:
        # Small enough: a single chunk covering the whole period
        cdates = [dateStart, dateStop]
    return cdates
##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.
    cmd : str
        Command to execute on the extractor
    arg : list
        Attribute to pass to the command
    retry : int
        Number of command retry on DevFailed

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries (or if retry < 1).
    """
    logger.info("Perform Command {} {}".format(cmd, arg))
    # BUGFIX: initialize the return value so a non-positive retry count
    # cannot raise UnboundLocalError on the final return.
    cmdreturn = None
    for i in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)" % (cmd, arg))
        try:
            cmdreturn = getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if i == retry - 1:
                # Last attempt failed: give up
                logger.error("Could not execute command %s (%s). Check the device extractor" % (cmd, arg))
                return None
            logger.warning("Retrying...")
            continue
        # Success: stop retrying
        break
    return cmdreturn
def _cast_bool(value):
    """
    Cast a value, or array of values, to boolean.
    Try to assess the input data type. If string, then try to find true or false word inside.

    Parameters:
    -----------
    value: string, integer, or array of such
        value to convert.

    Return:
    boolean:
        value or array of boolean.
    """
    # Force to array
    value = np.asarray(value)

    # cast back to single value
    def castback(v):
        # 0-d arrays come from scalar input; unwrap them to a plain value
        if v.shape == ():
            return v.item()
        return v

    # Simply try to cast to bool first (works for numeric input; string
    # arrays raise ValueError and fall through to the word lookup below)
    try:
        value = value.astype("bool")
        logger.debug("Direct conversion to boolean")
        return castback(value)
    except ValueError:
        # Keep trying to cast
        pass
    logger.debug("Try to convert to boolean")
    # Normalize: lowercase and strip whitespace before the word lookup
    value = np.char.strip(np.char.lower(value))
    # NOTE(review): _ArrayStr2Bool uses dict.get, so strings other than
    # "true"/"t"/"false"/"f" silently map to None rather than raising --
    # confirm upstream data can only contain those words.
    value = _ArrayStr2Bool(value)
    return castback(value)
import logging
import datetime
import numpy as np
import pandas as pd
import ArchiveExtractor as ae
import ArchiveExtractor.Amenities as aea
# Get the module logger
logger = logging.getLogger("ArchiveExtractor")
##########################################################################
### Module core functions ###
##########################################################################
def _extract_attribute(attribute, method, date1, date2, db):
    """
    Check if exists, check scalar or spectrum and dispatch to the matching
    extraction helper (_extract_scalar or _extract_vector).
    """
    # Uncapitalize attribute
    attribute = attribute.lower()
    # Raises ValueError if the attribute is not archived
    aea._check_attribute(attribute, db)
    # Get info about the attribute
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)
    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this module." % (
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype = "multi"
        else:
            logger.info("Attribute %s is a 1D vector, dimension = %s." % (
                attribute, info["max_dim_x"]))
            attrtype = "vector"
    # =============
    # For now we handle multi dimension the same way as scalar, which will get only the first element
    if (attrtype == "scalar") or (attrtype == "multi"):
        if info["data_type"] == '1':
            # Boolean data type, quick fix
            # NOTE(review): '1' presumably the extractor's code for a boolean
            # data type -- confirm against the device documentation
            dtype = bool
        else:
            dtype = float
        return _extract_scalar(attribute, method, date1, date2, db, dtype)
    if attrtype == "vector":
        return _extract_vector(attribute, method, date1, date2, db)
##---------------------------------------------------------------------------##
def _extract_scalar(attribute, method, date1, date2, db, dtype):
    """
    Extract a scalar attribute from the archiver with the given method.

    Parameters
    ----------
    attribute : str
        Full attribute name, lower-cased and already checked in database.
    method : str
        Extraction method: 'nearest', 'between' or 'minmaxmean'.
    date1, date2 : datetime.datetime
        Date range of the extraction; date2 is not used by 'nearest'.
    db : str
        Which database to look in, 'H' or 'T'.
    dtype : type
        Data type of the attribute, bool or float. Boolean values come
        back as strings and go through _cast_bool.

    Returns
    -------
    pandas.Series or None
        Timestamped archived values, or None when the extractor device
        could not answer.

    Raises
    ------
    NotImplementedError
        If method is 'minmaxmean', which is not available for scalars.
    """
    # =====================
    if method == "nearest":
        # NOTE(review): module header documents _DBDFMT2 as the format for
        # GetNearestValue, but _DBDFMT is used here -- confirm which one
        # the extractor device actually expects.
        cmdreturn = aea._cmd_with_retry(ae._Extractors[{'H':0, 'T':1}[db]], "GetNearestValue", [
            attribute,
            date1.strftime(aea._DBDFMT),
            ])

        # Unpack return, formatted as "<timestamp_ms>;<value>"
        try:
            _date, _value = cmdreturn.split(';')
        except TypeError:
            logger.error("Could not extract this chunk. Check the device extractor")
            return None

        # Transform by datatype
        if dtype is bool:
            _value = _cast_bool(_value)

        # Fabricate return pandas.Series (timestamp is in milliseconds)
        d = pd.Series(index=[datetime.datetime.fromtimestamp(int(_date)/1000),], data=[_value,], name=attribute)

        return d

    # =====================
    if method == "between":
        # Cut the time horizon in chunks
        cdates = aea._chunkerize(attribute, date1, date2, db)

        # Array to hold data
        data = []

        # For each date chunk
        for i_d in range(len(cdates)-1):
            cmdreturn = aea._cmd_with_retry(ae._Extractors[{'H':0, 'T':1}[db]], "ExtractBetweenDates", [
                attribute,
                cdates[i_d].strftime(aea._DBDFMT),
                cdates[i_d+1].strftime(aea._DBDFMT)
                ])

            # Unpack return
            try:
                _date, _value = cmdreturn
            except TypeError:
                logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Transform to datetime - value arrays
            if dtype is bool:
                _value = _cast_bool(_value)
            else:
                _value = np.asarray(_value, dtype=dtype)

            if len(_date) > 0:
                # Timestamps are in milliseconds
                _date = aea._ArrayTimeStampToDatetime(_date/1000.0)

            # Fabricate return pandas.Series
            data.append(pd.Series(index=_date, data=_value, name=attribute))

        # Concatenate chunks
        return pd.concat(data)

    # ========================
    if method == "minmaxmean":
        # Bug fix: the original `raise NotImplemented` raised a TypeError
        # (NotImplemented is a constant, not an exception class).
        logger.error("Method {} is not implemented for scalars.".format(method))
        raise NotImplementedError("Method {} is not implemented for scalars.".format(method))
##---------------------------------------------------------------------------##
def _extract_vector(attribute, method, date1, date2, db):
    """
    Extract a 1D (spectrum) attribute from the archiver.

    Only the 'nearest' method is implemented: GetNearestValue does not
    work for vectors, so a growing time window around date1 is scanned
    with GetAttDataBetweenDates and the closest entry is returned.

    Parameters
    ----------
    attribute : str
        Full attribute name, lower-cased and already checked in database.
    method : str
        Extraction method; only 'nearest' is supported.
    date1, date2 : datetime.datetime
        Target date; date2 is ignored.
    db : str
        Which database to look in, 'H' or 'T'.

    Returns
    -------
    pandas.Series or None
        One-element Series holding the nearest archived vector, or None
        when the extractor device could not answer.

    Raises
    ------
    NotImplementedError
        For any method other than 'nearest'.
    """
    # Get info about the attribute (needed for max_dim_x below)
    info = ae.infoattr(attribute, db=db)

    # =====================
    if method == "nearest":
        # Get nearest does not work with vector.
        # Make a between date with surounding dates.

        # Dynamically find surounding window containing at least one point.
        # NOTE(review): this loop never terminates if the attribute holds
        # no data at all -- confirm this cannot happen in practice.
        cnt = 0
        dt = datetime.timedelta(seconds=10)
        while cnt < 1:
            logger.debug("Seeking points in {} to {}".format(date1-dt, date1+dt))
            cnt = ae._Extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
                attribute,
                (date1-dt).strftime(aea._DBDFMT2),
                (date1+dt).strftime(aea._DBDFMT2)
                ])
            dt = dt*1.5
        logger.debug("Found {} points in a +- {} interval".format(cnt, str(dt/1.5)))

        # For vector, we have to use the GetAttxxx commands.
        # NOTE(review): the count request above uses _DBDFMT2 while this
        # one uses _DBDFMT -- one of the two formats looks wrong, confirm
        # against the extractor device API.
        cmdreturn = aea._cmd_with_retry(ae._Extractors[{'H':0, 'T':1}[db]], "GetAttDataBetweenDates", [
            attribute,
            (date1-dt).strftime(aea._DBDFMT),
            (date1+dt).strftime(aea._DBDFMT),
            ])

        # Unpack return: number of points and name of the dynamic attribute
        try:
            [N,], [name,] = cmdreturn
            N = int(N)
        except TypeError:
            logger.error("Could not extract this attribute. Check the device extractor")
            return None

        # Read the history
        logger.debug("Retrieve history of %d values. Dynamic attribute named %s."%(N, name))
        attrHist = ae._Extractors[{'H':0, 'T':1}[db]].attribute_history(name, N)

        # Transform to datetime - value arrays; pad short rows with NaN
        _value = np.empty((N, int(info["max_dim_x"])), dtype=float)
        _value[:] = np.nan
        _date = np.empty(N, dtype=object)
        for i_h in range(N):
            _value[i_h,:attrHist[i_h].dim_x] = attrHist[i_h].value
            _date[i_h] = attrHist[i_h].time.todatetime()

        # Seeking nearest entry
        idx = np.argmin(abs(_date-date1))
        logger.debug("Found nearest value at index {}: {}".format(idx, _date[idx]))

        # Fabricate return pandas.Series
        d = pd.Series(index=[_date[idx],], data=[_value[idx],], name=attribute)

        return d

    # If we are here, the method is not implemented.
    # Bug fix: the original `raise NotImplemented` raised a TypeError
    # (NotImplemented is a constant, not an exception class).
    logger.error("Method {} is not implemented for vectors.".format(method))
    raise NotImplementedError("Method {} is not implemented for vectors.".format(method))
##---------------------------------------------------------------------------##
def ExtrBetweenDates_MinMaxMean(
        attribute,
        dateStart,
        dateStop=None,
        timeInterval=datetime.timedelta(seconds=60),
        db='H',
        ):
    """
    Query attribute data from an archiver database, returning the min,
    max and mean over consecutive time windows between two dates.
    Uses the GetAttData{Min,Max,Avg}BetweenDates extractor commands.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
    dateStop : datetime.datetime, string
        Stop date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        Default is now (datetime.datetime.now())
    timeInterval: datetime.timedelta, string
        Time interval used to perform min, max and mean.
        Can be a string with a number and a unit in "d", "h", "m" or "s"
    db: str
        Which database to look in, 'H' or 'T'.

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database, or timeInterval
        cannot be parsed.
    AttributeError
        db is neither 'H' nor 'T'.

    Returns
    -------
    pandas.DataFrame or None
        Columns "Min", "Mean", "Max", indexed by the middle of each
        timeInterval window. None if the attribute is not a scalar.
    """
    # Bug fix: this function still used the bare names of the former
    # single-file module (_check_initialized, _check_attribute, _dateparse,
    # infoattr, _DBDFMT2), which are not defined in this file and raised
    # NameError. They are now qualified through the ae/aea modules like in
    # the sibling functions.
    # NOTE(review): confirm _check_initialized and _dateparse do live in
    # ArchiveExtractor.Amenities.
    if not aea._check_initialized():
        return

    if not db in ("H", "T"):
        raise AttributeError("Attribute db should be 'H' or 'T'")

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    aea._check_attribute(attribute, db)

    # Parse dates
    dateStart = aea._dateparse(dateStart)
    dateStop = aea._dateparse(dateStop)

    # Parse timeInterval if string
    if type(timeInterval) is str:
        try:
            mul = {'s':1, 'm':60, 'h':60*60, 'd':60*60*24}[timeInterval[-1]]
        except KeyError:
            logger.error("timeInterval could not be parsed")
            raise ValueError("timeInterval could not be parsed")
        timeInterval = datetime.timedelta(seconds=int(timeInterval[:-1])*mul)

    # Get info about the attribute.
    # Bug fix: the db argument was previously not forwarded, so the 'H'
    # database was always queried for the attribute information.
    info = ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s" % info)

    # Detect spectrum: this operation only makes sense on scalars.
    if int(info["max_dim_x"]) > 1:
        logger.error("Attribute is not a scalar. Cannot perform this kind of operation.")
        return None

    # Cut data range in time chunks
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1]+timeInterval)
    cdates[-1] = dateStop
    # Window centers used as the returned index
    mdates = np.asarray(cdates[:-1])+timeInterval/2
    logger.debug("Cutting time range to %d chunks of time, %s each."%(len(cdates)-1, timeInterval))

    # Prepare arrays
    value_min = np.empty(len(cdates)-1)
    value_max = np.empty(len(cdates)-1)
    value_mean = np.empty(len(cdates)-1)

    # For each time chunk
    for i_d in range(len(cdates)-1):
        for func, arr in zip(
                ["Max", "Min", "Avg"],
                [value_max, value_min, value_mean],
                ):
            # Make requests
            logger.debug("Perform GetAttData%sBetweenDates (%s, %s, %s)"%(
                func,
                attribute,
                cdates[i_d].strftime(aea._DBDFMT2),
                cdates[i_d+1].strftime(aea._DBDFMT2))
                )

            _val = getattr(ae._Extractors[{'H':0, 'T':1}[db]], "GetAttData%sBetweenDates"%func)([
                attribute,
                cdates[i_d].strftime(aea._DBDFMT2),
                cdates[i_d+1].strftime(aea._DBDFMT2)
                ])

            arr[i_d] = _val

    logger.debug("Extraction done for %s."%attribute)
    return pd.DataFrame(
        index=mdates,
        data={
            "Min":value_min,
            "Mean":value_mean,
            "Max":value_max,
            },)
"""
Python module for extracting attribute from Archive Extractor Device.
"""
__version__ = "AUTOVERSIONREPLACE"
__all__ = ["Access", ]
##########################################################################
### Module private variables ###
##########################################################################
# Tuple of extractor for HDB and TDB
_Extractors = (None, None)
# Tuple for attribute tables
_AttrTables = (None, None)
##########################################################################
### Functions in Access are entry points ###
##########################################################################
from .Access import *
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
## Initialize on import
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
init()
# pySoleilControl
Useful python tools for SOLEIL control system.
The ultimate aim is to provide tools that can be used in a shell prompt, ipython prompt or in python applications (such as GUI).
Under construction, the first brick is a module to retrieve data from the Tango Archiver.
## ArchiveExtractor
# ArchiveExtractor
This module brings some functions to extract data from HDB/TDB.
## Quick example
Usage example, with an ipython prompt
```python
In [1]: import pySoleilControl.ArchiveExtractor as AE
In [2]: # For now, we need manual initialization of the module
...: AE.init()
In [1]: import ArchiveExtractor as AE
In [3]: # Looking for an attribute in HDB
In [2]: # Looking for an attribute in HDB
...: AE.findattr("ans/dg/*dcct*")
Out[3]:
Out[2]:
['ANS/DG/DCCT-CTRL/State',
'ANS/DG/DCCT-CTRL/Status',
'ANS/DG/DCCT-CTRL/current',
......@@ -26,10 +17,10 @@ Out[3]:
'ANS/DG/DCCT-CTRL/lifeTime',
'ANS/DG/DCCT-CTRL/lifeTimeErr']
In [4]: # Get data between two dates, this return a pandas.Dataframe object
...: AE.ExtrBetweenDates('ANS/DG/DCCT-CTRL/current', '2021-12-13', '2021-12-13-12:00')
In [3]: # Get data between two dates, this return a pandas.Dataframe object
...: AE.extract('ANS/DG/DCCT-CTRL/current', '2021-12-13', '2021-12-13-12:00')
INFO:Perform ExtractBetweenDates (ans/dg/dcct-ctrl/current, 2021-12-13 00:00:00, 2021-12-13 12:00:00)
Out[4]:
Out[3]:
2021-12-13 00:00:00 450.993568
2021-12-13 00:00:01 450.981979
2021-12-13 00:00:02 450.971455
......@@ -43,75 +34,48 @@ Out[4]:
2021-12-13 12:00:00 15.005410
Length: 42725, dtype: float64
In [5]: # Get min, max and mean with a 10 minute window
...: d=AE.ExtrBetweenDates_MinMaxMean('ANS/DG/DCCT-CTRL/current', '2021-12-13', '2021-12-13-12:00', timeInterval='10m')
In [6]: d
Out[6]:
Min Mean Max
2021-12-13 00:05:00 449.762286 450.619654 451.617095
2021-12-13 00:15:00 449.761171 450.676306 451.595391
2021-12-13 00:25:00 449.764910 450.684333 451.606520
2021-12-13 00:35:00 449.766284 450.688881 451.655843
2021-12-13 00:45:00 449.766808 450.716444 451.678886
... ... ... ...
2021-12-13 11:15:00 15.435944 15.495834 15.552427
2021-12-13 11:25:00 15.325022 15.383286 15.437459
2021-12-13 11:35:00 15.214556 15.270340 15.326395
2021-12-13 11:45:00 15.106309 15.163566 15.219333
2021-12-13 11:55:00 15.003812 15.056515 15.110022
[72 rows x 3 columns]
In [7]: # Activate inline matplotlib
...: %matplotlib
Using matplotlib backend: TkAgg
In [7]: # Simply plot
...: d.plot()
In [8]: # ipython prompt supports autocompletion. The doc of function can be quickly read by adding a '?'
...: AE.ExtrBetweenDates?
Signature: AE.ExtrBetweenDates(attribute, dateStart, dateStop=None, db='H')
```
## Main function help
```
Signature: AE.extract(attr, date1, date2=None, method='nearest', db='H')
Docstring:
Query attribute data from an archiver database, get all points between dates.
Use ExtractBetweenDates.
Access function to perform extraction between date1 and date2.
Can extract one or several attributes.
date1 and date2 can be both exact date, or one of two can be a time interval that will be taken relative to the other.
Parameters
----------
attribute : String
Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
dateStart : datetime.datetime, string
Start date for extraction. If string, it will be parsed.
Example of string format %Y-%m-%d-%H:%M:%S or less precise.
Parameters:
-----------
attr: string, list, dict
Attribute(s) to extract.
If string, extract the given attribute, returning a pandas.Series.
If list, extract attributes and return a list of pandas.Series.
If a dict, extract attributes and return a dict of pandas.Series with same keys.
dateStop : datetime.datetime, string, None
Stop date for extraction.
date1, date2: string, datetime.datetime, datetime.timedelta, None
Exact date, or duration relative to date2.
If string, it will be parsed.
Example of string format %Y-%m-%d-%H:%M:%S or less precise.
If None, it takes the current date and time.
Default is None (now).
A start date can be given with string format '%Y-%m-%d-%H:%M:%S' or less precise (ie '2021-02', '2022-11-03' '2022-05-10-21:00'.i..).
A duration can be given with string format 'Xu' where X is a number and u is a unit in ('m':minutes, 'h':hours, 'd':days, 'M':months)
A datetime.datetime object or datetime.timedelta object will be used as is.
date2 can be None. In that case it is replaced by the current time.
method: str
Method of extraction
'nearest': Retrieve nearest value of date1, date2 is ignored.
'between': Retrive data between date1 and date2.
db: str
Which database to look in, 'H' or 'T'.
Exceptions
----------
ValueError
The attribute is not found in the database.
```
Returns
-------
[date, value] : array
date : numpy.ndarray of datetime.datime objects
Dates of the values
value : numpy.ndarray
Archived values
File: ~/GrpDiagnostics/RBT/pySoleilControl/ArchiveExtractor.py
Type: function
## For developers
### Contribute
There is a simple test directory with only a manual test script.
Please run and check it before proposing a merge.
```
\ No newline at end of file
#!/bin/bash
# Deploy the ArchiveExtractor module into the RCM environment.
echo "Deploy module in RCM environment"

# Get the current version from the latest git tag.
version=$(git describe --tags)

# Copy Python module.
# NOTE(review): hard-coded python3.6 user site-packages path -- confirm.
ipath=/home/operateur/.local/lib/python3.6/site-packages/ArchiveExtractor
# Quote expansions so the script survives paths with spaces.
mkdir -p "$ipath"
cp ArchiveExtractor/*.py "$ipath"/ -rvf

# Rewrite __init__.py, replacing the AUTOVERSIONREPLACE placeholder with
# the actual version string (fixes the garbled original comment).
awk -v version="$version" '{ gsub("AUTOVERSIONREPLACE",version) ; print}' ArchiveExtractor/__init__.py > "$ipath/__init__.py"
"""Manual smoke test for the ArchiveExtractor package (needs live extractors)."""
import logging

# Dedicated logger so the test output is clearly identifiable.
logger = logging.getLogger("tester")
sh = logging.StreamHandler()
sh.setFormatter(logging.Formatter("{name}-{levelname:8}: {message}", style='{'))
logger.addHandler(sh)
logger.setLevel(logging.DEBUG)

# Put root folder in path so the in-repo package is imported.
import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# test import
logger.info("Testing import")
import ArchiveExtractor


def _require_attrs(pattern, **kwargs):
    """findattr() wrapper: log the matches and fail when none are found."""
    found = ArchiveExtractor.findattr(pattern, **kwargs)
    logger.debug(found)
    if len(found) < 1:
        raise RuntimeError("Failed to get attributes with findattr")
    return found


def _require_value(*args, **kwargs):
    """extract() wrapper: log the result and fail when extraction returned None."""
    result = ArchiveExtractor.extract(*args, **kwargs)
    logger.debug(result)
    if result is None:
        raise RuntimeError("Could not perform extraction")
    return result


# it should have been automatically initialized, check that extractors are set
logger.info("Testing auto init")
logger.debug(ArchiveExtractor._Extractors)
if None in ArchiveExtractor._Extractors:
    raise RuntimeError("ArchiveExtractor does not seems properly initialized")

# Test init()
logger.info("Testing init")
ArchiveExtractor.init(loglevel='debug')

###############################################################################
# Test findattr
logger.info("Testing findattr()")
attrs = _require_attrs("*dg*dcct*current*")
attrs = _require_attrs("*dg*dcct*current*", db='T')

###############################################################################
# Test infoattr
logger.info("Testing infoattr()")
info = ArchiveExtractor.infoattr(attrs[0], db='T')
logger.debug(info)

###############################################################################
# Test extractions
attr = ArchiveExtractor.findattr("ans/dg*dcct*current*")[0]

logger.info("Testing extract() ; scalar, nearest, timedelta")
_require_value(attr, "0h")

logger.info("Testing extract() ; scalar, between, precise date and timedelta")
_require_value(attr, "1h", "2023-12-13-00:30", method='between')

logger.info("Testing extract() ; scalar, nearest, specific date")
# Test several formats
for fmt in [
        "2023-08",
        "2024-01-10",
        "2024-01-10-12:00",
        ]:
    logger.debug(fmt)
    _require_value(attr, fmt)

logger.info("Testing extract() ; dict, nearest, specific date")
val = ArchiveExtractor.extract({"attr":attr, "attr2":attr}, "2023-06")
logger.debug(val)

logger.info("Testing extract() ; list, nearest, specific date")
val = ArchiveExtractor.extract(ArchiveExtractor.findattr("dg*dcct*current"), "2023-06")
logger.debug(val)

logger.info("Testing extract() ; scalar, between, timedelta")
_require_value(attr, "3h", method='between')

logger.info("Testing extract() ; scalar, between, precise date")
_require_value(attr, "2023-12-13-00:30", "2023-12-13-01:30", method='between')

logger.info("Testing extract() ; spectrum, nearest, precise date")
_require_value('ANS/DG/BPM-MANAGER/zRefOrbit', "2023-12-13-00:30")

logger.info("Test success !")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment