Skip to content
Snippets Groups Projects
Select Git revision
  • 8b8d4d3424c90a762d6d073669293e16c23bfaa0
  • main default protected
  • GBE
  • dev
  • dev_rcm
  • dev_sync_lbp
  • 3.2
  • 3.1
  • 3.0
  • 2.5
  • 2.4
  • 2.3
  • 2.2.1
  • 2.2
  • 2.1
  • 2.0
  • 1.0
17 results

Utils.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    Utils.py 7.95 KiB
    ###################################################################################################
    #    Utilities
    ###################################################################################################
    #
    # This file contains usefull function and attributes to simplify life
    #
    ###################################################################################################
    
    import tango
    import logging
    import FofbTool.DeviceAttributeConfiguration
    import FofbTool.Configuration
    import FofbTool.Operation
    
    # Get the module logger
    logger = logging.getLogger("FofbTool")
    
    def init_opcua():
        """
        Run init on all OPCUA devices. Catch DevFailed and inform via log.
        Does nothing if Fofb is running.
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running configuration of combpm because FOFB seems to be running.")
            return
    
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            logger.info("Perform init() on {} '{}'".format(n,p))
            try:
                tango.DeviceProxy(p).init()
            except tango.DevFailed as e:
                logger.error("Could not perform init() '{}', got DevFailed.".format(p))
                logger.debug(str(e))
    
    def init_watcher():
        """
        Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
        Does nothing if Fofb is running.
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running configuration of combpm because FOFB seems to be running.")
            return
    
        p=FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
        try:
            wprx=tango.DeviceProxy(p)
            wprx.set_timeout_millis(60000)
        except tango.DevFailed as e:
                logger.error("Could not get DeviceProxy on {}".format(p))
                logger.debug(str(e))
                return
    
        logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
        try:
            wprx.init()
        except tango.DevFailed as e:
            logger.error("Could not perform init() on Fofb-Watcher.")
            logger.debug(str(e))
    
        logger.info("Perform init() on Fofb-Command.")
        p=FofbTool.Configuration.config["tangopath"]["fofb-command"]
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            logger.error("Could not perform init() on '{}', got DevFailed.".format(p))
            logger.debug(str(e))
    
    def confds_opcua():
        """
        Apply attribute configuration on all OPCUA devices. Catch DevFailed and inform via log.
        """
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            try:
                prx = tango.DeviceProxy(p)
            except tango.DevFailed as e:
                logger.error("Could not get proxy '{}', got DevFailed.".format(p))
                logger.debug(str(e))
                break
    
            try:
                if 'central' in n:
                    FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
                else:
                    FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
            except tango.DevFailed as e:
                logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
                logger.debug(str(e))
    
    
    def check_fofbnotrunning():
        """
        Check if the FOFB is not running.
        If it fails to know, return False and log error.
    
        RETURN
        ------
        running: Boolean
            True if FOFB is not running
        """
    
        p=FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Failed to get the Device proxy to '{}'".format(p))
            logger.debug(str(e))
            return False
    
        try:
            return not (prx.fofbrunning_x or prx.fofbrunning_y)
        except tango.DevFailed as e:
            logger.error("Failed to read the FOFB status on device Fofb-Watcher")
            logger.debug(str(e))
            return False
    
    def conf_all_combpm():
        """
        Run default configuration of all combpm blocks.
        Check beforehand that the FOFB is not running.
    
        RETURN
        ------
        running: Boolean
            True if FOFB is not running
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running configuration of combpm because FOFB seems to be running.")
            return False
    
        success=True
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            if 'cellnode' in n:
                s=FofbTool.Configuration.cellnode_configure_combpm(n)
                success = success and s
    
        return success
    
    
    def conf_all_comcorr(enable=True):
        """
        Run default configuration of all comcorr blocks.
        Check beforehand that the FOFB is not running.
    
        PARAMETERS
        ----------
        enable: Boolean
            True to enable frame output to PSC (steerers).
    
        RETURN
        ------
        running: Boolean
            True if FOFB is not running
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
            return False
    
        success=True
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            if 'cellnode' in n:
                s=FofbTool.Configuration.cellnode_configure_comcorr(n, enable=enable)
                success = success and s
    
        return success
    
    
    def conf_all_ccn():
        """
        Run default configuration of all comcellnode blocks.
        Check beforehand that the FOFB is not running.
    
        RETURN
        ------
        running: Boolean
            True if FOFB is not running
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
            return False
    
        success=True
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            if 'cellnode' in n:
                s=FofbTool.Configuration.cellnode_configure_ccn(n)
                success = success and s
    
        s=FofbTool.Configuration.centralnode_configure_ccn()
        success = success and s
    
        return success
    
    def get_prx_from_nodename(nodename):
        """
        Return a tango.DeviceProxy from a node name.
        On failure, log error and return None.
    
        PARAMETERS
        ----------
        nodename: str
            The target fofbnode, ie 'cellnode-c09' or 'centralnode'
    
        RETURN
        ------
        prx: tango.DeviceProxy or None
        """
        # Get device proxy
        try:
            p = FofbTool.Configuration.config["tangopath.fofbnodes"][nodename.lower()]
        except KeyError:
            logger.error("Wrong nodename. Possibilities are {}".format(FofbTool.Configuration.config["tangopath.fofbnodes"].keys()))
            return None
    
        try:
            prx= tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Failed to get the Device proxy to '{}'".format(p))
            logger.debug(str(e))
            return None
    
        return prx
    
    def stop_all_combpm():
        """
        Apply stop command on all Cellnodes.
        Check beforehand that the FOFB is not running. Display warning if.
    
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running stop combpm because FOFB seems to be running.")
    
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            if 'cellnode' in n:
                FofbTool.Operation.stop_combpm(n)
    
    
    def stop_all_ccn():
        """
        Apply stop and reset commands on all fofbnodes.
        Check beforehand that the FOFB is not running. Display warning if.
    
        """
    
        if not check_fofbnotrunning():
            logger.warning("Not running stop combpm because FOFB seems to be running.")
    
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            FofbTool.Operation.stop_ccn(n)
            FofbTool.Operation.reset_ccn(n)
    
    
    
    def start_all_combpm():
        """
        Apply start command on all Cellnodes.
    
        """
    
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            if 'cellnode' in n:
                FofbTool.Operation.start_combpm(n)
    
    
    def start_all_ccn():
        """
        Apply stop and reset commands on all fofbnodes.
    
        """
    
        for n,p in FofbTool.Configuration.config["tangopath.fofbnodes"].items():
            FofbTool.Operation.start_ccn(n)