Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Configuration.py 12.86 KiB
###################################################################################################
#    CONFIGURATION FUNCTIONS
###################################################################################################
#
# Contains functions to configure blocks of the FOFB application.
#
###################################################################################################

import tango
import logging
import configparser
import numpy as np
import FofbTool.Utils
import os


# Get the module logger
logger = logging.getLogger("FofbTool")

###################################################################################################
#    CONFIGURATION PARSER
###################################################################################################

# Configuration
config = configparser.ConfigParser()

# Try to load default
defconfpath = os.path.abspath(os.path.join(os.path.dirname(__spec__.origin), "default.cfg"))
if len(config.read(defconfpath)) < 1:
    logger.warning("Default configuration file not found: {}".format(defconfpath))

def logdebugconf():
    logger.debug("Display config:")
    for s in config.sections():
        for o in config.options(s):
            logger.debug("{:20} {:10} {}".format(s,o, config[s][o]))

def loadconfig(filepath):
    """
    Load a configuration file for the module.
    It initialize the internal configuration.

    PARAMETERS
    ----------
    filepath: str
        Path to the config file
    """
    global config
    logger.info("Load configuration from file {}".format(filepath))
    if len(config.read(filepath)) < 1:
        logger.warning("Configuration file not found: {}".format(filepath))


def getconf(option, section, nodename="", t="", l=False):
    """
    Helper method to retrieve an option in the loaded configuration.
    It tries to find a section named section.nodename and fall back to section only if fails.

    PARAMETERS
    ----------
    option: str
        Name of the option to retrieve
    section: str
        Section in wich to search
    nodename: str
        Specify a particular nodename. If not found in config, fall back to section default.
    t: str ('i', 'f', 'b', '')
        Give the type to cast, integer, float, boolean or string.
    l: boolean
        Cast from a list

    RETURNS
    -------
    roption: str, int, float or boolean
        Read option
    """
    if l:
        if t=='b':
            raise NotImplemented("List of boolean not implemented")
        try:
            val= np.asarray(config.get("{}.{}".format(section,nodename), option).split(), dtype=t)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val= np.asarray(config.get(section, option).split(), dtype=t)

    else:
        try:
            func = getattr(config, {'i':"getint", 'f':"getfloat", 'b':"getboolean", '':"get"}[t])
        except KeyError as e:
            log.debug(str(e))
            raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''")

        try:
            val= func("{}.{}".format(section,nodename), option)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val= func(section, option)

    return val



###################################################################################################
#    CONFIGURE COM BPM
###################################################################################################

# Filter list depends on the cellnode. These are the default values.
bpmfilterlist = {
        "cellnode-c01":np.array(list(range(1,23))+list(range(115,123))),
        "cellnode-c06":np.array(range(23,53)),
        "cellnode-c09":np.array(range(53,83)),
        "cellnode-c14":np.array(range(83,115)),
        }

