Newer
Older
###################################################################################################
# Utilities
###################################################################################################
#
# This file contains useful functions and attributes to simplify life
#
###################################################################################################
import tango
import logging
import configparser
import os
import numpy as np
# Get the module logger
logger = logging.getLogger("FofbTool")
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
###################################################################################################
# CONFIGURATION PARSER
###################################################################################################
# Configuration
# Module-wide parser holding the currently loaded configuration.
config = configparser.ConfigParser()

# The default configuration file is expected next to this module.
# ConfigParser.read() returns the list of files it could parse, so an empty
# result means the default file was not loaded.
defconfpath = os.path.abspath(
    os.path.join(os.path.dirname(__spec__.origin), "default.cfg")
)
if not config.read(defconfpath):
    logger.warning("Default configuration file not found: {}".format(defconfpath))
def logdebugconf():
    """
    Write the whole loaded configuration to the debug log.

    One debug line is emitted per option, formatted as section, option name
    and value.
    """
    logger.debug("Display config:")
    for secname in config.sections():
        section = config[secname]
        for optname in config.options(secname):
            logger.debug("{:20} {:10} {}".format(secname, optname, section[optname]))
def loadconfig(filepath):
    """
    Load a configuration file into the module configuration.

    The file is read on top of what is already loaded, so it supersedes the
    default configuration. A warning is logged if the file could not be read.

    PARAMETERS
    ----------
    filepath: str
        Path to the config file
    """
    logger.info("Load configuration from file {}".format(filepath))

    # read() returns the list of successfully parsed files
    parsed = config.read(filepath)
    if not parsed:
        logger.warning("Configuration file not found: {}".format(filepath))
def getconf(option, section, nodename="", t="", l=False):
    """
    Helper method to retrieve an option in the loaded configuration.
    It tries to find a section named section.nodename and falls back to section only if that fails.

    PARAMETERS
    ----------
    option: str
        Name of the option to retrieve
    section: str
        Section in which to search
    nodename: str
        Specify a particular nodename. If not found in config, fall back to section default.
    t: str ('i', 'f', 'b', '')
        Give the type to cast, integer, float, boolean or string.
    l: boolean
        Cast from a list: the option value is split on whitespace and
        returned as a numpy array built with dtype=t.

    RETURNS
    -------
    roption: str, int, float, boolean or numpy.ndarray
        Read option

    RAISES
    ------
    NotImplementedError
        If a list of booleans is requested (l=True and t='b').
    ValueError
        If l is False and t is not one of 'i', 'f', 'b' or ''.
    configparser.NoSectionError, configparser.NoOptionError
        If the option is found neither in section.nodename nor in section.
    """
    nodesection = "{}.{}".format(section, nodename)
    missing = (configparser.NoSectionError, configparser.NoOptionError)

    if l:
        if t == 'b':
            # NotImplemented is a constant, not an exception: raise the real error type
            raise NotImplementedError("List of boolean not implemented")
        try:
            val = np.asarray(config.get(nodesection, option).split(), dtype=t)
        except missing:
            val = np.asarray(config.get(section, option).split(), dtype=t)
        return val

    # Select the ConfigParser getter matching the requested type
    getters = {'i': "getint", 'f': "getfloat", 'b': "getboolean", '': "get"}
    try:
        func = getattr(config, getters[t])
    except KeyError as e:
        logger.debug(str(e))
        raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''") from e

    try:
        val = func(nodesection, option)
    except missing:
        val = func(section, option)
    return val
