Newer
Older
###################################################################################################
# CONFIGURATION FUNCTIONS
###################################################################################################
#
# Contains functions to configure blocks of the FOFB application.
#
###################################################################################################
# Get the module logger
logger = logging.getLogger("FofbTool")

###################################################################################################
#    CONFIGURATION PARSER
###################################################################################################

# Module-wide configuration object, read by every helper of this file
config = configparser.ConfigParser()

# Try to load the default configuration, expected next to this module.
# ConfigParser.read() returns the list of files it successfully parsed and
# silently skips the ones it cannot open, so an empty result means "not found".
defconfpath = os.path.abspath(os.path.join(os.path.dirname(__spec__.origin), "default.cfg"))
if not config.read(defconfpath):
    logger.warning("Default configuration file not found: {}".format(defconfpath))
def logdebugconf():
    """
    Dump the loaded configuration to the module logger, at DEBUG level.

    A header line is logged first, then one line per (section, option) pair
    with the section name, the option name and the option value.
    """
    logger.debug("Display config:")
    for section in config.sections():
        for option in config.options(section):
            logger.debug("{:20} {:10} {}".format(section, option, config[section][option]))
def loadconfig(filepath):
    """
    Load a configuration file for the module.
    It supersedes the already loaded configuration (the default one, if it
    was found), option by option.

    PARAMETERS
    ----------
    filepath: str
        Path to the config file
    """
    # No 'global' statement needed: the module-level parser is mutated in
    # place by read(), never rebound.
    logger.info("Load configuration from file {}".format(filepath))
    # read() returns the list of files successfully parsed; an empty list
    # means the file could not be opened.
    if not config.read(filepath):
        logger.warning("Configuration file not found: {}".format(filepath))
def getconf(option, section, nodename="", t="", l=False):
    """
    Helper method to retrieve an option in the loaded configuration.
    It tries to find a section named section.nodename and falls back to section only if that fails.

    PARAMETERS
    ----------
    option: str
        Name of the option to retrieve
    section: str
        Section in which to search
    nodename: str
        Specify a particular nodename. If not found in config, fall back to section default.
    t: str ('i', 'f', 'b', '')
        Give the type to cast, integer, float, boolean or string.
        When l is True, t is handed to numpy as the dtype code of the array.
    l: boolean
        Cast from a list: the option value is split on whitespace and
        returned as a numpy array.

    RETURNS
    -------
    roption: str, int, float, boolean or numpy.ndarray
        Read option

    RAISES
    ------
    NotImplementedError
        If a list of booleans is requested (l=True and t='b').
    ValueError
        If t is not one of the allowed codes (scalar case).
    configparser.NoSectionError, configparser.NoOptionError
        If the option is found neither in section.nodename nor in section.
    """
    nodesection = "{}.{}".format(section, nodename)
    notfound = (configparser.NoSectionError, configparser.NoOptionError)

    if l:
        if t == 'b':
            # NotImplemented is a comparison sentinel, not an exception class:
            # the exception to raise is NotImplementedError.
            raise NotImplementedError("List of boolean not implemented")
        try:
            val = np.asarray(config.get(nodesection, option).split(), dtype=t)
        except notfound:
            val = np.asarray(config.get(section, option).split(), dtype=t)
        return val

    # Scalar case: pick the ConfigParser getter matching the requested type
    try:
        func = getattr(config, {'i': "getint", 'f': "getfloat", 'b': "getboolean", '': "get"}[t])
    except KeyError as e:
        logger.debug(str(e))
        raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''") from e

    try:
        val = func(nodesection, option)
    except notfound:
        val = func(section, option)
    return val
