Skip to content
Snippets Groups Projects
Utils.py 14.7 KiB
Newer Older
###################################################################################################
#    Utilities
###################################################################################################
#
# This file contains usefull function and attributes to simplify life
#
###################################################################################################

import tango
import logging
BRONES Romain's avatar
BRONES Romain committed
import FofbTool.DeviceAttributeConfiguration
BRONES Romain's avatar
BRONES Romain committed
import FofbTool.Configuration
import FofbTool.Operation
BRONES Romain's avatar
BRONES Romain committed
import configparser
import os
import numpy as np

# Get the module logger
logger = logging.getLogger("FofbTool")

BRONES Romain's avatar
BRONES Romain committed
###################################################################################################
#    CONFIGURATION PARSER
###################################################################################################

# Configuration
config = configparser.ConfigParser()

# Try to load default
defconfpath = os.path.abspath(os.path.join(os.path.dirname(__spec__.origin), "default.cfg"))
if len(config.read(defconfpath)) < 1:
    logger.warning("Default configuration file not found: {}".format(defconfpath))

def logdebugconf():
    logger.debug("Display config:")
    for s in config.sections():
        for o in config.options(s):
            logger.debug("{:20} {:10} {}".format(s,o, config[s][o]))

def loadconfig(filepath):
    """
    Load a configuration file for the module.
    It supersed the default configuration, if found.

    PARAMETERS
    ----------
    filepath: str
        Path to the config file
    """
    global config
    logger.info("Load configuration from file {}".format(filepath))
    if len(config.read(filepath)) < 1:
        logger.warning("Configuration file not found: {}".format(filepath))


def getconf(option, section, nodename="", t="", l=False):
    """
    Helper method to retrieve an option in the loaded configuration.
    It tries to find a section named section.nodename and fall back to section only if fails.

    PARAMETERS
    ----------
    option: str
        Name of the option to retrieve
    section: str
        Section in wich to search
    nodename: str
        Specify a particular nodename. If not found in config, fall back to section default.
    t: str ('i', 'f', 'b', '')
        Give the type to cast, integer, float, boolean or string.
    l: boolean
        Cast from a list

    RETURNS
    -------
    roption: str, int, float or boolean
        Read option
    """
    if l:
        if t=='b':
            raise NotImplemented("List of boolean not implemented")
        try:
            val= np.asarray(config.get("{}.{}".format(section,nodename), option).split(), dtype=t)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val= np.asarray(config.get(section, option).split(), dtype=t)

    else:
        try:
            func = getattr(config, {'i':"getint", 'f':"getfloat", 'b':"getboolean", '':"get"}[t])
        except KeyError as e:
            log.debug(str(e))
            raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''")

        try:
            val= func("{}.{}".format(section,nodename), option)
        except (configparser.NoSectionError, configparser.NoOptionError):
            val= func(section, option)

    return val


###################################################################################################
#    INIT FUNCTIONS
###################################################################################################


def init_opcua(force=False):
    """
    Run init on all OPCUA devices. Catch DevFailed and inform via log.
    Does nothing if Fofb is running.
    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running because FOFB seems to be running.")
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
        logger.info("Perform init() on {} '{}'".format(n,p))
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            logger.error("Could not perform init() '{}', got DevFailed.".format(p))
            logger.debug(str(e))

def init_watcher(force=False):
    """
    Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
    Does nothing if Fofb is running.
    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running because FOFB seems to be running.")
BRONES Romain's avatar
BRONES Romain committed
    p=config["tangopath"]["fofb-watcher"]
        wprx=tango.DeviceProxy(p)
        wprx.set_timeout_millis(60000)
    except tango.DevFailed as e:
            logger.error("Could not get DeviceProxy on {}".format(p))
            logger.debug(str(e))
BRONES Romain's avatar
BRONES Romain committed
            return

    logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
    try:
        wprx.init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on Fofb-Watcher.")
        logger.debug(str(e))

    logger.info("Perform init() on Fofb-Command.")
BRONES Romain's avatar
BRONES Romain committed
    p=config["tangopath"]["fofb-command"]
        tango.DeviceProxy(p).init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on '{}', got DevFailed.".format(p))
        logger.debug(str(e))

