###################################################################################################
#    Utilities
###################################################################################################
#
# This file contains useful functions and attributes to simplify life
#
###################################################################################################

import tango
import logging
import FofbTool.DeviceAttributeConfiguration
import FofbTool.Configuration
import FofbTool.Operation

# Get the module logger
logger = logging.getLogger("FofbTool")

# Tango device paths of the cellnodes, keyed by node name
tangopath_cellnodes = {
        "cellnode-c01":"ans/dg/fofb-cellnode-c01",
        "cellnode-c06":"ans/dg/fofb-cellnode-c06",
        "cellnode-c09":"ans/dg/fofb-cellnode-c09",
        "cellnode-c14":"ans/dg/fofb-cellnode-c14",
        }

# Tango device paths of all fofbnodes: the central node plus every cellnode
tangopath_nodes = {
        "centralnode":"ans/dg/fofb-centralnode",
        }
tangopath_nodes.update(tangopath_cellnodes)

# Tango device paths of every device used by this module (managers, watcher, command and nodes)
tangopath = {
        "fofb-watcher":"ans/dg/fofb-watcher",
        "fofb-command":"ans/dg/fofb-command",
        "fofb-manager":"ans/dg/fofb-manager",
        "bpm-manager":"ans/dg/bpm-manager",
        }
tangopath.update(tangopath_nodes)


def init_opcua():
    """
    Run init on all OPCUA devices (every entry of tangopath_nodes).
    Catch DevFailed and inform via log.
    Does nothing if Fofb is running.
    """

    if not check_fofbnotrunning():
        logger.warning("Not running init of OPCUA devices because FOFB seems to be running.")
        return

    for n, p in tangopath_nodes.items():
        logger.info("Perform init() on {} '{}'".format(n,p))
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            # A failing node does not prevent the init of the following ones
            logger.error("Could not perform init() '{}', got DevFailed.".format(p))
            logger.debug(str(e))


def init_watcher():
    """
    Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
    Does nothing if Fofb is running.
    """

    if not check_fofbnotrunning():
        logger.warning("Not running init of Fofb-Watcher because FOFB seems to be running.")
        return

    try:
        wprx=tango.DeviceProxy(tangopath["fofb-watcher"])
        # Long client timeout (30 s) so that the init() call below is not cut short
        wprx.set_timeout_millis(30000)
    except tango.DevFailed as e:
        logger.error("Could not get DeviceProxy on {}".format(tangopath["fofb-watcher"]))
        logger.debug(str(e))
        return

    logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
    try:
        wprx.init()
    except tango.DevFailed as e:
        # The failure is only logged: init of Fofb-Command is still attempted below
        logger.error("Could not perform init() on Fofb-Watcher.")
        logger.debug(str(e))

    logger.info("Perform init() on Fofb-Command.")
    try:
        tango.DeviceProxy(tangopath["fofb-command"]).init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on '{}', got DevFailed.".format(tangopath['fofb-command']))
        logger.debug(str(e))


def confds_opcua():
    """
    Apply attribute configuration on all OPCUA devices (every entry of tangopath_nodes).
    Catch DevFailed and inform via log.
    A node that fails is skipped, the remaining nodes are still configured.
    """

    for n, p in tangopath_nodes.items():
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Could not get proxy '{}', got DevFailed.".format(p))
            logger.debug(str(e))
            # Skip only this node; one unreachable device must not abort the whole pass
            continue

        try:
            # The central node has its own attribute configuration, any other node is a cellnode
            if 'central' in n:
                FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
            else:
                FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
        except tango.DevFailed as e:
            logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
            logger.debug(str(e))


def check_fofbnotrunning():
    """
    Check if the FOFB is not running.
    If it fails to know, return False and log error.

    RETURN
    ------
    notrunning: Boolean
        True if FOFB is not running (neither fofbrunning_x nor fofbrunning_y is set on Fofb-Watcher).
        False if it is running, or if the status could not be read.
    """

    try:
        prx = tango.DeviceProxy(tangopath["fofb-watcher"])
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(tangopath["fofb-watcher"]))
        logger.debug(str(e))
        return False

    try:
        return not (prx.fofbrunning_x or prx.fofbrunning_y)
    except tango.DevFailed as e:
        logger.error("Failed to read the FOFB status on device Fofb-Watcher")
        logger.debug(str(e))
        return False


