################################################################################################### # Utilities ################################################################################################### # # This file contains usefull function and attributes to simplify life # ################################################################################################### import tango import logging import FofbTool.DeviceAttributeConfiguration import FofbTool.Configuration import FofbTool.Operation import configparser import os import numpy as np # Get the module logger logger = logging.getLogger("FofbTool") ################################################################################################### # CONFIGURATION PARSER ################################################################################################### # Configuration config = configparser.ConfigParser() # Try to load default defconfpath = os.path.abspath(os.path.join(os.path.dirname(__spec__.origin), "default.cfg")) if len(config.read(defconfpath)) < 1: logger.warning("Default configuration file not found: {}".format(defconfpath)) def logdebugconf(): logger.debug("Display config:") for s in config.sections(): for o in config.options(s): logger.debug("{:20} {:10} {}".format(s,o, config[s][o])) def loadconfig(filepath): """ Load a configuration file for the module. It supersed the default configuration, if found. PARAMETERS ---------- filepath: str Path to the config file """ global config logger.info("Load configuration from file {}".format(filepath)) if len(config.read(filepath)) < 1: logger.warning("Configuration file not found: {}".format(filepath)) def getconf(option, section, nodename="", t="", l=False): """ Helper method to retrieve an option in the loaded configuration. It tries to find a section named section.nodename and fall back to section only if fails. PARAMETERS ---------- option: str Name of the option to retrieve section: str Section in wich to search nodename: str Specify a particular nodename. If not found in config, fall back to section default. t: str ('i', 'f', 'b', '') Give the type to cast, integer, float, boolean or string. l: boolean Cast from a list RETURNS ------- roption: str, int, float or boolean Read option """ if l: if t=='b': raise NotImplemented("List of boolean not implemented") try: val= np.asarray(config.get("{}.{}".format(section,nodename), option).split(), dtype=t) except (configparser.NoSectionError, configparser.NoOptionError): val= np.asarray(config.get(section, option).split(), dtype=t) else: try: func = getattr(config, {'i':"getint", 'f':"getfloat", 'b':"getboolean", '':"get"}[t]) except KeyError as e: log.debug(str(e)) raise ValueError("Allowed t argument are 'i', 'f', 'b' or ''") try: val= func("{}.{}".format(section,nodename), option) except (configparser.NoSectionError, configparser.NoOptionError): val= func(section, option) return val ################################################################################################### # INIT FUNCTIONS ################################################################################################### def init_opcua(force=False): """ Run init on all OPCUA devices. Catch DevFailed and inform via log. Does nothing if Fofb is running. """ if not check_fofbnotrunning(force): logger.warning("Not running because FOFB seems to be running.") return for n,p in config["tangopath.fofbnodes"].items(): logger.info("Perform init() on {} '{}'".format(n,p)) try: tango.DeviceProxy(p).init() except tango.DevFailed as e: logger.error("Could not perform init() '{}', got DevFailed.".format(p)) logger.debug(str(e)) def init_watcher(force=False): """ Run init on Fofb-Watcher, waiting for its completion then init on the FofbCommand. Does nothing if Fofb is running. """ if not check_fofbnotrunning(force): logger.warning("Not running because FOFB seems to be running.") return p=config["tangopath"]["fofb-watcher"] try: wprx=tango.DeviceProxy(p) wprx.set_timeout_millis(60000) except tango.DevFailed as e: logger.error("Could not get DeviceProxy on {}".format(p)) logger.debug(str(e)) return logger.info("Perform init() on Fofb-Watcher. This takes nearly a minute.") try: wprx.init() except tango.DevFailed as e: logger.error("Could not perform init() on Fofb-Watcher.") logger.debug(str(e)) logger.info("Perform init() on Fofb-Command.") p=config["tangopath"]["fofb-command"] try: tango.DeviceProxy(p).init() except tango.DevFailed as e: logger.error("Could not perform init() on '{}', got DevFailed.".format(p)) logger.debug(str(e)) def confds_opcua(): """ Apply attribute configuration on all OPCUA devices. Catch DevFailed and inform via log. """ for n,p in config["tangopath.fofbnodes"].items(): try: prx = tango.DeviceProxy(p) except tango.DevFailed as e: logger.error("Could not get proxy '{}', got DevFailed.".format(p)) logger.debug(str(e)) break try: if 'central' in n: FofbTool.DeviceAttributeConfiguration.set_attr_config_centralnode(prx) else: FofbTool.DeviceAttributeConfiguration.set_attr_config_cellnode(prx) except tango.DevFailed as e: logger.error("Could not set attribute configuration for '{}', got DevFailed.".format(p)) logger.debug(str(e)) ################################################################################################### # OPERATE FUNCTIONS ################################################################################################### def check_fofbnotrunning(force=False): """ Check if the FOFB is not running. If it fails to know, return False and log error. RETURN ------ running: Boolean True if FOFB is not running """ if force: logger.warning("Forced action even if FOFB might be running") return True p=config["tangopath"]["fofb-watcher"] try: prx = tango.DeviceProxy(p) except tango.DevFailed as e: logger.error("Failed to get the Device proxy to '{}'".format(p)) logger.debug(str(e)) return False try: return not (prx.fofbrunning_x or prx.fofbrunning_y) except tango.DevFailed as e: logger.error("Failed to read the FOFB status on device Fofb-Watcher") logger.debug(str(e)) return False def conf_all_combpm(force=False): """ Run default configuration of all combpm blocks. Check beforehand that the FOFB is not running. RETURN ------ running: Boolean True if FOFB is not running """ if not check_fofbnotrunning(force): logger.warning("Not running configuration of combpm because FOFB seems to be running.") return False success=True for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: bpmallowed = getconf("bpmfilter", "combpm", n.lower(), 'i', True) s=FofbTool.Configuration.cellnode_configure_combpm(p, bpmallowed) success = success and s return success def conf_all_comcorr(force=False, enable=True): """ Run default configuration of all comcorr blocks. Check beforehand that the FOFB is not running. PARAMETERS ---------- enable: Boolean True to enable frame output to PSC (steerers). RETURN ------ running: Boolean True if FOFB is not running """ if not check_fofbnotrunning(force): logger.warning("Not running configuration of comcorr because FOFB seems to be running.") return False success=True for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: pscid = getconf("pscid", "comcorr", n, 'i', True) s=FofbTool.Configuration.cellnode_configure_comcorr(p, pscid, enable=enable) success = success and s return success def conf_all_ccn(force=False): """ Run default configuration of all comcellnode blocks. Check beforehand that the FOFB is not running. RETURN ------ running: Boolean True if FOFB is not running """ if not check_fofbnotrunning(force): logger.warning("Not running configuration of comcorr because FOFB seems to be running.") return False success=True for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: nbpm = getconf("nbpm", "ccn", n, 'i') npsc = getconf("npsc", "ccn", n, 'i') s=FofbTool.Configuration.cellnode_configure_ccn(p, nbpm, npsc) success = success and s p = config["tangopath.fofbnodes"]["centralnode"] nbpm = [getconf("nbpm", "ccn", n, 'i') for n in config['tangopath.fofbnodes'].keys() if 'cellnode' in n] npsc = getconf("npsc", "ccn", 'centralnode', 'i') s=FofbTool.Configuration.centralnode_configure_ccn(p, nbpm, npsc) success = success and s return success def get_prx_from_nodename(nodename): """ Return a tango.DeviceProxy from a node name. On failure, log error and return None. PARAMETERS ---------- nodename: str The target fofbnode, ie 'cellnode-c09' or 'centralnode' RETURN ------ prx: tango.DeviceProxy or None """ # Get device proxy try: p = config["tangopath.fofbnodes"][nodename.lower()] except KeyError: logger.error("Wrong nodename. Possibilities are {}".format(list(config["tangopath.fofbnodes"].keys()))) return None try: prx= tango.DeviceProxy(p) except tango.DevFailed as e: logger.error("Failed to get the Device proxy to '{}'".format(p)) logger.debug(str(e)) return None return prx def stop_all_combpm(force=False): """ Apply stop command on all Cellnodes. Check beforehand that the FOFB is not running. Display warning if. """ if not check_fofbnotrunning(force): logger.warning("Not running stop combpm because FOFB seems to be running.") return for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: FofbTool.Operation.stop_combpm(p) def stop_all_comlbp(force=False): """ Apply stop and reset command on all Cellnodes. Check beforehand that the FOFB is not running. Display warning if. """ if not check_fofbnotrunning(force): logger.warning("Not running stop comlbp because FOFB seems to be running.") return for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: try: FofbTool.Operation.reset_comlbp(p) FofbTool.Operation.stop_comlbp(p) except (tango.DevFailed, TypeError): logger.error("Could not stop cpmlbp on {}".format(n)) def stop_all_ccn(force=False): """ Apply stop and reset commands on all fofbnodes. Check beforehand that the FOFB is not running. Display warning if. """ if not check_fofbnotrunning(force): logger.warning("Not running stop comccn because FOFB seems to be running.") return for n,p in config["tangopath.fofbnodes"].items(): FofbTool.Operation.stop_ccn(p) FofbTool.Operation.reset_ccn(p) def start_all_combpm(): """ Apply start command on all Cellnodes. """ for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: FofbTool.Operation.start_combpm(p) def start_all_comlbp(): """ Apply start comlbp command on all Cellnodes. """ for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: try: FofbTool.Operation.start_comlbp(p) except (tango.DevFailed, TypeError): logger.error("Could not start LBP on {}".format(n)) def start_all_ccn(): """ Apply start commands on all fofbnodes. """ for n,p in config["tangopath.fofbnodes"].items(): FofbTool.Operation.start_ccn(p) def align_all_ccn(cellnodename, force=False): """ Launch alignement FA sequence for Electron and Brillance Plus. """ if not check_fofbnotrunning(force): logger.warning("Not running align ccn because FOFB seems to be running.") return logger.debug("Use {} for sequence alignement".format(cellnodename)) cellnodepath=FofbTool.Utils.get_prx_from_nodename(cellnodename) seqoffset = FofbTool.Operation.align_ccn(cellnodepath, 0) if seqoffset is None: logger.error("Could not align all ccn") return for n,p in config["tangopath.fofbnodes"].items(): if 'cellnode' in n: try: FofbTool.Configuration.cellnode_configure_comlbp(n, seqoffset) except (tango.DevFailed, TypeError): logger.error("Could not set comlbp offset on {}".format(n)) def conf_centralnode_corr(): p = config["tangopath.fofbnodes"]["centralnode"] FofbTool.Configuration.centralnode_configure_corr(p, getconf('numbpm', 'corr', t='i'), getconf('pscid', 'corr', t='i', l=True), [getconf( "k1{}_x".format(l), "corr", t='i') for l in ["a","b","ic","d"]], [getconf( "k1{}_y".format(l), "corr", t='i') for l in ["a","b","ic","d"]], [getconf( "k2{}_x".format(l), "corr", t='i') for l in ["a","b","ic","d"]], [getconf( "k2{}_y".format(l), "corr", t='i') for l in ["a","b","ic","d"]], ) def sync_all_bpm(): """ Synchronize all BPM electronics, Electron and Brillance Plus. This will use the timing system (central and local board). """ db = tango.Database() bpmlist = [n.split(':')[2] for n in db.get_property("FOFB", "bpmlist")['bpmlist'] if ":LIBERA:" in n] bpmidlist = [(int(n.split(':')[0]), n.split(':')[2]) for n in db.get_property("FOFB", "bpmlist")['bpmlist'] if 'LIBERA' in n] tlocal = tango.Group('tlocal') tlocal.add([n.split(':')[2] for n in db.get_property("FOFB", 'TimingBoardList')['TimingBoardList'] if "LOCAL" in n]) # Set a group of Libera Brillance Plus EVRX board, from FofbTool configuration lbpevrx = tango.Group('lbpevrx') lbpevrx.add(FofbTool.Configuration.config["tangopath"]["lbpevrx"].split()) tcentral = tango.DeviceProxy(FofbTool.Configuration.config["tangopath"]["timing-central"]) FofbTool.Operation.sync_bpm(bpmlist, lbpevrx, tlocal, tcentral)