# Configuration.py
# Contains functions to configure blocks of the FOFB application.
import tango
import logging
import configparser
import numpy as np
import FofbTool.Utils
import os
# Get the module logger
logger = logging.getLogger("FofbTool")

# Module-wide configuration, shared by every helper below.
config = configparser.ConfigParser()

# Try to load the default configuration shipped next to this module.
# __file__ is used rather than __spec__.origin because __spec__ is None when
# the module is executed directly as a script.
defconfpath = os.path.abspath(os.path.join(os.path.dirname(__file__), "default.cfg"))
# ConfigParser.read() returns the list of files it managed to parse;
# an empty list means the default file was not found.
if not config.read(defconfpath):
    logger.warning("Default configuration file not found: {}".format(defconfpath))
def logdebugconf():
    """
    Dump the whole loaded configuration to the module logger at DEBUG level.

    One line is emitted per (section, option) pair, after a header line.
    """
    logger.debug("Display config:")
    for s in config.sections():
        for o in config.options(s):
            logger.debug("{:20} {:10} {}".format(s, o, config[s][o]))
def loadconfig(filepath):
    """
    Load a configuration file for the module.

    The file content is merged into the module-level configuration: options
    already loaded are kept unless the file redefines them.

    PARAMETERS
    ----------
    filepath: str
        Path to the config file. If it cannot be read, a warning is logged and
        the current configuration is left untouched.
    """
    logger.info("Load configuration from file {}".format(filepath))
    # ConfigParser.read() silently skips missing files and returns the list of
    # files actually parsed, so an empty result means the file was not found.
    if not config.read(filepath):
        logger.warning("Configuration file not found: {}".format(filepath))
def getconf(option, section, nodename="", t="", l=False):
    """
    Helper method to retrieve an option in the loaded configuration.

    It first looks in the node-specific section named "<section>.<nodename>"
    and falls back to the plain "<section>" section if that section or option
    does not exist.

    PARAMETERS
    ----------
    option: str
        Name of the option to retrieve.
    section: str
        Section in which to search.
    nodename: str
        Specify a particular nodename. If not found in config, fall back to
        the section default.
    t: str ('i', 'f', 'b', '')
        Give the type to cast to: integer, float, boolean or string.
    l: boolean
        Cast from a list: the option value is split on whitespace and returned
        as a numpy array.

    RETURNS
    -------
    roption: str, int, float, boolean or numpy.ndarray
        Read option.

    RAISES
    ------
    NotImplementedError
        If a list of booleans is requested.
    ValueError
        If t is not an allowed type code when reading a scalar.
    configparser.NoSectionError, configparser.NoOptionError
        If the option is found neither in the node section nor in the
        fallback section.
    """
    nodesection = "{}.{}".format(section, nodename)

    if l:
        if t == 'b':
            # NotImplemented is a comparison sentinel, not an exception class;
            # NotImplementedError is the one that can be raised.
            raise NotImplementedError("List of boolean not implemented")
        # '' means "string" in this helper's contract; hand numpy an explicit
        # str type rather than an empty dtype code.
        dtype = t if t else str
        try:
            val = np.asarray(config.get(nodesection, option).split(), dtype=dtype)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val = np.asarray(config.get(section, option).split(), dtype=dtype)
    else:
        getters = {'i': "getint", 'f': "getfloat", 'b': "getboolean", '': "get"}
        try:
            func = getattr(config, getters[t])
        except KeyError:
            raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''") from None
        try:
            val = func(nodesection, option)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val = func(section, option)

    return val
