Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
A
ArchiveExtractor
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
DG
ArchiveExtractor
Commits
d500986a
Commit
d500986a
authored
Nov 17, 2022
by
BRONES Romain
Browse files
Options
Downloads
Patches
Plain Diff
Finally remove class
parent
69071287
Branches
Branches containing commit
Tags
Tags containing commit
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
core/ArchiveExtractor.py
+123
-534
123 additions, 534 deletions
core/ArchiveExtractor.py
with
123 additions
and
534 deletions
core/ArchiveExtractor.py
+
123
−
534
View file @
d500986a
...
...
@@ -59,6 +59,7 @@ def _dateparse(datestr):
"""
Convenient function to parse date strings.
Global format is %Y-%m-%d-%H:%M:%S and it can be reduced to be less precise.
If datstr is None, take the actual date and time.
Parameters
---------
...
...
@@ -76,6 +77,9 @@ def _dateparse(datestr):
Parsed date
"""
if
datestr
is
None
:
return
datetime
.
datetime
.
now
()
# This gives all format that will be tried, in order.
# Stop on first parse success. Raise error if none succeed.
fmt
=
[
...
...
@@ -217,7 +221,6 @@ def _cmd_with_retry(dp, cmd, arg, retry=2):
break
return
cmdreturn
##########################################################################
### Module private variables ###
##########################################################################
...
...
@@ -234,6 +237,7 @@ _AttrTables = (None, None)
def
init
(
HdbExtractorPath
=
"
archiving/hdbextractor/2
"
,
TdbExtractorPath
=
"
archiving/tdbextractor/2
"
,
loglevel
=
"
info
"
,
):
"""
Initialize the module.
...
...
@@ -241,10 +245,18 @@ def init(
HdbExtractorPath, TdbExtractorPath: string
Tango path to the extractors.
loglevel: string
loglevel to pass to logging.Logger
"""
global
_extractors
global
_AttrTables
try
:
logger
.
setLevel
(
getattr
(
logging
,
loglevel
.
upper
()))
except
AttributeError
:
logger
.
error
(
"
Wrong log level specified: {}
"
.
format
(
loglevel
.
upper
()))
logger
.
debug
(
"
Instanciating extractors device proxy...
"
)
_extractors
=
(
tango
.
DeviceProxy
(
HdbExtractorPath
),
tango
.
DeviceProxy
(
TdbExtractorPath
))
...
...
@@ -378,26 +390,22 @@ def ExtrBetweenDates(
Archived values
"""
if
not
_check_initialized
():
return
if
not
db
in
(
"
H
"
,
"
T
"
):
raise
AttributeError
(
"
Attribute db should be
'
H
'
or
'
T
'"
)
# Parse date if it is string
if
type
(
dateStart
)
is
str
:
dateStart
=
_dateparse
(
dateStart
)
if
dateStop
is
None
:
dateStop
=
datetime
.
datetime
.
now
()
if
type
(
dateStop
)
is
str
:
dateStop
=
_dateparse
(
dateStop
)
# Uncapitalize attribute
attribute
=
attribute
.
lower
()
# Check attribute is in database
_check_attribute
(
attribute
,
db
=
db
)
# Parse dates
dateStart
=
_dateparse
(
dateStart
)
dateStop
=
_dateparse
(
dateStop
)
# Get info about the attribute
info
=
infoattr
(
attribute
,
db
=
db
)
logger
.
debug
(
"
Attribute information
\n
%s
"
%
info
)
...
...
@@ -503,284 +511,13 @@ def ExtrBetweenDates(
return
pd
.
Series
(
index
=
date
,
data
=
value
)
class
ArchiveExtractor
:
##########################################################################
def __init__(
        self,
        extractorKind='H', extractorNumber=2,
        extractorPath=None,
        logger='info',
        ):
    """
    Build an extractor bound to one archiving database device.

    Parameters
    ----------
    extractorKind: char
        Either 'H' or 'T' for HDB or TDB.

    extractorNumber: int
        Number of the archive extractor instance to use.

    extractorPath: string
        Tango path to the extractor.
        If this argument is given, it takes precedence over extractorKind
        and extractorNumber.

    logger: logging.Logger, str
        Logger object to use.
        If string, it is taken as a log level name; the module logger is
        used and a basic stream handler is attached if it has none.
        Default to 'info'.

    Return
    ------
    ArchiveExtractor
    """

    #######################################################
    # Get logger
    # isinstance (not an exact type comparison) so that Logger subclasses,
    # e.g. the root logger, are accepted instead of being treated as a
    # level name.
    if isinstance(logger, logging.Logger):
        self.logger = logger
    else:
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, logger.upper()))

        if not self.logger.hasHandlers():
            # No handlers, create one
            sh = logging.StreamHandler()
            sh.setLevel(self.logger.level)
            sh.setFormatter(logging.Formatter("%(levelname)s:%(message)s"))
            self.logger.addHandler(sh)

    #######################################################
    # Select Extractor
    if extractorPath is None:
        self.extractor = tango.DeviceProxy(
            "archiving/%sDBExtractor/%d" % (extractorKind, extractorNumber)
            )
    else:
        self.extractor = tango.DeviceProxy(extractorPath)

    self.extractor.set_timeout_millis(3000)
    self.logger.debug("Archive Extractor %s used." % self.extractor.name())

    # Get the attribute table (used by findattr)
    self.attr_table = self.extractor.getattnameall()
##---------------------------------------------------------------------------##
@staticmethod
def dateparse(datestr):
    """
    Turn a date string into a datetime object.

    The most complete layout is %Y-%m-%d-%H:%M:%S; trailing fields may be
    dropped down to %Y-%m.

    Parameters
    ---------
    datestr : string
        Date as a string, format %Y-%m-%d-%H:%M:%S or less precise.

    Exceptions
    ----------
    ValueError
        If the parsing failed.

    Returns
    -------
    date : datetime.datetime
        Parsed date
    """
    # Layouts are tried from most to least precise; first success wins.
    layouts = (
        "%Y-%m-%d-%H:%M:%S",
        "%Y-%m-%d-%H:%M",
        "%Y-%m-%d-%H",
        "%Y-%m-%d",
        "%Y-%m",
    )

    for layout in layouts:
        try:
            return datetime.datetime.strptime(datestr, layout)
        except ValueError:
            pass

    raise ValueError("Could not parse argument to a date")
##---------------------------------------------------------------------------##
def betweenDates(
        self,
        attribute,
        dateStart,
        dateStop=None,
        ):
    """
    Query attribute data from an archiver database, get all points between dates.
    Use ExtractBetweenDates for scalars and GetAttDataBetweenDates for 1D vectors.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime, string
        Start date for extraction. If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.

    dateStop : datetime.datetime, string, None
        Stop date for extraction.
        If string, it will be parsed.
        Example of string format %Y-%m-%d-%H:%M:%S or less precise.
        If None, it takes the current date and time.
        Default is None (now).

    Exceptions
    ----------
    ValueError
        The attribute is not found in the database.

    Returns
    -------
    pandas.Series for scalar attributes, pandas.DataFrame (one column per
    vector element, all-NaN columns dropped) for 1D vectors. The index holds
    the dates of the values.
    None if a chunk could not be extracted.
    """

    # Parse date if it is string
    if isinstance(dateStart, str):
        dateStart = self.dateparse(dateStart)
    if dateStop is None:
        dateStop = datetime.datetime.now()
    if isinstance(dateStop, str):
        dateStop = self.dateparse(dateStop)

    # Uncapitalize attribute
    attribute = attribute.lower()

    # Check attribute is in database
    self._check_attribute(attribute)

    # Get info about the attribute
    info = self.infoattr(attribute)
    self.logger.debug("Attribute information \n%s" % info)

    # Detect spectrum
    attrtype = "scalar"
    if int(info["max_dim_x"]) > 1:
        if int(info["max_dim_y"]) > 0:
            self.logger.warning("Attribute %s is a (%s; %s) vector. This is poorly handled by this script." % (
                attribute, info["max_dim_x"], info["max_dim_y"]))
            attrtype = "multi"
        else:
            self.logger.info("Attribute %s is a 1D vector, dimension = %s." % (
                attribute, info["max_dim_x"]))
            attrtype = "vector"

    # Cut the time horizon in chunks
    cdates = self.chunkerize(attribute, dateStart, dateStop)

    # Arrays to hold every chunks
    value = []
    date = []

    # For each date chunk (consecutive boundaries in cdates)
    for chunkStart, chunkStop in zip(cdates[:-1], cdates[1:]):
        startStr = chunkStart.strftime(_DBDFMT)
        stopStr = chunkStop.strftime(_DBDFMT)

        # =============
        # For now we handle multi dimension the same way as scalar, which will get only the first element
        if attrtype in ("scalar", "multi"):
            # Inform on retrieval request
            self.logger.info("Perform ExtractBetweenDates (%s, %s, %s)" % (
                attribute, startStr, stopStr))

            cmdreturn = self._cmd_with_retry("ExtractBetweenDates", [
                attribute, startStr, stopStr])

            # Check command return
            if cmdreturn is None:
                # Use the instance logger, like every other message here
                self.logger.error("Could not extract this chunk. Check the device extractor")
                return None

            # Unpack return
            _date, _value = cmdreturn

            # Transform to datetime - value arrays
            # NOTE: it is faster than using pandas.to_datetime()
            _value = np.asarray(_value, dtype=float)
            if len(_date) > 0:
                _date = _ArrayTimeStampToDatetime(_date/1000.0)

            value.append(_value)
            date.append(_date)

        # =============
        elif attrtype == "vector":
            self.logger.info("Perform GetAttDataBetweenDates (%s, %s, %s)" % (
                attribute, startStr, stopStr))

            [N,], [name,] = self.extractor.GetAttDataBetweenDates([
                attribute, startStr, stopStr])
            N = int(N)

            try:
                # Read the history
                self.logger.debug("Retrieve history of %d values. Dynamic attribute named %s." % (N, name))
                attrHist = self.extractor.attribute_history(name, N)

                # Transform to datetime - value arrays
                # Rows shorter than max_dim_x stay padded with NaN
                _value = np.full((N, int(info["max_dim_x"])), np.nan, dtype=float)
                _date = np.empty(N, dtype=object)
                for i_h, entry in enumerate(attrHist[:N]):
                    _value[i_h, :entry.dim_x] = entry.value
                    _date[i_h] = entry.time.todatetime()
            finally:
                # Remove dynamic attribute, even if reading the history failed,
                # so it does not linger on the extractor device
                self.logger.debug("Remove dynamic attribute %s." % name)
                self.extractor.RemoveDynamicAttribute(name)

            value.append(_value)
            date.append(_date)

    self.logger.debug("Concatenate chunks")
    value = np.concatenate(value)
    date = np.concatenate(date)

    self.logger.debug("Extraction done for %s." % attribute)
    if attrtype == "vector":
        return pd.DataFrame(index=date, data=value).dropna(axis=1, how='all')
    return pd.Series(index=date, data=value)
##---------------------------------------------------------------------------##
def
betweenDates_MinMaxMean
(
self
,
def
ExtrBetweenDates_MinMaxMean
(
attribute
,
dateStart
,
dateStop
=
datetime
.
datetime
.
now
(),
timeInterval
=
datetime
.
timedelta
(
seconds
=
60
),
db
=
'
H
'
,
):
"""
Query attribute data from an archiver database, get all points between dates.
...
...
@@ -804,6 +541,9 @@ class ArchiveExtractor:
Time interval used to perform min,max and mean.
Can be a string with a number and a unit in
"
d
"
,
"
h
"
,
"
m
"
or
"
s
"
db: str
Which database to look in,
'
H
'
or
'
T
'
.
Exceptions
----------
ValueError
...
...
@@ -822,29 +562,31 @@ class ArchiveExtractor:
Mean of the value on the interval
"""
if
not
_check_initialized
():
return
# Parse date if it is string
if
type
(
dateStart
)
is
str
:
dateStart
=
self
.
dateparse
(
dateStart
)
if
type
(
dateStop
)
is
str
:
dateStop
=
self
.
dateparse
(
dateStop
)
if
not
db
in
(
"
H
"
,
"
T
"
):
raise
AttributeError
(
"
Attribute db should be
'
H
'
or
'
T
'"
)
# Uncapitalize attribute
attribute
=
attribute
.
lower
()
# Check attribute is in database
_check_attribute
(
attribute
,
db
=
db
)
# Parse dates
dateStart
=
_dateparse
(
dateStart
)
dateStop
=
_dateparse
(
dateStop
)
# Parse timeInterval if string
if
type
(
timeInterval
)
is
str
:
try
:
mul
=
{
'
s
'
:
1
,
'
m
'
:
60
,
'
h
'
:
60
*
60
,
'
d
'
:
60
*
60
*
24
}[
timeInterval
[
-
1
]]
except
KeyError
:
self
.
logger
.
error
(
"
timeInterval could not be parsed
"
)
logger
.
error
(
"
timeInterval could not be parsed
"
)
raise
ValueError
(
"
timeInterval could not be parsed
"
)
timeInterval
=
datetime
.
timedelta
(
seconds
=
int
(
timeInterval
[:
-
1
])
*
mul
)
# Check that the attribute is in the database
self
.
logger
.
debug
(
"
Check that %s is archived.
"
%
attribute
)
if
not
self
.
extractor
.
IsArchived
(
attribute
):
self
.
logger
.
error
(
"
Attribute
'
%s
'
is not archived in DB %s
"
%
(
attribute
,
extractor
))
raise
ValueError
(
"
Attribute
'
%s
'
is not archived in DB %s
"
%
(
attribute
,
extractor
))
# Get info about the attribute
info
=
self
.
infoattr
(
attribute
)
self
.
logger
.
debug
(
"
Attribute information
\n
%s
"
%
info
)
...
...
@@ -893,156 +635,3 @@ class ArchiveExtractor:
self
.
logger
.
debug
(
"
Extraction done for %s.
"
%
attribute
)
return
[
mdates
,
value_min
,
value_max
,
value_mean
]
def
_check_attribute
(
self
,
attribute
):
"""
Check that the attribute is in the database
Parameters
----------
attribute : String
Name of the attribute. Full Tango name i.e.
"
test/dg/panda/current
"
.
"""
self
.
logger
.
debug
(
"
Check that %s is archived.
"
%
attribute
)
if
not
self
.
extractor
.
IsArchived
(
attribute
):
self
.
logger
.
error
(
"
Attribute
'
%s
'
is not archived in DB %s
"
%
(
attribute
,
self
.
extractor
))
raise
ValueError
(
"
Attribute
'
%s
'
is not archived in DB %s
"
%
(
attribute
,
self
.
extractor
))
def
_cmd_with_retry
(
self
,
cmd
,
arg
,
retry
=
2
):
"""
Run a command on extractor tango proxy, retrying on DevFailed.
Parameters
----------
cmd : str
Command to executte on the extractor
arg : list
Attribute to pass to the command
retry : int
Number of command retry on DevFailed
Returns
-------
cmdreturn :
Whatever the command returns.
None if failed after the amount of retries.
"""
for
i
in
range
(
retry
):
# Make retrieval request
self
.
logger
.
debug
(
"
Execute %s (%s)
"
%
(
cmd
,
arg
))
try
:
cmdreturn
=
getattr
(
self
.
extractor
,
cmd
)(
arg
)
except
tango
.
DevFailed
as
e
:
self
.
logger
.
warning
(
"
The extractor device returned the following error:
"
)
self
.
logger
.
warning
(
e
)
if
i
==
retry
-
1
:
logger
.
error
(
"
Could not execute command %s (%s). Check the device extractor
"
%
(
cmd
,
arg
))
return
None
self
.
logger
.
warning
(
"
Retrying...
"
)
continue
break
return
cmdreturn
def chunkerize(self, attribute, dateStart, dateStop, Nmax=100000):
    """
    Split a time span in chunks holding roughly at most Nmax values each.

    The split is uniform in time; it is sized from the total number of
    archived values reported by the extractor on the whole span.

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    dateStart : datetime.datetime
        Start date for extraction.

    dateStop : datetime.datetime
        Stop date for extraction.

    Nmax : int
        Target maximum number of values per chunk. For vector attributes,
        every element counts as one value.

    Returns
    -------
    cdates : list
        List of datetime giving the limit of each chunks.
        For N chunks, there is N+1 elements in cdates, as the start and end boundaries are included.
    """
    info = self.infoattr(attribute)
    self.logger.debug("Attribute information \n%s" % info)

    # Get the number of points
    N = self.extractor.GetAttDataBetweenDatesCount([
        attribute,
        dateStart.strftime(_DBDFMT2),
        dateStop.strftime(_DBDFMT2)
        ])
    self.logger.debug("On the period, there is %d entries" % N)

    dx = int(info["max_dim_x"])
    if dx > 1:
        self.logger.debug("Attribute is a vector with max dimension = %s" % dx)
        N = N*dx

    # Small enough (or degenerate time span): a single chunk
    if N <= Nmax or dateStop <= dateStart:
        return [dateStart, dateStop]

    # If data chunk is too much, we need to cut it.
    # Ceiling division: flooring would allow chunks of nearly 2*Nmax values.
    nchunks = -(-N // Nmax)
    dt = (dateStop-dateStart)/nchunks
    if not dt:
        # Span shorter than the timedelta resolution, cannot be cut further
        return [dateStart, dateStop]

    # Boundaries are computed by multiplication rather than accumulation so
    # rounding of dt cannot add a spurious sliver chunk at the end.
    cdates = [dateStart + i*dt for i in range(nchunks)]
    cdates.append(dateStop)
    self.logger.debug("Cutting access to %d little chunks of time, %s each." % (len(cdates)-1, dt))

    return cdates
def findattr(self, pattern):
    """
    Look up attribute paths of the attribute table that fit a pattern.
    Case insensitive.

    Parameters:
    -----------
    pattern: str
        Pattern to search, wildchar * accepted.
        example "dg*dcct*current"
        An attribute is kept when it contains every piece of the pattern
        found between the wildchars.

    Returns:
    --------
    results: (str,)
        List of string match
    """
    pieces = pattern.lower().split('*')

    found = []
    for name in self.attr_table:
        lowered = name.lower()
        if all(piece in lowered for piece in pieces):
            found.append(name)

    return found
def infoattr(self, attribute):
    """
    Get informations for an attribute and pack it into a python dict.

    The extractor commands GetAttDefinitionData and GetAttPropertiesData are
    called in turn; each returned entry is read as "name::value".

    Parameters
    ----------
    attribute : String
        Name of the attribute. Full Tango name i.e. "test/dg/panda/current".

    Returns
    -------
    info : dict
        Dictionnary of propertyname:propertyvalue (values are strings)
    """
    info = dict()

    for func in ("GetAttDefinitionData", "GetAttPropertiesData"):
        R = getattr(self.extractor, func)(attribute)
        if R is None:
            self.logger.warning("Function %s on extractor returned None" % func)
            continue

        for entry in R:
            # Split on the first separator only, so a value that itself
            # contains "::" is kept whole.
            key, sep, val = entry.partition("::")
            if not sep:
                # Entry without separator: nothing to store, do not crash
                self.logger.warning("Function %s on extractor returned a malformed entry: %s" % (func, entry))
                continue
            info[key] = val

    return info
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment