Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Amenities.py 7.89 KiB
import logging
import datetime
import numpy as np
import ArchiveExtractor as ae
# Module-level logger.
# It is requested under the fixed name "ArchiveExtractor" (not __name__), so
# every record emitted from this file goes through that named logger.
logger = logging.getLogger("ArchiveExtractor")
##########################################################################
### Commodity private variables ###
##########################################################################
# Extractor date format for GetAttDataBetweenDates.
# Layout is year-month-day, then hours:minutes:seconds.
_DBDFMT = "%Y-%m-%d %H:%M:%S"
# Extractor date format for GetNearestValue.
# Layout is day-month-year, then hours:minutes:seconds.
# It is also the format _chunkerize() uses to build the arguments of
# GetAttDataBetweenDatesCount.
_DBDFMT2 = "%d-%m-%Y %H:%M:%S"
##########################################################################
### Commodity private functions ###
##########################################################################
# Element-wise version of datetime.datetime.fromtimestamp: takes an array of
# timestamps and returns an array of datetime objects.
# NOTE: the original author measured it as faster than pandas.to_datetime().
_ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)

# Element-wise lookup of a boolean word in a small dictionary.
# Only the lowercase words "true", "t", "false" and "f" are known; the lookup
# goes through dict.get, so an unknown word does not raise.
_ArrayStr2Bool = np.vectorize(
    dict(true=True, t=True, false=False, f=False).get
)
def _check_initialized():
    """
    Check if the module is initialized.

    The module is considered initialized when no entry of ae._Extractors
    is None. When an entry is None, an error is logged.

    Returns
    -------
    success : boolean
        True if no extractor entry is None, False otherwise.
    """
    if None in ae._Extractors:
        logger.error("Module {0} is not initialized. You should run {0}.init().".format(__name__))
        return False
    return True
##----------------------------------------------------------------------##
def _dateparse(datestr):
    """
    Convenient function to parse date or duration strings.

    Exact date format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
    Duration format is 'Xu' where X is a number and u is a unit in
    ('m':minutes, 'h':hours, 'd':days, 'M':months).
    If datestr is None, take the actual date and time.

    Parameters
    ----------
    datestr : string or None
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.
        Duration as a string, format 'Xu' where X is a number and u is a unit in
        ('m':minutes, 'h':hours, 'd':days, 'M':months).
        None to get the current date and time.

    Raises
    ------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime or datetime.timedelta
        Parsed date or duration
    """
    if datestr is None:
        logger.debug("No date string given, using the current date and time")
        return datetime.datetime.now()

    logger.debug("Parsing date string '%s'"%datestr)

    if not datestr:
        # Without this guard, datestr[-1] would raise IndexError instead of
        # the documented ValueError.
        logger.error("Failed to parse date string. The string is empty.")
        raise ValueError("Could not parse argument to a date")

    # Number of minutes in each duration unit; a month is counted as 30 days.
    # Lowercase 'm' is minutes, uppercase 'M' is months.
    minutes_per_unit = {'m':1, 'h':60, 'd':60*24, 'M':30*60*24}

    # Determine date/duration by looking at the last char
    unit = datestr[-1]
    if unit in minutes_per_unit:
        # Duration
        logger.debug("Assuming a duration")
        try:
            q = float(datestr[:-1])
            # Convert all in minutes. timedelta rejects nan (ValueError)
            # and inf or too large values (OverflowError).
            return datetime.timedelta(minutes=q*minutes_per_unit[unit])
        except (ValueError, OverflowError) as e:
            logger.error("Failed to parse date string. Given the last character, a duration was assumed.")
            raise ValueError("Could not parse argument to a date") from e

    # Probably a date string.
    # This gives all formats that will be tried, in order, from the most to
    # the least precise. Stop on first parse success.
    formats = (
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
    )
    for f in formats:
        try:
            return datetime.datetime.strptime(datestr, f)
        except ValueError:
            continue

    # None of the formats matched
    raise ValueError("Could not parse argument to a date")
##----------------------------------------------------------------------##
def _check_attribute(attribute, db):
    """
    Make sure an attribute is archived in the selected database.

    Nothing is returned: the function either passes silently or raises.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    db: str
        Which database to look in, 'H' or 'T'.

    Raises
    ------
    ValueError
        If the extractor reports that the attribute is not archived.
    KeyError
        If db is neither 'H' nor 'T'.
    """
    logger.debug("Check that %s is archived."%attribute)

    # 'H' selects the first extractor of the list, 'T' the second one.
    extractor = ae._Extractors[{'H':0, 'T':1}[db]]

    if extractor.IsArchived(attribute):
        return

    # The same text is logged and carried by the exception.
    message = "Attribute '%s' is not archived in DB %s"%(attribute, extractor)
    logger.error(message)
    raise ValueError(message)