###################################################################################################
# INIT FUNCTIONS
###################################################################################################
def init_opcua(force=False):
    """
    Run init on all OPCUA devices. Catch DevFailed and inform via log.
    Does nothing if Fofb is running.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running because FOFB seems to be running.")
        # Honour the documented contract: do nothing while the FOFB runs
        return

    for n, p in config["tangopath.fofbnodes"].items():
        logger.info("Perform init() on {} '{}'".format(n, p))
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            # A failing node is reported and the remaining nodes are still handled
            logger.error("Could not perform init() '{}', got DevFailed.".format(p))
            logger.debug(str(e))
def init_watcher(force=False):
    """
    Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
    Does nothing if Fofb is running.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running because FOFB seems to be running.")
        return

    # NOTE(review): the device path lookups were missing from this function;
    # the "tangopath" keys "fofb-watcher" and "fofb-command" are assumed --
    # confirm against the configuration file.
    p = config["tangopath"]["fofb-watcher"]
    try:
        wprx = tango.DeviceProxy(p)
        # The watcher init is slow, so allow up to 60 s before timing out
        wprx.set_timeout_millis(60000)
    except tango.DevFailed as e:
        logger.error("Could not get DeviceProxy on {}".format(p))
        logger.debug(str(e))
        return

    logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
    try:
        wprx.init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on Fofb-Watcher.")
        logger.debug(str(e))

    logger.info("Perform init() on Fofb-Command.")
    p = config["tangopath"]["fofb-command"]
    try:
        tango.DeviceProxy(p).init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on '{}', got DevFailed.".format(p))
        logger.debug(str(e))
def confds_opcua():
    """
    Apply attribute configuration on all OPCUA devices. Catch DevFailed and inform via log.

    Nodes whose name contains 'central' get the central node attribute
    configuration, every other node gets the cellnode one.
    """
    for n, p in config["tangopath.fofbnodes"].items():
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Could not get proxy '{}', got DevFailed.".format(p))
            logger.debug(str(e))
            # NOTE(review): this aborts the configuration of all remaining
            # nodes; 'continue' may have been intended -- confirm.
            break

        try:
            if 'central' in n:
                FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
            else:
                # Without this branch the central node would also receive the
                # cellnode attribute configuration.
                FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
        except tango.DevFailed as e:
            logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
            logger.debug(str(e))
