###################################################################################################
#    Utilities
###################################################################################################
#
# This file contains useful functions and attributes to simplify life
#
###################################################################################################

import logging

import tango

import FofbTool.DeviceAttributeConfiguration
import FofbTool.Configuration
import FofbTool.Operation

# Get the module logger
logger = logging.getLogger("FofbTool")


def _fofbnodes():
    """
    Return the 'tangopath.fofbnodes' configuration section, mapping node names to Tango paths.
    """
    return FofbTool.Configuration.config["tangopath.fofbnodes"]


def _cellnode_names():
    """
    Return the list of configured fofbnode names that contain 'cellnode'.
    """
    return [name for name in _fofbnodes() if 'cellnode' in name]


def init_opcua(force=False):
    """
    Run init on all OPCUA devices. Catch DevFailed and inform via log.
    Does nothing if Fofb is running.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running init of OPCUA devices because FOFB seems to be running.")
        return

    for n, p in _fofbnodes().items():
        logger.info("Perform init() on {} '{}'".format(n, p))
        try:
            tango.DeviceProxy(p).init()
        except tango.DevFailed as e:
            # A failing node must not prevent the init of the other ones
            logger.error("Could not perform init() '{}', got DevFailed.".format(p))
            logger.debug(str(e))


def init_watcher(force=False):
    """
    Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand.
    Does nothing if Fofb is running.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running init of Fofb-Watcher because FOFB seems to be running.")
        return

    p = FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
    try:
        wprx = tango.DeviceProxy(p)
        # The watcher init is long, raise the client timeout to 60 s
        wprx.set_timeout_millis(60000)
    except tango.DevFailed as e:
        logger.error("Could not get DeviceProxy on {}".format(p))
        logger.debug(str(e))
        return

    logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.")
    try:
        wprx.init()
    except tango.DevFailed as e:
        # NOTE(review): the Fofb-Command init below is still attempted after a
        # failure here; this is the historical behaviour, confirm it is wanted.
        logger.error("Could not perform init() on Fofb-Watcher.")
        logger.debug(str(e))

    logger.info("Perform init() on Fofb-Command.")
    p = FofbTool.Configuration.config["tangopath"]["fofb-command"]
    try:
        tango.DeviceProxy(p).init()
    except tango.DevFailed as e:
        logger.error("Could not perform init() on '{}', got DevFailed.".format(p))
        logger.debug(str(e))


def confds_opcua():
    """
    Apply attribute configuration on all OPCUA devices.
    Catch DevFailed and inform via log.

    Nodes whose name contains 'central' get the centralnode configuration,
    all the others get the cellnode configuration.
    """
    for n, p in _fofbnodes().items():
        try:
            prx = tango.DeviceProxy(p)
        except tango.DevFailed as e:
            logger.error("Could not get proxy '{}', got DevFailed.".format(p))
            logger.debug(str(e))
            # Skip only this node; the remaining ones still get configured
            continue

        try:
            if 'central' in n:
                FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx)
            else:
                FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx)
        except tango.DevFailed as e:
            logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p))
            logger.debug(str(e))


def check_fofbnotrunning(force=False):
    """
    Check if the FOFB is not running.
    If it fails to know, return False and log error.

    PARAMETERS
    ----------
    force: Boolean
        If True, skip the check, log a warning and return True.

    RETURN
    ------
    notrunning: Boolean
        True if FOFB is not running (neither fofbrunning_x nor fofbrunning_y
        is set on the Fofb-Watcher), or if force is True.
    """
    if force:
        logger.warning("Forced action even if FOFB might be running")
        return True

    p = FofbTool.Configuration.config["tangopath"]["fofb-watcher"]
    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return False

    try:
        return not (prx.fofbrunning_x or prx.fofbrunning_y)
    except tango.DevFailed as e:
        logger.error("Failed to read the FOFB status on device Fofb-Watcher")
        logger.debug(str(e))
        return False


def conf_all_combpm(force=False):
    """
    Run default configuration of all combpm blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.

    RETURN
    ------
    success: Boolean
        True if every cellnode configuration returned a true value.
        False if one failed or if the FOFB seems to be running.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of combpm because FOFB seems to be running.")
        return False

    # Build the full list first: every cellnode is configured even after a failure
    results = [FofbTool.Configuration.cellnode_configure_combpm(n) for n in _cellnode_names()]
    return all(results)


def conf_all_comcorr(force=False, enable=True):
    """
    Run default configuration of all comcorr blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    enable: Boolean
        True to enable frame output to PSC (steerers).

    RETURN
    ------
    success: Boolean
        True if every cellnode configuration returned a true value.
        False if one failed or if the FOFB seems to be running.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of comcorr because FOFB seems to be running.")
        return False

    # Build the full list first: every cellnode is configured even after a failure
    results = [
        FofbTool.Configuration.cellnode_configure_comcorr(n, enable=enable)
        for n in _cellnode_names()
    ]
    return all(results)