##----------------------------------------------------------------------##
def _chunkerize(attribute, dateStart, dateStop, db, Nmax=100000):
    """
    Cut an extraction period in chunks of equal duration.

    The number of entries on the period is requested from the extractor and
    multiplied by the attribute max_dim_x when it is a vector. If that number
    of atoms exceeds Nmax, the period is divided in ceil(N/Nmax) chunks, so
    that a chunk holds at most Nmax atoms when data is evenly spread in time.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.

    db: str
        Which database to look in, 'H' or 'T'.

    Nmax: int
        Max number of atoms in one chunk. Default 100000.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info=ae.infoattr(attribute, db=db)
    logger.debug("Attribute information \n%s"%info)

    # Get the number of points
    N=ae._Extractors[{'H':0, 'T':1}[db]].GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
        ])
    logger.debug("On the period, there is %d entries"%N)

    dx=int(info["max_dim_x"])
    if dx > 1:
        logger.debug("Attribute is a vector with max dimension = %s"%dx)
        N=N*dx

    # Small enough, a single chunk covers the whole period
    if N <= Nmax:
        return [dateStart, dateStop]

    # Data chunk is too much, we need to cut it.
    # Ceiling division: a floor division would give a single oversized
    # chunk whenever Nmax < N < 2*Nmax.
    nchunks = int(-(-N // Nmax))
    dt = (dateStop-dateStart)/nchunks

    # Boundaries are computed from the start date rather than accumulated,
    # and the last one is forced to dateStop. This guarantees exactly
    # nchunks+1 boundaries whatever the rounding of dt.
    cdates = [dateStart + i*dt for i in range(nchunks)]
    cdates.append(dateStop)
    logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))

    return cdates
##----------------------------------------------------------------------##
def _cmd_with_retry(dp, cmd, arg, retry=2):
    """
    Run a command on tango.DeviceProxy, retrying on DevFailed.

    Parameters
    ----------
    dp: tango.DeviceProxy
        Device proxy to try command onto.

    cmd : str
        Command to execute on the extractor

    arg : list
        Attribute to pass to the command

    retry : int
        Number of attempts made before giving up on DevFailed.
        With a value lower than 1, the command is not executed at all.

    Returns
    -------
    cmdreturn :
        Whatever the command returns.
        None if failed after the amount of retries.
    """
    # tango is required to catch DevFailed. It is imported here because the
    # module does not import it at top level; without it the except clause
    # below fails with a NameError as soon as the command raises.
    import tango

    logger.info("Perform Command {} {}".format(cmd, arg))

    for i in range(retry):
        # Make retrieval request
        logger.debug("Execute %s (%s)"%(cmd, arg))
        try:
            return getattr(dp, cmd)(arg)
        except tango.DevFailed as e:
            logger.warning("The extractor device returned the following error:")
            logger.warning(e)
            if i == retry-1:
                # That was the last allowed attempt
                logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
                return None
            logger.warning("Retrying...")

    # Only reached when retry < 1: no attempt was made
    return None
def _cast_bool(value):
    """
    Cast a value, or array of values, to boolean.

    Non-string data is cast directly with numpy.
    String data is first read as integers (0 is False, any other integer is
    True). If that fails, the words 'true', 't', 'false', 'f' are looked up,
    ignoring case and surrounding whitespace.

    Parameters:
    -----------
    value: string, integer, or array of such
        value to convert.

    Return:
    boolean:
        value or array of boolean.
    """
    # Force to array
    value = np.asarray(value)

    # cast back to single value
    def castback(v):
        if v.shape == ():
            return v.item()
        return v

    # Simply try to cast to bool first.
    try:
        if value.dtype.kind in "US":
            # String data. A direct astype("bool") is not reliable here: on
            # numpy >= 2 every non-empty string is True, "false" included,
            # while older versions parse the string as an integer.
            # Going explicitly through an integer gives the same result on
            # every version, and raises ValueError on words.
            converted = value.astype("int64").astype("bool")
        else:
            converted = value.astype("bool")
    except ValueError:
        # Keep trying to cast
        pass
    else:
        logger.debug("Direct conversion to boolean")
        return castback(converted)

    logger.debug("Try to convert to boolean")
    value = np.char.strip(np.char.lower(value))
    value = _ArrayStr2Bool(value)
    return castback(value)