###################################################################################################
# OPERATE FUNCTIONS
###################################################################################################
def check_fofbnotrunning(force=False):
    """
    Check if the FOFB is not running.
    If it fails to know, return False and log error.

    PARAMETERS
    ----------
    force: boolean
        If True, skip the check, log a warning and return True.

    RETURN
    ------
    notrunning: Boolean
        True if FOFB is not running (or if force is True)
    """
    if force:
        logger.warning("Forced action even if FOFB might be running")
        return True

    # NOTE(review): the device path lookup was missing from this function;
    # the "tangopath" key "fofb-watcher" is assumed from the log messages
    # below -- confirm against the configuration file.
    p = config["tangopath"]["fofb-watcher"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    try:
        # Running in either plane counts as running
        return not (prx.fofbrunning_x or prx.fofbrunning_y)
    except tango.DevFailed as e:
        logger.error("Failed to read the FOFB status on device Fofb-Watcher")
        logger.debug(str(e))
        return False
def conf_all_combpm(force=False):
    """
    Run default configuration of all combpm blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.

    RETURN
    ------
    success: Boolean
        False if the FOFB seems to be running. Otherwise the combined result
        of all configuration calls (truthy only if every call returned a
        truthy value).
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return False

    success = True
    # NOTE(review): every entry of tangopath.fofbnodes is configured here,
    # central node included; a "'cellnode' in n" filter may be expected -- confirm.
    for n, p in config["tangopath.fofbnodes"].items():
        bpmallowed = getconf("bpmfilter", "combpm", n.lower(), 'i', True)
        s = FofbTool.Configuration.cellnode_configure_combpm(p, bpmallowed)
        success = success and s

    # The accumulated status was computed but never handed back to the caller
    return success
def conf_all_comcorr(force=False, enable=True):
    """
    Run default configuration of all comcorr blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    enable: Boolean
        True to enable frame output to PSC (steerers).

    RETURN
    ------
    success: Boolean
        False if the FOFB seems to be running. Otherwise the combined result
        of all configuration calls (truthy only if every call returned a
        truthy value).
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success = True
    # NOTE(review): every entry of tangopath.fofbnodes is configured here,
    # central node included; a "'cellnode' in n" filter may be expected -- confirm.
    for n, p in config["tangopath.fofbnodes"].items():
        pscid = getconf("pscid", "comcorr", n, 'i', True)
        s = FofbTool.Configuration.cellnode_configure_comcorr(p, pscid, enable=enable)
        success = success and s

    # The accumulated status was computed but never handed back to the caller
    return success
def conf_all_ccn(force=False):
    """
    Run default configuration of all comcellnode blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.

    RETURN
    ------
    success: Boolean
        False if the FOFB seems to be running. Otherwise the combined result
        of all configuration calls (truthy only if every call returned a
        truthy value).
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success = True
    nodes = config["tangopath.fofbnodes"]

    # Cell nodes first. The central node is skipped here because it is
    # configured by its own routine right after this loop.
    for n, p in nodes.items():
        if 'cellnode' not in n:
            continue
        nbpm = getconf("nbpm", "ccn", n, 'i')
        npsc = getconf("npsc", "ccn", n, 'i')
        s = FofbTool.Configuration.cellnode_configure_ccn(p, nbpm, npsc)
        success = success and s

    # Central node: it receives the list of nbpm values of all cell nodes
    p = nodes["centralnode"]
    nbpm = [getconf("nbpm", "ccn", n, 'i') for n in nodes.keys() if 'cellnode' in n]
    npsc = getconf("npsc", "ccn", 'centralnode', 'i')
    s = FofbTool.Configuration.centralnode_configure_ccn(p, nbpm, npsc)
    success = success and s

    return success
def get_prx_from_nodename(nodename):
    """
    Return a tango.DeviceProxy from a node name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    nodename: str
        The target fofbnode, ie 'cellnode-c09' or 'centralnode'.
        The lookup uses the lowercased name.

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """
    nodes = config["tangopath.fofbnodes"]

    # Resolve the node name into a device path
    try:
        p = nodes[nodename.lower()]
    except KeyError:
        logger.error("Wrong nodename. Possibilities are {}".format(list(nodes.keys())))
        return None

    # Get device proxy
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return None

    return prx
def stop_all_combpm(force=False):
    """
    Apply stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop combpm because FOFB seems to be running.")
        # The warning announces that nothing is run, so stop here
        return

    for n, p in config["tangopath.fofbnodes"].items():
        # NOTE(review): the loop body was missing; the cellnode filter and the
        # call to FofbTool.Operation.stop_combpm are assumed by analogy with
        # the other stop_all_* functions -- confirm.
        if 'cellnode' in n:
            FofbTool.Operation.stop_combpm(p)
def stop_all_comlbp(force=False):
    """
    Apply reset then stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop comlbp because FOFB seems to be running.")
        # The warning announces that nothing is run, so stop here
        return

    for n, p in config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            try:
                FofbTool.Operation.reset_comlbp(p)
                FofbTool.Operation.stop_comlbp(p)
            except (tango.DevFailed, TypeError):
                # Report the failing node and keep going with the others
                logger.error("Could not stop cpmlbp on {}".format(n))
def stop_all_ccn(force=False):
    """
    Apply stop and reset commands on all fofbnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop comccn because FOFB seems to be running.")
        # The warning announces that nothing is run, so stop here
        return

    for n, p in config["tangopath.fofbnodes"].items():
        FofbTool.Operation.stop_ccn(p)
        FofbTool.Operation.reset_ccn(p)
def start_all_combpm():
    """
    Apply start command on all Cellnodes.
    """
    for n, p in config["tangopath.fofbnodes"].items():
        # NOTE(review): the loop body was missing; the cellnode filter follows
        # the docstring and the call to FofbTool.Operation.start_combpm is
        # assumed by analogy with the other start/stop helpers -- confirm.
        if 'cellnode' in n:
            FofbTool.Operation.start_combpm(p)
def start_all_comlbp():
    """
    Apply start comlbp command on all Cellnodes.
    """
    for n, p in config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            try:
                # NOTE(review): the guarded call was missing;
                # FofbTool.Operation.start_comlbp is assumed by analogy with
                # stop_comlbp / reset_comlbp -- confirm.
                FofbTool.Operation.start_comlbp(p)
            except (tango.DevFailed, TypeError):
                # Report the failing node and keep going with the others
                logger.error("Could not start LBP on {}".format(n))
Apply start commands on all fofbnodes.
for n,p in config["tangopath.fofbnodes"].items():
def align_all_ccn(cellnodename, force=False):
    """
    Launch the FA sequence alignment for Electron and Brillance Plus.

    The alignment is run through the node named cellnodename; the sequence
    offset it returns is then applied to the comlbp configuration of every
    cellnode. Nothing is done if the FOFB seems to be running.

    PARAMETERS
    ----------
    cellnodename: str
        Name of the fofbnode used to run the alignment.
    force: boolean
        Passed to check_fofbnotrunning(); True skips the running check.
    """
    fofb_stopped = check_fofbnotrunning(force)
    if not fofb_stopped:
        logger.warning("Not running align ccn because FOFB seems to be running.")
        return

    logger.debug("Use {} for sequence alignement".format(cellnodename))
    refnode = FofbTool.Utils.get_prx_from_nodename(cellnodename)
    seqoffset = FofbTool.Operation.align_ccn(refnode, 0)

    # align_ccn signals a failure with None
    if seqoffset is None:
        logger.error("Could not align all ccn")
        return

    for name, _path in config["tangopath.fofbnodes"].items():
        if 'cellnode' not in name:
            continue
        try:
            FofbTool.Configuration.cellnode_configure_comlbp(name, seqoffset)
        except (tango.DevFailed, TypeError):
            logger.error("Could not set comlbp offset on {}".format(name))
def conf_centralnode_corr():
    """
    Configure the corrector of the central node from the loaded configuration.

    All values are read from the 'corr' section as integers: 'numbpm', the
    'pscid' list, and the k1/k2 coefficients for the x and y planes, each
    with the suffixes a, b, ic and d.
    """
    p = config["tangopath.fofbnodes"]["centralnode"]
    suffixes = ("a", "b", "ic", "d")

    numbpm = getconf('numbpm', 'corr', t='i')
    pscid = getconf('pscid', 'corr', t='i', l=True)

    # Coefficient lists, read in the same order as they are passed on
    k1_x = [getconf("k1{}_x".format(sfx), "corr", t='i') for sfx in suffixes]
    k1_y = [getconf("k1{}_y".format(sfx), "corr", t='i') for sfx in suffixes]
    k2_x = [getconf("k2{}_x".format(sfx), "corr", t='i') for sfx in suffixes]
    k2_y = [getconf("k2{}_y".format(sfx), "corr", t='i') for sfx in suffixes]

    FofbTool.Configuration.centralnode_configure_corr(
        p, numbpm, pscid, k1_x, k1_y, k2_x, k2_y
    )
def sync_all_bpm():
    """
    Synchronize all BPM electronics, Electron and Brillance Plus.
    This will use the timing system (central and local board).

    The BPM and local timing board lists are read from the properties
    'bpmlist' and 'TimingBoardList' of the "FOFB" object in the Tango
    database. Each entry is a colon separated string whose third field is
    used as the device name.
    """
    db = tango.Database()

    # Electron BPMs: entries tagged ":LIBERA:" in the bpmlist property
    bpmentries = db.get_property("FOFB", "bpmlist")['bpmlist']
    bpmlist = [n.split(':')[2] for n in bpmentries if ":LIBERA:" in n]

    # Group of local timing boards: entries containing "LOCAL"
    timingentries = db.get_property("FOFB", 'TimingBoardList')['TimingBoardList']
    tlocal = tango.Group('tlocal')
    tlocal.add([n.split(':')[2] for n in timingentries if "LOCAL" in n])

    # Set a group of Libera Brillance Plus EVRX board, from FofbTool configuration
    lbpevrx = tango.Group('lbpevrx')
    lbpevrx.add(FofbTool.Configuration.config["tangopath"]["lbpevrx"].split())

    tcentral = tango.DeviceProxy(FofbTool.Configuration.config["tangopath"]["timing-central"])

    FofbTool.Operation.sync_bpm(bpmlist, lbpevrx, tlocal, tcentral)