Skip to content
Snippets Groups Projects
Utils.py 7.95 KiB
Newer Older
###################################################################################################
#    Utilities
###################################################################################################
#
# This file contains usefull function and attributes to simplify life
#
###################################################################################################

import tango
import logging
BRONES Romain's avatar
BRONES Romain committed
import FofbTool.DeviceAttributeConfiguration
BRONES Romain's avatar
BRONES Romain committed
import FofbTool.Configuration
import FofbTool.Operation

# Get the module logger
logger = logging.getLogger("FofbTool")

def init_opcua():
    """
    Run init on all OPCUA devices. Catch DevFailed and inform via log.
    Does nothing if Fofb is running.
    if not check_fofbnotrunning():
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return

    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        logger.info("Perform init() on {} '{}'".format(n,p))
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            logger.error("Could not perform init() '{}', got DevFailed.".format(p))
            logger.debug(str(e))

def init_watcher():
    """
    Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
    Does nothing if Fofb is running.
    if not check_fofbnotrunning():
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return

    p=FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
        wprx=tango.DeviceProxy(p)
        wprx.set_timeout_millis(60000)
    except tango.DevFailed as e:
            logger.error("Could not get DeviceProxy on {}".format(p))
            logger.debug(str(e))
BRONES Romain's avatar
BRONES Romain committed
            return

    logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
    try:
        wprx.init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on Fofb-Watcher.")
        logger.debug(str(e))

    logger.info("Perform init() on Fofb-Command.")
    p=FofbTool.Configuration.config["tangopath"]["fofb-command"]
        tango.DeviceProxy(p).init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on '{}', got DevFailed.".format(p))
        logger.debug(str(e))

def confds_opcua():
    """
    Apply attribute configuration on all OPCUA devices. Catch DevFailed and inform via log.
    """
    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Could not get proxy '{}', got DevFailed.".format(p))
            logger.debug(str(e))
            break

        try:
            if 'central' in n:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
            else:
BRONES Romain's avatar
BRONES Romain committed
                FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
        except tango.DevFailed as e:
            logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
            logger.debug(str(e))


BRONES Romain's avatar
BRONES Romain committed
def check_fofbnotrunning():
    """
    Check if the FOFB is not running.
    If it fails to know, return False and log error.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    p=FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
BRONES Romain's avatar
BRONES Romain committed
    try:
        prx = tango.DeviceProxy(p)
BRONES Romain's avatar
BRONES Romain committed
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
BRONES Romain's avatar
BRONES Romain committed
        logger.debug(str(e))
        return False

    try:
        return not (prx.fofbrunning_x or prx.fofbrunning_y)
    except tango.DevFailed as e:
        logger.error("Failed to read the FOFB status on device Fofb-Watcher")
        logger.debug(str(e))
        return False

def conf_all_combpm():
    """
    Run default configuration of all combpm blocks.
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return False

    success=True
    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            s=FofbTool.Configuration.cellnode_configure_combpm(n)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

    return success


def conf_all_comcorr(enable=True):
BRONES Romain's avatar
BRONES Romain committed
    """
    Run default configuration of all comcorr blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    enable: Boolean
        True to enable frame output to PSC (steerers).

BRONES Romain's avatar
BRONES Romain committed
    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success=True
    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            s=FofbTool.Configuration.cellnode_configure_comcorr(n, enable=enable)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

    return success


def conf_all_ccn():
    """
    Run default configuration of all comcellnode blocks.
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    running: Boolean
        True if FOFB is not running
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success=True
    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            s=FofbTool.Configuration.cellnode_configure_ccn(n)
            success = success and s
BRONES Romain's avatar
BRONES Romain committed

    s=FofbTool.Configuration.centralnode_configure_ccn()
    success = success and s

    return success
BRONES Romain's avatar
BRONES Romain committed
def get_prx_from_nodename(nodename):
    """
    Return a tango.DeviceProxy from a node name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    nodename: str
        The target fofbnode, ie 'cellnode-c09' or 'centralnode'

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """
    # Get device proxy
    try:
        p = FofbTool.Configuration.config["tangopath.fofbnodes"][nodename.lower()]
BRONES Romain's avatar
BRONES Romain committed
    except KeyError:
        logger.error("Wrong nodename. Possibilities are {}".format(FofbTool.Configuration.config["tangopath.fofbnodes"].keys()))
BRONES Romain's avatar
BRONES Romain committed
        return None

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return None

    return prx

def stop_all_combpm():
    """
    Apply stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    """

    if not check_fofbnotrunning():
        logger.warning("Not running stop combpm because FOFB seems to be running.")

    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            FofbTool.Operation.stop_combpm(n)
BRONES Romain's avatar
BRONES Romain committed


def stop_all_ccn():
    """
    Apply stop and reset commands on all fofbnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    """

    if not check_fofbnotrunning():
        logger.warning("Not running stop combpm because FOFB seems to be running.")

    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
        FofbTool.Operation.stop_ccn(n)
        FofbTool.Operation.reset_ccn(n)



def start_all_combpm():
    """
    Apply start command on all Cellnodes.

    """

    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
        if 'cellnode' in n:
            FofbTool.Operation.start_combpm(n)
BRONES Romain's avatar
BRONES Romain committed


def start_all_ccn():
    """
    Apply stop and reset commands on all fofbnodes.

    """

    for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
BRONES Romain's avatar
BRONES Romain committed
        FofbTool.Operation.start_ccn(n)