def cellnode_configure_combpm(cellnodename, bpmallowed=None):
    """
    Configure the combpm block of a CellNode.
    The BPM data allowed through are either taken in the argument, or default to configuration.


    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    bpmallowed: [int], None
        List of BPMID allowed through the block. If None, default to nominal values.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[cellnodename.lower()]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    # Select BPM id allowed to pass through
    if bpmallowed is None:
        logger.debug("Default to nominal values for BPM filter")
        bpmallowed = getconf("bpmfilter", "combpm", cellnodename.lower(), 'i', True)


    logger.debug("Activate BPMs {} in the filter for {}".format(bpmallowed, p))
    f = prx["combpm_filter_table"].value
    f[:] = 0
    f[np.array(bpmallowed)] = 0x80
    prx["combpm_filter_table"] = f

    logger.info("Configuration of ComBpm done on {}.".format(p))
    return True


###################################################################################################
#    CONFIGURE CCN: COM CELLNODES
###################################################################################################


def cellnode_configure_ccn(cellnodename, nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on a cellnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    nbpm:
        Number of BPM allowed by the filter, hence the number of expected BPM packets.
        If None, auto detect the number from the combpm_filter_table attribute.
    npsc:
        Number of total PSC, hence the number of expected PSC packets.
        Default to 100.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[cellnodename.lower()]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    if nbpm is None:
        logger.debug("Default to nominal values for number of BPM packets")
        # Get number of BPM activated in the filter
        nbpm = getconf("nbpm", "ccn", cellnodename, 'i')
    logger.debug("{} bpm allowed in the ethernet frame on {}".format(nbpm, p))

    if npsc is None:
        logger.debug("Default to nominal values for number of PSC packets")
        npsc = getconf("npsc", "ccn", cellnodename, 'i')
    logger.debug("{} psc expected in the ethernet frame on {}".format(nbpm, p))

    maclen = 10*nbpm+10
    logger.debug("Configure packeter framesize (mac length) to {}".format(maclen))
    prx["ccnpack0_framesize"] = maclen

    logger.debug("Configure packeter npackets to {}".format(nbpm-1))
    prx["ccnpack0_npackets"] = nbpm-1

    maclen = npsc*6+10
    logger.debug("Configure unpacketer framesize (mac length) to {}".format(maclen))
    prx["ccnunpack0_framesize"] = maclen

    logger.info("Configuration of CCN done on {}.".format(p))
    return True

def centralnode_configure_ccn(nbpm=None, npsc=None):
    """
    Configure the ComCellNode block on the centralnode.
    Automatically set the number of bpm/psc packets and MAC length.

    PARAMETERS
    ----------
    nbpm: list(int)
        Number of BPM packet received on each interface.
    npsc:
        Number of total PSC, hence the number of expected PSC packets.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """
    p = FofbTool.Utils.tangopath_nodes["centralnode"]
    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    if nbpm is None:
        logger.debug("Use default value for nbpm")
        nbpm = [getconf("nbpm", "ccn", n, 'i') for n in FofbTool.Utils.tangopath_cellnodes.keys()]
    logger.debug("{} bpm expected in the ethernet frame on {}".format(nbpm, p))

    if npsc is None:
        logger.debug("Default to nominal values for number of BPM packets")
        # Get number of BPM activated in the filter
        npsc = getconf("npsc", "ccn", 'centralnode', 'i')
    logger.debug("{} psc allowed in the ethernet frame on {}".format(nbpm, p))

    for n in range(4):
        maclen = npsc*6+10
        logger.debug("Configure packeter {} framesize (mac length) to {}".format(n, maclen))
        prx["ccnpack{}_framesize".format(n)] = maclen

        logger.debug("Configure packeter {}  npackets to {}".format(n, npsc-1))
        prx["ccnpack{}_npackets".format(n)] = npsc-1

        maclen = 10*nbpm[n]+10
        logger.debug("Configure unpacketer {} framesize (mac length) to {}".format(n, maclen))
        prx["ccnunpack{}_framesize".format(n)] = maclen

    logger.info("Configuration of CCN done on {}.".format(p))
    return True

###################################################################################################
#    CONFIGURE COM CORR
###################################################################################################

def cellnode_configure_comcorr(cellnodename, pscid=None, enable=True):
    """
    Configure the comcorr block of a CellNode.
    The PSC ID either taken in the argument, or default to nominal values.


    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'
    pscid: [int], None
        List of 32 PSCID to program on the output board. If None, default to nominal values.

    enable: boolean
        Enable or not the hardware outputs.

    RETURN
    ------
    success: boolean
        True if configuration is a success
    """

    # Get device proxy
    try:
        p = FofbTool.Utils.tangopath_cellnodes[cellnodename.lower()]
    except KeyError:
        logger.error("Wrong cellnodename. Possibilities are {}".format(FofbTool.Utils.tangopath_cellnodes.keys()))
        return False

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    if pscid is None:
        logger.debug("Default to nominal values for PSCID")
        pscid = getconf("pscid", "comcorr", cellnodename, 'i', True)
    else:
        if len(pscid) != 32:
            logger.error("pscid shall have length 32")
            return False


    logger.debug("Set PSCIDs {} in the filter for {}".format(pscid, p))
    f= prx["comcorr_line_id"].value
    f[:] = pscid
    f[:] = f+ 0x10000*bool(enable)
    logger.debug("Set comcorr_line_id {} for {}".format(f, p))
    prx["comcorr_line_id"] = f

    logger.info("Configuration of ComCorr done on {}.".format(p))
    return True

###################################################################################################
#    CONFIGURE MATRIX CORRECTOR
###################################################################################################

def centralnode_configure_corr():
    """
    Configure the correction algorithm on the centralnode.
    """
    p = FofbTool.Utils.tangopath_nodes["centralnode"]
    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'. Configuration of corrector algorithm failed.".format(p))
        logger.debug(str(e))
        return False

    # Legacy
    prx["corr_k1a_x"]=getconf( "k1a_x", "corr", t='i')
    prx["corr_k1b_x"]=getconf( "k1b_x", "corr", t='i')
    prx["corr_k1ic_x"]=getconf( "k1ic_x", "corr", t='i')
    prx["corr_k1d_x"]=getconf( "k1d_x", "corr", t='i')

    prx["corr_k1a_y"]=getconf( "k1a_y", "corr", t='i')
    prx["corr_k1b_y"]=getconf( "k1b_y", "corr", t='i')
    prx["corr_k1ic_y"]=getconf( "k1ic_y", "corr", t='i')
    prx["corr_k1d_y"]=getconf( "k1d_y", "corr", t='i')

    # Unitary
    prx["corr_k2a_x"]=getconf( "k2a_x", "corr", t='i')
    prx["corr_k2b_x"]=getconf( "k2b_x", "corr", t='i')
    prx["corr_k2ic_x"]=getconf( "k2ic_x", "corr", t='i')
    prx["corr_k2d_x"]=getconf( "k2d_x", "corr", t='i')

    prx["corr_k2a_y"]=getconf( "k2a_y", "corr", t='i')
    prx["corr_k2b_y"]=getconf( "k2b_y", "corr", t='i')
    prx["corr_k2ic_y"]=getconf( "k2ic_y", "corr", t='i')
    prx["corr_k2d_y"]=getconf( "k2d_y", "corr", t='i')

    f= prx["corr_pscid"].value
    f[0:100] = getconf('pscid', 'corr', t='i', l=True)
    prx["corr_pscid"] = f

    prx["corr_num_bpm"] = getconf('numbpm', 'corr', t='i')

    logger.info("Configuration of Corr done on {}.".format(p))
    return True