Commit 7c29e262 authored by BRONES Romain

Towards a real module package

* Add __init__.py in core (empty for now)
* Add a bash source script to load the Python3 env and add the directory to the Python path
* Add a .gitignore
* Move some code out of ArchiveExtractor.betweenDates into separate helper methods
parent 90cfbea5

.gitignore (new file):
*.pyc

@@ -23,8 +23,6 @@ ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
 
 class ArchiveExtractor:
 
-    # Max number of point per extraction chunks
-    Nmax = 100000
 
     ##########################################################################
     def __init__(
@@ -181,30 +179,11 @@ class ArchiveExtractor:
         # Uncapitalize attribute
         attribute = attribute.lower()
 
-        # Check that the attribute is in the database
-        self.logger.debug("Check that %s is archived."%attribute)
-        if not self.extractor.IsArchived(attribute):
-            self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
-            raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
-
-        # Get the number of points
-        N=self.extractor.GetAttDataBetweenDatesCount([
-            attribute,
-            dateStart.strftime(DBDFMT2),
-            dateStop.strftime(DBDFMT2)
-            ])
-        self.logger.debug("On the period, there is %d entries"%N)
-
-        # If data chunk is too much, we need to cut it
-        if N > self.Nmax:
-            dt = (dateStop-dateStart)/(N//self.Nmax)
-            cdates = [dateStart]
-            while cdates[-1] < dateStop:
-                cdates.append(cdates[-1]+dt)
-            cdates[-1] = dateStop
-            self.logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
-        else:
-            cdates=[dateStart, dateStop]
+        # Check attribute is in database
+        self._check_attribute(attribute)
+
+        # Cut the time horizon in chunks
+        cdates = self.chunkerize(attribute, dateStart, dateStop)
 
         # Arrays to hold every chunks
         value = []
@@ -213,31 +192,27 @@ class ArchiveExtractor:
         # For each date chunk
         for i_d in range(len(cdates)-1):
 
-            # 2 retries on DevFailed
-            for i in range(3):
-                # Make retrieval request
-                self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
-                    attribute,
-                    cdates[i_d].strftime(DBDFMT),
-                    cdates[i_d+1].strftime(DBDFMT))
-                    )
-
-                try:
-                    _date, _value = self.extractor.ExtractBetweenDates([
-                        attribute,
-                        cdates[i_d].strftime(DBDFMT),
-                        cdates[i_d+1].strftime(DBDFMT)
-                        ])
-                except tango.DevFailed as e:
-                    self.logger.warning("The extractor device returned the following error:")
-                    self.logger.warning(e)
-                    self.logger.warning("Retrying...")
-                    continue
-                break
-            if i==2:
+            # Inform on retrieval request
+            self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
+                attribute,
+                cdates[i_d].strftime(DBDFMT),
+                cdates[i_d+1].strftime(DBDFMT))
+                )
+
+            cmdreturn = self._cmd_with_retry("ExtractBetweenDates", [
+                attribute,
+                cdates[i_d].strftime(DBDFMT),
+                cdates[i_d+1].strftime(DBDFMT)
+                ])
+
+            # Check command return
+            if cmdreturn is None:
                 logger.error("Could not extract this chunk. Check the device extractor")
                 return None
 
+            # Unpack return
+            _date, _value = cmdreturn
+
             # Transform to datetime - value arrays
             # NOTE: it is faster than using pandas.to_datetime()
             _value = np.asarray(_value, dtype=float)
@@ -251,12 +226,10 @@ class ArchiveExtractor:
         value = np.concatenate(value)
         date = np.concatenate(date)
 
         self.logger.debug("Extraction done for %s."%attribute)
 
         return pd.Series(index=date, data=value)
 
     ##---------------------------------------------------------------------------##
     def betweenDates_MinMaxMean(
             self,
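
The loop in the previous hunk, together with the concatenation here, amounts to a chunk-and-concatenate pattern: extract each sub-period, convert it, and stitch the pieces into one pandas Series. A minimal, self-contained sketch of that pattern follows; fetch_chunk, the boundary list and the fake data are illustrative stand-ins, not code from this repository.

import datetime
import numpy as np
import pandas as pd

def fetch_chunk(start, stop):
    # Stand-in for one ExtractBetweenDates call: returns (timestamps, values)
    dates = pd.date_range(start, stop, periods=5)
    return [d.timestamp() for d in dates], np.random.rand(5)

dateStart = datetime.datetime(2021, 1, 1)
dateStop = datetime.datetime(2021, 1, 2)
# Pretend chunkerize() returned three boundaries, i.e. two chunks
cdates = [dateStart, dateStart + (dateStop - dateStart) / 2, dateStop]

value, date = [], []
for i_d in range(len(cdates) - 1):
    _date, _value = fetch_chunk(cdates[i_d], cdates[i_d + 1])
    # Same post-processing as in betweenDates: float values, timestamps to datetime
    _value = np.asarray(_value, dtype=float)
    _date = np.asarray([datetime.datetime.fromtimestamp(t) for t in _date])
    value.append(_value)
    date.append(_date)

# Concatenate the per-chunk arrays and build the final Series
series = pd.Series(index=np.concatenate(date), data=np.concatenate(value))
print(series)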
@@ -364,4 +337,96 @@ class ArchiveExtractor:
         self.logger.debug("Extraction done for %s."%attribute)
 
         return [mdates, value_min, value_max, value_mean]
 
+    def _check_attribute(self, attribute):
+        """
+        Check that the attribute is in the database.
+
+        Parameters
+        ----------
+        attribute : String
+            Name of the attribute. Full Tango name, e.g. "test/dg/panda/current".
+        """
+        self.logger.debug("Check that %s is archived."%attribute)
+        if not self.extractor.IsArchived(attribute):
+            self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor))
+            raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor))
+
+    def _cmd_with_retry(self, cmd, arg, retry=2):
+        """
+        Run a command on the extractor tango proxy, retrying on DevFailed.
+
+        Parameters
+        ----------
+        cmd : str
+            Command to execute on the extractor.
+        arg : list
+            Argument list to pass to the command.
+        retry : int
+            Number of attempts on DevFailed before giving up.
+
+        Returns
+        -------
+        cmdreturn :
+            Whatever the command returns.
+            None if it still failed after the given number of attempts.
+        """
+        for i in range(retry):
+            # Make retrieval request
+            self.logger.debug("Execute %s (%s)"%(cmd, arg))
+            try:
+                cmdreturn = getattr(self.extractor, cmd)(arg)
+            except tango.DevFailed as e:
+                self.logger.warning("The extractor device returned the following error:")
+                self.logger.warning(e)
+                self.logger.warning("Retrying...")
+                continue
+            break
+        else:
+            self.logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
+            return None
+
+        return cmdreturn
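
The retry logic above is a plain try/continue/break loop around the Tango proxy call. The same pattern, shown outside Tango as a self-contained sketch (FlakyCommand and RuntimeError are stand-ins for the extractor command and tango.DevFailed; nothing here is part of the repository):

class FlakyCommand:
    """Stand-in for an extractor command: fails twice, then succeeds."""
    def __init__(self):
        self.calls = 0
    def __call__(self):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("simulated DevFailed")
        return "payload"

def cmd_with_retry(func, retry=2):
    # Same retry-until-break structure as _cmd_with_retry above
    for i in range(retry):
        try:
            result = func()
        except RuntimeError as e:
            print("attempt %d failed (%s), retrying..." % (i + 1, e))
            continue
        break
    else:
        # The loop never hit break: every attempt failed
        print("giving up after %d attempts" % retry)
        return None
    return result

print(cmd_with_retry(FlakyCommand(), retry=2))   # both attempts fail -> None
print(cmd_with_retry(FlakyCommand(), retry=3))   # succeeds on the third attempt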
+
+    def chunkerize(self, attribute, dateStart, dateStop, Nmax=100000):
+        """
+        Cut the time horizon into chunks when the period holds more than Nmax data points.
+
+        Parameters
+        ----------
+        attribute : String
+            Name of the attribute. Full Tango name, e.g. "test/dg/panda/current".
+        dateStart : datetime.datetime
+            Start date for extraction.
+        dateStop : datetime.datetime
+            Stop date for extraction.
+        Nmax : int
+            Data point count above which the period is split into chunks.
+
+        Returns
+        -------
+        cdates : list
+            List of datetimes giving the limits of each chunk.
+            For N chunks there are N+1 elements in cdates, as the start and stop boundaries are included.
+        """
+        # Get the number of points
+        N = self.extractor.GetAttDataBetweenDatesCount([
+            attribute,
+            dateStart.strftime(DBDFMT2),
+            dateStop.strftime(DBDFMT2)
+            ])
+        self.logger.debug("On the period, there are %d entries."%N)
+
+        # If there are too many data points, cut the period into chunks
+        if N > Nmax:
+            dt = (dateStop-dateStart)/(N//Nmax)
+            cdates = [dateStart]
+            while cdates[-1] < dateStop:
+                cdates.append(cdates[-1]+dt)
+            cdates[-1] = dateStop
+            self.logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
+        else:
+            cdates = [dateStart, dateStop]
+
+        return cdates
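
A quick, standalone illustration of the boundary arithmetic used by chunkerize, with the point count N supplied directly instead of queried from the archiver (function name and numbers are made up for the example):

import datetime

def chunk_boundaries(dateStart, dateStop, N, Nmax=100000):
    # Same arithmetic as chunkerize(), without the extractor query
    if N <= Nmax:
        return [dateStart, dateStop]
    dt = (dateStop - dateStart) / (N // Nmax)
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1] + dt)
    cdates[-1] = dateStop
    return cdates

start = datetime.datetime(2021, 1, 1)
stop = datetime.datetime(2021, 1, 2)
# 250000 points with Nmax=100000 -> N//Nmax = 2 -> dt = 12 h -> 3 boundaries, 2 chunks
print(chunk_boundaries(start, stop, N=250000))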

Bash source script (new file):

# Source the python3 env
source /usr/Local/pyroot/pytango3rc

# Add the current directory to PYTHONPATH
export PYTHONPATH=${PYTHONPATH}:`pwd`
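
Once the script has been sourced in a shell, the repository directory is on PYTHONPATH and the (for now empty) core package added by this commit can be imported. A trivial check, assuming it is run from such a shell in the repository directory:

# Run after sourcing the script above; core/__init__.py was added by this commit
import core
print(core.__file__)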