def confds_opcua():
    """
    Apply attribute configuration on all OPCUA devices. Catch DevFailed and inform via log.
    """
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Could not get proxy '{}', got DevFailed.".format(p))
            logger.debug(str(e))
            break

        try:
            if 'central' in n:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
            else:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
        except tango.DevFailed as e:
            logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
            logger.debug(str(e))


BRONES Romain's avatar
BRONES Romain committed

###################################################################################################
#    OPERATE FUNCTIONS
###################################################################################################

def check_fofbnotrunning(force=False):
BRONES Romain's avatar
BRONES Romain committed
    """
    Check if the FOFB is not running.
    If it fails to know, return False and log error.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if force:
        logger.warning("Forced action even if FOFB might be running")
        return True

BRONES Romain's avatar
BRONES Romain committed
    p=config["tangopath"]["fofb-watcher"]
BRONES Romain's avatar
BRONES Romain committed
    try:
        prx = tango.DeviceProxy(p)
BRONES Romain's avatar
BRONES Romain committed
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
BRONES Romain's avatar
BRONES Romain committed
        logger.debug(str(e))
        return False

    try:
        return not (prx.fofbrunning_x or prx.fofbrunning_y)
    except tango.DevFailed as e:
        logger.error("Failed to read the FOFB status on device Fofb-Watcher")
        logger.debug(str(e))
        return False

def conf_all_combpm(force=False):
BRONES Romain's avatar
BRONES Romain committed
    """
    Run default configuration of all combpm blocks.
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return False

    success=True
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
            bpmallowed = getconf("bpmfilter", "combpm", n.lower(), 'i', True)
            s=FofbTool.Configuration.cellnode_configure_combpm(p, bpmallowed)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

    return success


def conf_all_comcorr(force=False, enable=True):
BRONES Romain's avatar
BRONES Romain committed
    """
    Run default configuration of all comcorr blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    enable: Boolean
        True to enable frame output to PSC (steerers).

BRONES Romain's avatar
BRONES Romain committed
    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success=True
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
            pscid = getconf("pscid", "comcorr", n, 'i', True)
            s=FofbTool.Configuration.cellnode_configure_comcorr(p, pscid, enable=enable)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

    return success


def conf_all_ccn(force=False):
BRONES Romain's avatar
BRONES Romain committed
    """
    Run default configuration of all comcellnode blocks.
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success=True
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
            nbpm = getconf("nbpm", "ccn", n, 'i')
            npsc = getconf("npsc", "ccn", n, 'i')
            s=FofbTool.Configuration.cellnode_configure_ccn(p, nbpm, npsc)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

BRONES Romain's avatar
BRONES Romain committed
    p = config["tangopath.fofbnodes"]["centralnode"]
    nbpm = [getconf("nbpm", "ccn", n, 'i') for n in config['tangopath.fofbnodes'].keys() if 'cellnode' in n]
    npsc = getconf("npsc", "ccn", 'centralnode', 'i')
    s=FofbTool.Configuration.centralnode_configure_ccn(p, nbpm, npsc)
BRONES Romain's avatar
BRONES Romain committed
    success = success and s

    return success
BRONES Romain's avatar
BRONES Romain committed
def get_prx_from_nodename(nodename):
    """
    Return a tango.DeviceProxy from a node name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    nodename: str
        The target fofbnode, ie 'cellnode-c09' or 'centralnode'

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """
    # Get device proxy
    try:
BRONES Romain's avatar
BRONES Romain committed
        p = config["tangopath.fofbnodes"][nodename.lower()]
BRONES Romain's avatar
BRONES Romain committed
    except KeyError:
BRONES Romain's avatar
BRONES Romain committed
        logger.error("Wrong nodename. Possibilities are {}".format(list(config["tangopath.fofbnodes"].keys())))
BRONES Romain's avatar
BRONES Romain committed
        return None

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return None

    return prx

def stop_all_combpm(force=False):
BRONES Romain's avatar
BRONES Romain committed
    """
    Apply stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running stop combpm because FOFB seems to be running.")
BRONES Romain's avatar
BRONES Romain committed

BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
            FofbTool.Operation.stop_combpm(p)
def stop_all_comlbp(force=False):
    """
    Apply stop and reset command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running stop comlbp because FOFB seems to be running.")
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            try:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.Operation.reset_comlbp(p)
                FofbTool.Operation.stop_comlbp(p)
            except (tango.DevFailed, TypeError):
                logger.error("Could not stop cpmlbp on {}".format(n))


