diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..0d20b6487c61e7d1bde93acf4a14b7a89083a16d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.pyc
diff --git a/core/ArchiveExtractor.py b/core/ArchiveExtractor.py
index 6da1dd91c2087eeebf3c2c95204b2dddc47b0aa5..8e0ff3391afd07e22330298798f1401c7cba81a1 100755
--- a/core/ArchiveExtractor.py
+++ b/core/ArchiveExtractor.py
@@ -23,8 +23,6 @@ ArrayTimeStampToDatetime = np.vectorize(datetime.datetime.fromtimestamp)
 
 class ArchiveExtractor:
 
-    # Max number of point per extraction chunks
-    Nmax = 100000
 
     ##########################################################################
     def __init__(
@@ -181,30 +179,11 @@ class ArchiveExtractor:
         # Uncapitalize attribute
         attribute = attribute.lower()
 
-        # Check that the attribute is in the database
-        self.logger.debug("Check that %s is archived."%attribute)
-        if not self.extractor.IsArchived(attribute):
-            self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
-            raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, extractor))
-
-        # Get the number of points
-        N=self.extractor.GetAttDataBetweenDatesCount([
-            attribute,
-            dateStart.strftime(DBDFMT2),
-            dateStop.strftime(DBDFMT2)
-            ])
-        self.logger.debug("On the period, there is %d entries"%N)
+        # Check attribute is in database
+        self._check_attribute(attribute)
 
-        # If data chunk is too much, we need to cut it
-        if N > self.Nmax:
-            dt = (dateStop-dateStart)/(N//self.Nmax)
-            cdates = [dateStart]
-            while cdates[-1] < dateStop:
-                cdates.append(cdates[-1]+dt)
-            cdates[-1] = dateStop
-            self.logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
-        else:
-            cdates=[dateStart, dateStop]
+        # Cut the time horizon in chunks
+        cdates = self.chunkerize(attribute, dateStart, dateStop)
 
         # Arrays to hold every chunks
         value = []
@@ -213,31 +192,27 @@ class ArchiveExtractor:
 
         # For each date chunk
        for i_d in range(len(cdates)-1):
 
-            # 2 retries on DevFailed
-            for i in range(3):
-                # Make retrieval request
-                self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
-                    attribute,
-                    cdates[i_d].strftime(DBDFMT),
-                    cdates[i_d+1].strftime(DBDFMT))
-                    )
-
-                try:
-                    _date, _value = self.extractor.ExtractBetweenDates([
-                        attribute,
-                        cdates[i_d].strftime(DBDFMT),
-                        cdates[i_d+1].strftime(DBDFMT)
-                        ])
-                except tango.DevFailed as e:
-                    self.logger.warning("The extractor device returned the following error:")
-                    self.logger.warning(e)
-                    self.logger.warning("Retrying...")
-                    continue
-                break
-            if i==2:
+            # Inform on retrieval request
+            self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)"%(
+                attribute,
+                cdates[i_d].strftime(DBDFMT),
+                cdates[i_d+1].strftime(DBDFMT))
+                )
+
+            cmdreturn = self._cmd_with_retry("ExtractBetweenDates", [
+                attribute,
+                cdates[i_d].strftime(DBDFMT),
+                cdates[i_d+1].strftime(DBDFMT)
+                ])
+
+            # Check command return
+            if cmdreturn is None:
                 logger.error("Could not extract this chunk. Check the device extractor")
                return None
 
+            # Unpack return
+            _date, _value = cmdreturn
+
             # Transform to datetime - value arrays
             # NOTE: it is faster than using pandas.to_datetime()
             _value = np.asarray(_value, dtype=float)
@@ -251,12 +226,10 @@ class ArchiveExtractor:
         value = np.concatenate(value)
         date = np.concatenate(date)
 
-
         self.logger.debug("Extraction done for %s."%attribute)
         return pd.Series(index=date, data=value)
 
-
     ##---------------------------------------------------------------------------##
     def betweenDates_MinMaxMean(
             self,
@@ -364,4 +337,96 @@ class ArchiveExtractor:
         self.logger.debug("Extraction done for %s."%attribute)
         return [mdates, value_min, value_max, value_mean]
 
+    def _check_attribute(self, attribute):
+        """
+        Check that the attribute is archived in the database.
+
+        Parameters
+        ----------
+        attribute : String
+            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
+        """
+        self.logger.debug("Check that %s is archived."%attribute)
+        if not self.extractor.IsArchived(attribute):
+            self.logger.error("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor))
+            raise ValueError("Attribute '%s' is not archived in DB %s"%(attribute, self.extractor))
+
+    def _cmd_with_retry(self, cmd, arg, retry=2):
+        """
+        Run a command on the extractor Tango proxy, retrying on DevFailed.
+
+        Parameters
+        ----------
+        cmd : str
+            Command to execute on the extractor.
+
+        arg : list
+            Arguments to pass to the command.
+
+        retry : int
+            Number of retries on DevFailed. The command is attempted retry+1 times in total.
+
+        Returns
+        -------
+        cmdreturn :
+            Whatever the command returns.
+            None if it still fails after all retries.
+        """
+
+        for i in range(retry+1):
+            # Make retrieval request
+            self.logger.debug("Execute %s (%s)"%(cmd, arg))
+            try:
+                cmdreturn = getattr(self.extractor, cmd)(arg)
+            except tango.DevFailed as e:
+                self.logger.warning("The extractor device returned the following error:")
+                self.logger.warning(e)
+                self.logger.warning("Retrying...")
+                continue
+            break
+        else:
+            # Every attempt raised DevFailed
+            self.logger.error("Could not execute command %s (%s). Check the device extractor"%(cmd, arg))
+            return None
+        return cmdreturn
+
+
+    def chunkerize(self, attribute, dateStart, dateStop, Nmax=100000):
+        """
+        Cut the time horizon into chunks of at most Nmax data points.
+
+        Parameters
+        ----------
+        attribute : String
+            Name of the attribute. Full Tango name i.e. "test/dg/panda/current".
+
+        dateStart : datetime.datetime
+            Start date for extraction.
+
+        dateStop : datetime.datetime
+            Stop date for extraction.
+
+        Nmax : int
+            Maximum number of data points per chunk. Default is 100000.
+
+        Returns
+        -------
+        cdates : list
+            List of datetime objects giving the limits of each chunk.
+            For N chunks, there are N+1 elements in cdates, as the start and end boundaries are included.
+        """
+        # Get the number of points
+        N=self.extractor.GetAttDataBetweenDatesCount([
+            attribute,
+            dateStart.strftime(DBDFMT2),
+            dateStop.strftime(DBDFMT2)
+            ])
+        self.logger.debug("On the period, there is %d entries"%N)
+
+        # If there are too many data points, cut the period into chunks
+        if N > Nmax:
+            dt = (dateStop-dateStart)/(N//Nmax)
+            cdates = [dateStart]
+            while cdates[-1] < dateStop:
+                cdates.append(cdates[-1]+dt)
+            cdates[-1] = dateStop
+            self.logger.debug("Cutting access to %d little chunks of time, %s each."%(len(cdates)-1, dt))
+        else:
+            cdates=[dateStart, dateStop]
+        return cdates
diff --git a/source_env b/source_env
new file mode 100644
index 0000000000000000000000000000000000000000..bf982991d1a910c56e94d1339a6a370712bcf8e8
--- /dev/null
+++ b/source_env
@@ -0,0 +1,5 @@
+# Source the python3 env
+source /usr/Local/pyroot/pytango3rc
+
+# Add the current directory to pythonpath
+export PYTHONPATH=${PYTHONPATH}:`pwd`
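
Note on _cmd_with_retry: the patch replaces the inline "2 retries on DevFailed" loop with a
generic helper built on Python's for/else construct, where the "else" branch runs only if no
attempt succeeded (the loop never reached "break"). Below is a minimal, standalone sketch of
that pattern. The names flaky_command, run_with_retry and the use of RuntimeError are
illustrative stand-ins for the Tango extractor command and tango.DevFailed; they are not part
of this patch.

import random

def flaky_command(arg):
    # Stand-in for an extractor command that sometimes fails transiently.
    if random.random() < 0.5:
        raise RuntimeError("transient failure")
    return "result for %s" % arg

def run_with_retry(arg, retry=2):
    for i in range(retry + 1):      # one initial attempt plus `retry` retries
        try:
            result = flaky_command(arg)
        except RuntimeError as e:
            print("attempt %d failed: %s, retrying..." % (i + 1, e))
            continue
        break                       # success: leave the loop early
    else:                           # every attempt raised
        print("giving up after %d attempts" % (retry + 1))
        return None
    return result

if __name__ == "__main__":
    print(run_with_retry("some/attribute"))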
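
Note on chunkerize: the chunk boundaries are computed by dividing the requested period by
N // Nmax, where N is the number of archived points over that period. Below is a minimal,
standalone sketch of that arithmetic; the GetAttDataBetweenDatesCount call is replaced by an
explicit point count N so it runs without a Tango device, and the names (split_period, the
example dates, N=350000) are illustrative only.

import datetime

def split_period(dateStart, dateStop, N, Nmax=100000):
    # Return chunk boundaries so that each chunk holds roughly at most Nmax points.
    if N <= Nmax:
        return [dateStart, dateStop]
    dt = (dateStop - dateStart) / (N // Nmax)
    cdates = [dateStart]
    while cdates[-1] < dateStop:
        cdates.append(cdates[-1] + dt)
    cdates[-1] = dateStop
    return cdates

if __name__ == "__main__":
    start = datetime.datetime(2021, 1, 1)
    stop = datetime.datetime(2021, 1, 2)
    # 350000 points over one day -> 3 chunks of about 8 hours each
    bounds = split_period(start, stop, N=350000)
    print(len(bounds) - 1, "chunks:", bounds)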