###################################################################################################
###################################################################################################
# Filter list depends on the cellnode. These are the default values.
# Default BPM ids let through the filter, keyed by cellnode name.
bpmfilterlist = {
    # c01 gathers two separate id ranges: 1..22 and 115..122
    "cellnode-c01": np.concatenate((np.arange(1, 23), np.arange(115, 123))),
    "cellnode-c06": np.arange(23, 53),
    "cellnode-c09": np.arange(53, 83),
    "cellnode-c14": np.arange(83, 115),
}
def cellnode_configure_combpm(cellnodename, bpmallowed=None):
    """
    Configure the combpm block of a CellNode.
    The BPM data allowed through are either taken in the argument, or default to configuration.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    bpmallowed: [int], None
        List of BPMID allowed through the block. If None, default to the
        'bpmfilter' option of the 'combpm' configuration section.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = config["tangopath.fofbnodes"][cellnodename.lower()]
    except KeyError:
        # Only an unknown node name is an error; the happy path must go on.
        logger.error("Wrong cellnodename. Possibilities are {}".format(list(config['tangopath.fofbnodes'].keys())))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    # Select BPM id allowed to pass through
    if bpmallowed is None:
        logger.debug("Default to nominal values for BPM filter")
        bpmallowed = getconf("bpmfilter", "combpm", cellnodename.lower(), 'i', True)

    logger.debug("Activate BPMs {} in the filter for {}".format(bpmallowed, p))

    # Read-modify-write of the filter table: clear every entry, then set the
    # entries indexed by the allowed BPM ids to 0x80.
    # NOTE(review): 0x80 is presumably the "pass through" flag of the table — confirm.
    f = prx["combpm_filter_table"].value
    f[:] = 0
    f[np.array(bpmallowed)] = 0x80
    prx["combpm_filter_table"] = f

    logger.info("Configuration of ComBpm done on {}.".format(p))
    return True
###################################################################################################
###################################################################################################
def cellnode_configure_ccn(cellnodename, nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on a cellnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    nbpm: int, None
        Number of BPM allowed by the filter, hence the number of expected BPM packets.
        If None, default to the 'nbpm' option of the 'ccn' configuration section.
    npsc: int, None
        Number of total PSC, hence the number of expected PSC packets.
        If None, default to the 'npsc' option of the 'ccn' configuration section.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = config["tangopath.fofbnodes"][cellnodename.lower()]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(list(config['tangopath.fofbnodes'].keys())))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    # Only fall back to the configuration when the caller gave no value;
    # the node name is lowercased as in the other cellnode helpers.
    if nbpm is None:
        logger.debug("Default to nominal values for number of BPM packets")
        nbpm = getconf("nbpm", "ccn", cellnodename.lower(), 'i')
    logger.debug("{} bpm allowed in the ethernet frame on {}".format(nbpm, p))

    if npsc is None:
        logger.debug("Default to nominal values for number of PSC packets")
        npsc = getconf("npsc", "ccn", cellnodename.lower(), 'i')
    logger.debug("{} psc expected in the ethernet frame on {}".format(npsc, p))

    # Packeter side: frame size and packet count derived from the BPM count
    maclen = 10*nbpm+10
    logger.debug("Configure packeter framesize (mac length) to {}".format(maclen))
    logger.debug("Configure packeter npackets to {}".format(nbpm-1))

    # Unpacketer side: frame size derived from the PSC count
    maclen = npsc*6+10
    logger.debug("Configure unpacketer framesize (mac length) to {}".format(maclen))

    # NOTE(review): the values above are only logged; no attribute of 'prx' is
    # written in this function as it stands. Confirm whether the write
    # statements are missing and restore them with the proper attribute names.

    logger.info("Configuration of CCN done on {}.".format(p))
    return True
def centralnode_configure_ccn(nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on the centralnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    nbpm: list(int), None
        Number of BPM packet received on each interface (4 interfaces are configured).
        If None, built from the 'nbpm' option of the 'ccn' configuration section,
        one value per cellnode listed in 'tangopath.fofbnodes'.
    npsc: int, None
        Number of total PSC, hence the number of expected PSC packets.
        If None, default to the 'npsc' option of the 'ccn' configuration section.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    p = config["tangopath.fofbnodes"]["centralnode"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    if nbpm is None:
        logger.debug("Use default value for nbpm")
        nbpm = [getconf("nbpm", "ccn", n, 'i') for n in config['tangopath.fofbnodes'].keys() if 'cellnode' in n]
    logger.debug("{} bpm expected in the ethernet frame on {}".format(nbpm, p))

    # The loop below indexes nbpm[0..3]; reject a short list up front instead
    # of failing with an IndexError half way through.
    if len(nbpm) < 4:
        logger.error("nbpm shall have at least 4 elements, one per interface")
        return False

    if npsc is None:
        logger.debug("Default to nominal values for number of PSC packets")
        npsc = getconf("npsc", "ccn", 'centralnode', 'i')
    logger.debug("{} psc allowed in the ethernet frame on {}".format(npsc, p))

    for n in range(4):
        # Packeter side: frame size and packet count derived from the PSC count
        maclen = npsc*6+10
        logger.debug("Configure packeter {} framesize (mac length) to {}".format(n, maclen))
        logger.debug("Configure packeter {} npackets to {}".format(n, npsc-1))

        # Unpacketer side: frame size derived from the BPM count of interface n
        maclen = 10*nbpm[n]+10
        logger.debug("Configure unpacketer {} framesize (mac length) to {}".format(n, maclen))

    # NOTE(review): the values above are only logged; no attribute of 'prx' is
    # written in this function as it stands. Confirm whether the write
    # statements are missing and restore them with the proper attribute names.

    logger.info("Configuration of CCN done on {}.".format(p))
    return True
###################################################################################################
# CONFIGURE COM CORR
###################################################################################################
def cellnode_configure_comcorr(cellnodename, pscid=None, enable=True):
    """
    Configure the comcorr block of a CellNode.
    The PSC ID either taken in the argument, or default to nominal values.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    pscid: [int], None
        List of 32 PSCID to program on the output board. If None, default to the
        'pscid' option of the 'comcorr' configuration section.
    enable: boolean
        Enable or not the hardware outputs.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = config["tangopath.fofbnodes"][cellnodename.lower()]
    except KeyError:
        # Only an unknown node name is an error; the happy path must go on.
        logger.error("Wrong cellnodename. Possibilities are {}".format(list(config['tangopath.fofbnodes'].keys())))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    if pscid is None:
        logger.debug("Default to nominal values for PSCID")
        # Node name lowercased as in the other cellnode helpers
        pscid = getconf("pscid", "comcorr", cellnodename.lower(), 'i', True)
    elif len(pscid) != 32:
        logger.error("pscid shall have length 32")
        return False

    logger.debug("Set PSCIDs {} in the filter for {}".format(pscid, p))

    # Read-modify-write of the line id table: load the PSC ids, then add
    # 0x10000 to every entry when 'enable' is truthy (bool(enable) is 0 or 1).
    f = prx["comcorr_line_id"].value
    f[:] = pscid
    f[:] = f + 0x10000*bool(enable)

    logger.debug("Set comcorr_line_id {} for {}".format(f, p))
    prx["comcorr_line_id"] = f

    logger.info("Configuration of ComCorr done on {}.".format(p))
    return True