def conf_all_combpm():
    """
    Run default configuration of all combpm blocks.
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    success: Boolean
        True if every cellnode configuration reported success.
        False if FOFB seems to be running (nothing is configured in that case).
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return False

    success=True
    for n in tangopath_cellnodes:
        # Every cellnode is configured, even after a previous failure
        s=FofbTool.Configuration.cellnode_configure_combpm(n)
        success = success and s

    return success


def conf_all_comcorr(enable=True):
    """
    Run default configuration of all comcorr blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    enable: Boolean
        True to enable frame output to PSC (steerers).

    RETURN
    ------
    success: Boolean
        True if every cellnode configuration reported success.
        False if FOFB seems to be running (nothing is configured in that case).
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    success=True
    for n in tangopath_cellnodes:
        # Every cellnode is configured, even after a previous failure
        s=FofbTool.Configuration.cellnode_configure_comcorr(n, enable=enable)
        success = success and s

    return success


def conf_all_ccn():
    """
    Run default configuration of all comcellnode blocks (cellnodes, then central node).
    Check beforehand that the FOFB is not running.

    RETURN
    ------
    success: Boolean
        True if every node configuration reported success.
        False if FOFB seems to be running (nothing is configured in that case).
    """

    if not check_fofbnotrunning():
        logger.warning("Not running configuration of comcellnode because FOFB seems to be running.")
        return False

    success=True
    for n in tangopath_cellnodes:
        # Every cellnode is configured, even after a previous failure
        s=FofbTool.Configuration.cellnode_configure_ccn(n)
        success = success and s

    s=FofbTool.Configuration.centralnode_configure_ccn()
    success = success and s

    return success


def _get_prx_from_table(name, table):
    """
    Look up name (case insensitive) in a name->path table and return a tango.DeviceProxy on it.
    On failure, log error and return None.
    """

    try:
        p = table[name.lower()]
    except KeyError:
        logger.error("Wrong nodename. Possibilities are {}".format(list(table)))
        return None

    try:
        prx= tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return None

    return prx


def get_prx_from_nodename(nodename):
    """
    Return a tango.DeviceProxy from a node name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    nodename: str
        The target fofbnode, ie 'cellnode-c09' or 'centralnode'

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """

    return _get_prx_from_table(nodename, tangopath_nodes)


def get_prx_from_cellnodename(cellnodename):
    """
    Return a tango.DeviceProxy from a cellnode name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    cellnodename: str
        The target cellnode, ie 'cellnode-c09'

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """

    return _get_prx_from_table(cellnodename, tangopath_cellnodes)


def stop_all_combpm():
    """
    Apply stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.
    The stop command is applied in both cases.
    """

    # NOTE(review): no early return here, the docstring only promises a warning.
    # Confirm that stopping while FOFB is running is really intended.
    if not check_fofbnotrunning():
        logger.warning("FOFB seems to be running, stopping combpm anyway.")

    for n in tangopath_cellnodes:
        FofbTool.Operation.stop_combpm(n)


def stop_all_ccn():
    """
    Apply stop and reset commands on all fofbnodes.
    Check beforehand that the FOFB is not running. Display warning if.
    The stop and reset commands are applied in both cases.
    """

    # NOTE(review): no early return here, the docstring only promises a warning.
    # Confirm that stopping while FOFB is running is really intended.
    if not check_fofbnotrunning():
        logger.warning("FOFB seems to be running, stopping and resetting ccn anyway.")

    for n in tangopath_nodes:
        FofbTool.Operation.stop_ccn(n)
        FofbTool.Operation.reset_ccn(n)


def start_all_combpm():
    """
    Apply start command on all Cellnodes.
    """

    for n in tangopath_cellnodes:
        FofbTool.Operation.start_combpm(n)


def start_all_ccn():
    """
    Apply start command on all fofbnodes.
    """

    for n in tangopath_nodes:
        FofbTool.Operation.start_ccn(n)