# Filter list depends on the cellnode. These are the default values.
# NOTE(review): the literal was left unterminated and no entries are visible
# here; it is closed as an empty dict. Confirm that the per-cellnode defaults
# are now read from the configuration file ("combpm" sections) instead.
bpmfilterlist = {}
def cellnode_configure_combpm(cellnodename, bpmallowed=None):
    """
    Configure the combpm block of a CellNode.
    The BPM data allowed through are either taken in the argument, or default to configuration.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    bpmallowed: [int], None
        List of BPMID allowed through the block. If None, default to the
        "bpmfilter" option of the "combpm" configuration section.
        An empty list blocks every BPM.

    RETURNS
    -------
    success: boolean
        True if configuration is a success
    """
    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[cellnodename.lower()]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        return False

    # Select BPM id allowed to pass through
    if bpmallowed is None:
        logger.debug("Default to nominal values for BPM filter")
        bpmallowed = getconf("bpmfilter", "combpm", cellnodename.lower(), 'i', True)

    logger.debug("Activate BPMs {} in the filter for {}".format(bpmallowed, p))

    # Read the current table, clear it, then flag each allowed BPM id.
    f = prx["combpm_filter_table"].value
    f[:] = 0
    # Force an integer index array: np.array([]) would be float and cannot be
    # used for indexing, which made an empty bpmallowed list crash.
    f[np.asarray(bpmallowed, dtype=int)] = 0x80
    prx["combpm_filter_table"] = f

    logger.info("Configuration of ComBpm done on {}.".format(p))
    return True