###################################################################################################
# CONFIGURE MATRIX CORRECTOR
###################################################################################################
def centralnode_configure_corr():
    """
    Configure the correction algorithm on the centralnode.

    Every value is read from the 'corr' section of the loaded configuration
    and written to the centralnode device.

    RETURN
    ------
    success: boolean
        True if configuration is a success, False if the device proxy could
        not be obtained.
    """
    p = config["tangopath.fofbnodes"]["centralnode"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'. Configuration of corrector algorithm failed.".format(p))
        logger.debug(str(e))
        return False

    # Corrector coefficients: option '<name>' of section 'corr' goes to the
    # attribute 'corr_<name>'. Write order: k1 (x then y), then k2 (x then y).
    for stage in ("k1", "k2"):
        for plane in ("x", "y"):
            for coef in ("a", "b", "ic", "d"):
                name = "{}{}_{}".format(stage, coef, plane)
                prx["corr_" + name] = getconf(name, "corr", t='i')

    # PSC id table: read-modify-write of the first 100 entries.
    # NOTE(review): 'f' was used without being defined here (NameError). The
    # attribute name "corr_pscid" is inferred from the 'corr_<option>' naming
    # and from the read-modify-write pattern of the other helpers — confirm
    # against the device interface.
    f = prx["corr_pscid"].value
    f[0:100] = getconf('pscid', 'corr', t='i', l=True)
    prx["corr_pscid"] = f

    prx["corr_num_bpm"] = getconf('numbpm', 'corr', t='i')

    logger.info("Configuration of Corr done on {}.".format(p))
    return True