def conf_all_ccn(force=False):
    """
    Run default configuration of all comcellnode blocks.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.

    RETURN
    ------
    success: Boolean
        True if every cellnode and the centralnode configuration returned a true value.
        False if one failed or if the FOFB seems to be running.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running configuration of comcellnode because FOFB seems to be running.")
        return False

    # Cellnodes first, then the centralnode; all are configured even after a failure
    results = [FofbTool.Configuration.cellnode_configure_ccn(n) for n in _cellnode_names()]
    results.append(FofbTool.Configuration.centralnode_configure_ccn())
    return all(results)


def get_prx_from_nodename(nodename):
    """
    Return a tango.DeviceProxy from a node name.
    On failure, log error and return None.

    PARAMETERS
    ----------
    nodename: str
        The target fofbnode, ie 'cellnode-c09' or 'centralnode'.
        The lookup is done on the lower-cased name.

    RETURN
    ------
    prx: tango.DeviceProxy or None
    """
    # Get device proxy
    try:
        p = _fofbnodes()[nodename.lower()]
    except KeyError:
        logger.error("Wrong nodename. Possibilities are {}".format(list(_fofbnodes().keys())))
        return None

    try:
        prx = tango.DeviceProxy(p)
    except tango.DevFailed as e:
        logger.error("Failed to get the Device proxy to '{}'".format(p))
        logger.debug(str(e))
        return None

    return prx


def stop_all_combpm(force=False):
    """
    Apply stop command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop combpm because FOFB seems to be running.")
        return

    for n in _cellnode_names():
        FofbTool.Operation.stop_combpm(n)


def stop_all_comlbp(force=False):
    """
    Apply stop and reset command on all Cellnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop comlbp because FOFB seems to be running.")
        return

    for n in _cellnode_names():
        try:
            FofbTool.Operation.reset_comlbp(n)
            FofbTool.Operation.stop_comlbp(n)
        except (tango.DevFailed, TypeError):
            logger.error("Could not stop comlbp on {}".format(n))


def stop_all_ccn(force=False):
    """
    Apply stop and reset commands on all fofbnodes.
    Check beforehand that the FOFB is not running. Display warning if.

    PARAMETERS
    ----------
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running stop ccn because FOFB seems to be running.")
        return

    for n in _fofbnodes():
        FofbTool.Operation.stop_ccn(n)
        FofbTool.Operation.reset_ccn(n)


def start_all_combpm():
    """
    Apply start command on all Cellnodes.
    """
    for n in _cellnode_names():
        FofbTool.Operation.start_combpm(n)


def start_all_comlbp():
    """
    Apply start comlbp command on all Cellnodes.
    DevFailed and TypeError are caught and reported via log.
    """
    for n in _cellnode_names():
        try:
            FofbTool.Operation.start_comlbp(n)
        except (tango.DevFailed, TypeError):
            logger.error("Could not start LBP on {}".format(n))


def start_all_ccn():
    """
    Apply start commands on all fofbnodes.
    """
    for n in _fofbnodes():
        FofbTool.Operation.start_ccn(n)


def align_all_ccn(cellnodename, force=False):
    """
    Launch alignement FA sequence for Electron and Brillance Plus.

    The sequence offset is obtained by running align_ccn on one cellnode,
    then applied to the comlbp of all cellnodes.
    Check beforehand that the FOFB is not running.

    PARAMETERS
    ----------
    cellnodename: str
        The cellnode used to run the alignement sequence, ie 'cellnode-c09'.
    force: Boolean
        Passed to check_fofbnotrunning(); True skips the FOFB running check.
    """
    if not check_fofbnotrunning(force):
        logger.warning("Not running align ccn because FOFB seems to be running.")
        return

    logger.debug("Use {} for sequence alignement".format(cellnodename))
    seqoffset = FofbTool.Operation.align_ccn(cellnodename)
    # NOTE(review): a returned offset of 0 is also treated as a failure here;
    # confirm against the return contract of FofbTool.Operation.align_ccn.
    if not seqoffset:
        logger.error("Could not align all ccn")
        return

    for n in _cellnode_names():
        try:
            FofbTool.Operation.set_comlbp_seqoffset(n, seqoffset)
        except (tango.DevFailed, TypeError):
            logger.error("Could not set comlbp offset on {}".format(n))