def cellnode_configure_ccn(cellnodename, nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on a cellnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    nbpm: int, None
        Number of BPM allowed by the filter, hence the number of expected BPM packets.
        If None, read the "nbpm" option of the "ccn" configuration section.
    npsc: int, None
        Number of total PSC, hence the number of expected PSC packets.
        If None, read the "npsc" option of the "ccn" configuration section.

    RETURNS
    -------
    success: boolean
        True if configuration is a success
    """
    nodekey = cellnodename.lower()

    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[nodekey]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        return False

    # The lower-cased name is used for the configuration lookup too, so that
    # it targets the same node as the proxy lookup above.
    if nbpm is None:
        logger.debug("Default to nominal values for number of BPM packets")
        nbpm = getconf("nbpm", "ccn", nodekey, 'i')
    logger.debug("{} bpm allowed in the ethernet frame on {}".format(nbpm, p))

    if npsc is None:
        logger.debug("Default to nominal values for number of PSC packets")
        npsc = getconf("npsc", "ccn", nodekey, 'i')
    logger.debug("{} psc expected in the ethernet frame on {}".format(npsc, p))

    # Packeter side: frame size and packet count are derived from nbpm.
    maclen = 10*nbpm+10
    logger.debug("Configure packeter framesize (mac length) to {}".format(maclen))
    prx["ccnpack0_framesize"] = maclen

    logger.debug("Configure packeter npackets to {}".format(nbpm-1))
    prx["ccnpack0_npackets"] = nbpm-1

    # Unpacketer side: frame size is derived from npsc.
    maclen = npsc*6+10
    logger.debug("Configure unpacketer framesize (mac length) to {}".format(maclen))
    prx["ccnunpack0_framesize"] = maclen

    logger.info("Configuration of CCN done on {}.".format(p))
    return True
def centralnode_configure_ccn(nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on the centralnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    nbpm: list(int), None
        Number of BPM packet received on each interface (at least 4 entries,
        only the first 4 are used). If None, read the "nbpm" option of the
        "ccn" configuration section for every known cellnode.
    npsc: int, None
        Number of total PSC, hence the number of expected PSC packets.
        If None, read the "npsc" option of the "ccn" configuration section.

    RETURNS
    -------
    success: boolean
        True if configuration is a success
    """
    ninterfaces = 4

    p = FofbTool.Utils.tangopath_nodes["centralnode"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        return False

    if nbpm is None:
        logger.debug("Use default value for nbpm")
        nbpm = [getconf("nbpm", "ccn", n, 'i') for n in FofbTool.Utils.tangopath_cellnodes.keys()]
    logger.debug("{} bpm expected in the ethernet frame on {}".format(nbpm, p))

    # Validate before touching the device, so a short list cannot leave the
    # interfaces half configured.
    if len(nbpm) < ninterfaces:
        logger.error("nbpm shall have at least {} elements".format(ninterfaces))
        return False

    if npsc is None:
        logger.debug("Default to nominal values for number of PSC packets")
        npsc = getconf("npsc", "ccn", 'centralnode', 'i')
    logger.debug("{} psc allowed in the ethernet frame on {}".format(npsc, p))

    for n in range(ninterfaces):
        # Packeter side: frame size and packet count are derived from npsc.
        maclen = npsc*6+10
        logger.debug("Configure packeter {} framesize (mac length) to {}".format(n, maclen))
        prx["ccnpack{}_framesize".format(n)] = maclen

        logger.debug("Configure packeter {} npackets to {}".format(n, npsc-1))
        prx["ccnpack{}_npackets".format(n)] = npsc-1

        # Unpacketer side: frame size is derived from this interface's nbpm.
        maclen = 10*nbpm[n]+10
        logger.debug("Configure unpacketer {} framesize (mac length) to {}".format(n, maclen))
        prx["ccnunpack{}_framesize".format(n)] = maclen

    logger.info("Configuration of CCN done on {}.".format(p))
    return True
def cellnode_configure_comcorr(cellnodename, pscid=None, enable=True):
    """
    Configure the comcorr block of a CellNode.
    The PSC ID either taken in the argument, or default to nominal values.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    pscid: [int], None
        List of 32 PSCID to program on the output board. If None, read the
        "pscid" option of the "comcorr" configuration section.
    enable: boolean
        Enable or not the hardware outputs.

    RETURNS
    -------
    success: boolean
        True if configuration is a success
    """
    nodekey = cellnodename.lower()

    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[nodekey]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        return False

    if pscid is None:
        logger.debug("Default to nominal values for PSCID")
        # Lower-cased name, so that the lookup targets the same node as the
        # proxy lookup above.
        pscid = getconf("pscid", "comcorr", nodekey, 'i', True)

    # Checked for configuration-supplied values as well as for arguments.
    if len(pscid) != 32:
        logger.error("pscid shall have length 32")
        return False

    logger.debug("Set PSCIDs {} in the filter for {}".format(pscid, p))

    f = prx["comcorr_line_id"].value
    f[:] = pscid
    # 0x10000 is added to every entry when the outputs are enabled.
    f[:] = f + 0x10000*bool(enable)
    logger.debug("Set comcorr_line_id {} for {}".format(f, p))
    prx["comcorr_line_id"] = f

    logger.info("Configuration of ComCorr done on {}.".format(p))
    return True
def centralnode_configure_corr():
    """
    Configure the correction algorithm on the centralnode.

    Every value is read from the "corr" configuration section: the filter
    coefficients (k1* and k2*, for both x and y), the PSC id table and the
    number of BPM.

    RETURNS
    -------
    success: boolean
        True if configuration is a success
    """
    p = FofbTool.Utils.tangopath_nodes["centralnode"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed:
        logger.error("Failed to get the Device proxy to '{}'. Configuration of corrector algorithm failed.".format(p))
        return False

    # Coefficients: k1* is the legacy set, k2* the unitary set.
    # Each attribute "corr_<name>" takes the config option "<name>".
    for stage in ("k1", "k2"):
        for plane in ("x", "y"):
            for coef in ("a", "b", "ic", "d"):
                name = "{}{}_{}".format(stage, coef, plane)
                prx["corr_{}".format(name)] = getconf(name, "corr", t='i')

    # Only the first 100 entries of the PSC id table are overwritten.
    f = prx["corr_pscid"].value
    f[0:100] = getconf('pscid', 'corr', t='i', l=True)
    prx["corr_pscid"] = f

    prx["corr_num_bpm"] = getconf('numbpm', 'corr', t='i')

    logger.info("Configuration of Corr done on {}.".format(p))
    return True