def stop_all_ccn(force=False):
BRONES Romain's avatar
BRONES Romain committed
    """
    Apply stop and reset commands on all fofbnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    """

    if not check_fofbnotrunning(force):
BRONES Romain's avatar
BRONES Romain committed
        logger.warning("Not running stop comccn because FOFB seems to be running.")
BRONES Romain's avatar
BRONES Romain committed

BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
        FofbTool.Operation.stop_ccn(p)
        FofbTool.Operation.reset_ccn(p)
BRONES Romain's avatar
BRONES Romain committed



def start_all_combpm():
    """
    Apply start command on all Cellnodes.

    """

BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
            FofbTool.Operation.start_combpm(p)
BRONES Romain's avatar
BRONES Romain committed

def start_all_comlbp():
    """
    Apply start comlbp command on all Cellnodes.

    """

BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            try:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.Operation.start_comlbp(p)
            except (tango.DevFailed, TypeError):
                logger.error("Could not start LBP on {}".format(n))


BRONES Romain's avatar
BRONES Romain committed

def start_all_ccn():
    """
    Apply start commands on all fofbnodes.
BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
        FofbTool.Operation.start_ccn(p)
BRONES Romain's avatar
BRONES Romain committed

def align_all_ccn(cellnodename, force=False):
    """
    Launch alignement FA sequence for Electron and Brillance Plus.
    """

    if not check_fofbnotrunning(force):
        logger.warning("Not running align ccn because FOFB seems to be running.")
        return

    logger.debug("Use {} for sequence alignement".format(cellnodename))
BRONES Romain's avatar
BRONES Romain committed
    cellnodepath=FofbTool.Utils.get_prx_from_nodename(cellnodename)
    seqoffset = FofbTool.Operation.align_ccn(cellnodepath, 0)
    if seqoffset is None:
        logger.error("Could not align all ccn")
        return

BRONES Romain's avatar
BRONES Romain committed

BRONES Romain's avatar
BRONES Romain committed
    for n,p in config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            try:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.Configuration.cellnode_configure_comlbp(n, seqoffset)
            except (tango.DevFailed, TypeError):
                logger.error("Could not set comlbp offset on {}".format(n))
BRONES Romain's avatar
BRONES Romain committed

def conf_centralnode_corr():

    p = config["tangopath.fofbnodes"]["centralnode"]

    FofbTool.Configuration.centralnode_configure_corr(p,
            getconf('numbpm', 'corr', t='i'),
            getconf('pscid', 'corr', t='i', l=True),
            [getconf( "k1{}_x".format(l), "corr", t='i') for l in ["a","b","ic","d"]],
            [getconf( "k1{}_y".format(l), "corr", t='i') for l in ["a","b","ic","d"]],
            [getconf( "k2{}_x".format(l), "corr", t='i') for l in ["a","b","ic","d"]],
            [getconf( "k2{}_y".format(l), "corr", t='i') for l in ["a","b","ic","d"]],
            )

def sync_all_bpm():
    """
    Synchronize all BPM electronics, Electron and Brillance Plus.
    This will use the timing system (central and local board).
    """

    db = tango.Database()

    bpmlist = [n.split(':')[2] for n in db.get_property("FOFB", "bpmlist")['bpmlist'] if ":LIBERA:" in n]
    bpmidlist = [(int(n.split(':')[0]), n.split(':')[2]) for n in db.get_property("FOFB", "bpmlist")['bpmlist'] if 'LIBERA' in n]

    tlocal = tango.Group('tlocal')
    tlocal.add([n.split(':')[2] for n in db.get_property("FOFB", 'TimingBoardList')['TimingBoardList'] if "LOCAL" in n])

    # Set a group of Libera Brillance Plus EVRX board, from FofbTool configuration
    lbpevrx = tango.Group('lbpevrx')
    lbpevrx.add(FofbTool.Configuration.config["tangopath"]["lbpevrx"].split())

    tcentral = tango.DeviceProxy(FofbTool.Configuration.config["tangopath"]["timing-central"])

    FofbTool.Operation.sync_bpm(bpmlist, lbpevrx, tlocal, tcentral)