###################################################################################################
#    OPERATION FUNCTIONS
###################################################################################################
#
# Contains functions to operate the FOFB application.
#
###################################################################################################

import functools
import logging
import time

import numpy as np
import tango

# Get the module logger
logger = logging.getLogger("FofbTool")


def trycatch_tango_devfailed(func):
    """
    Decorator turning an uncaught tango.DevFailed into a logged error.

    The wrapped function is called unchanged. If it raises tango.DevFailed, an
    error is logged, the exception is logged at debug level, and False is
    returned instead of propagating the exception.

    PARAMETERS
    ----------
    func: callable
        The function to protect.

    RETURNS
    -------
    inner_function: callable
        The wrapped function (name and docstring of func are preserved).
    """
    @functools.wraps(func)
    def inner_function(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except tango.DevFailed as e:
            logger.error("DevFailed met during operation")
            logger.debug(e)
            return False
    return inner_function


###################################################################################################
#    PRIVATE HELPERS
###################################################################################################

def _get_proxy(node_tangopath):
    """
    Build a DeviceProxy on node_tangopath and ping it.

    RETURNS
    -------
    prx: tango.DeviceProxy or None
        The proxy, or None (after logging an error) if creation or ping failed.
    """
    try:
        prx = tango.DeviceProxy(node_tangopath)
        prx.ping()
    except tango.DevFailed:
        logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
        return None
    return prx


def _resolve_interfaces(prx, ccnif, prefix, suffix):
    """
    Return ccnif, or the interface indices found on the device when ccnif is None or empty.

    Detection keeps every attribute whose name contains both prefix and suffix
    and reads the single digit located right after len(prefix) characters.
    """
    if ccnif is None or len(ccnif) == 0:
        digit_pos = len(prefix)
        return [int(name[digit_pos])
                for name in prx.get_attribute_list()
                if prefix in name and suffix in name]
    return ccnif


def _write_ccn_control(prx, node_tangopath, ccnif, value):
    """
    Write value on the ccnpack/ccnunpack control attributes of each interface.

    When 'central' is in node_tangopath the unpacker is written first, then the
    packer; otherwise the packer is written first.
    """
    if 'central' in node_tangopath:
        blocks = ("ccnunpack", "ccnpack")
    else:
        blocks = ("ccnpack", "ccnunpack")

    for n in ccnif:
        for block in blocks:
            prx["{}{}_control".format(block, n)] = value


def _write_comlbp_control(prx, value):
    """Write value on the control attribute of the four comlbp interfaces (0 to 3)."""
    for n in range(4):
        prx["comlbp{}_control".format(n)] = value


def _seq_diff16(seq_a, seq_b):
    """
    Return seq_a - seq_b folded back into a 16 bits signed offset.

    The raw difference of two sequence numbers spans 17 bits; it is wrapped so
    that a roll-over of the counter between the two latches is not seen as a
    huge offset.
    """
    diff = int(seq_a) - int(seq_b)
    if diff > 0x7FFF:
        diff -= 0x10000
    if -diff > 0x8000:
        diff += 0x10000
    return diff


def _group_replies_ok(replies, errfmt):
    """
    Check the replies of a tango.Group operation.

    On the first failed reply, log errfmt formatted with the device name and
    return False. Return True if no reply has failed.
    """
    for reply in replies:
        if reply.has_failed():
            logger.error(errfmt.format(reply.dev_name()))
            return False
    return True


def _fire_soft_event(tcentral, event_number):
    """Wait 2 seconds, fire the soft event event_number on tcentral, wait 2 seconds again."""
    time.sleep(2)
    logger.info("Fire the trigger")
    tcentral.write_attribute("softEventAdress", event_number)
    tcentral.firesoftevent()
    time.sleep(2)


###################################################################################################
#    OPERATIONS ON CCN
###################################################################################################

@trycatch_tango_devfailed
def align_ccn(node_tangopath, comlbpif=0):
    """
    Align FA sequence number on a cellnode. Returns the computed offset.

    The comlbp sequence offset is reset, comlbp and combpm are restarted, then
    the difference between the two latched sequence numbers of ccnpack0 is
    measured until two consecutive measures agree. This offset is applied and
    a last measure checks the result.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
    comlbpif: int
        which comlbp interface to use (0 to 3)

    RETURNS
    -------
    seqoffset: int or None
        If success, return the found sequence offset.
        Return None on failure.
        Note that an uncaught tango.DevFailed makes the decorator return False.
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return None

    offset_attr = "comlbp{}_seqoffset".format(comlbpif)

    logger.info("Trying to detect and align FA sequences")

    logger.debug("Reset offset on comlbp")
    prx[offset_attr] = 0

    # NOTE(review): the success flags returned by these four calls are ignored,
    # as in the original implementation; the measure goes on even if one failed.
    stop_comlbp(node_tangopath)
    stop_combpm(node_tangopath)
    time.sleep(1)
    start_comlbp(node_tangopath)
    start_combpm(node_tangopath)

    # Loop until two consecutive identical measures
    measures = [7, 3]  # random start
    while measures[-2] != measures[-1]:
        if len(measures) > 7:
            logger.error("Could not measure sequence offset.")
            return None

        logger.debug("Latch two sequence numbers without offset")
        prx.ccnpack0_control = 1  # let it roll a bit to collect data
        time.sleep(2 + np.random.uniform())
        prx.ccnpack0_control = 2  # latch it
        time.sleep(2)

        # Read the latched values once; they are reused for the debug trace
        seq1 = prx.ccnpack0_latchedseq1
        seq2 = prx.ccnpack0_latchedseq2

        # handle diff going from 17 bits to 16bits offset
        measures.append(_seq_diff16(seq1, seq2))
        logger.debug("seq ({}, {}, {})".format(seq1, seq2, measures[-1]))

    seqoffset = measures[-1]

    if seqoffset in (-1, 0, 1):
        logger.warning("Sequence offset measured = {}, something might be wrong".format(seqoffset))

    logger.debug("Setting sequence offset to {}".format(seqoffset))
    prx[offset_attr] = seqoffset

    logger.debug("Perform autocheck")
    logger.debug("Latch two sequence numbers without offset")
    prx.ccnpack0_control = 1
    time.sleep(2)
    prx.ccnpack0_control = 2
    time.sleep(2)

    # Same 16 bits folding as for the measure, so that a counter roll-over
    # between the two latched values does not make the check fail.
    check = _seq_diff16(prx.ccnpack0_latchedseq2, prx.ccnpack0_latchedseq1)
    if check not in (-1, 1):
        logger.warning("Corrected sequence offset measured = {}, something might be wrong. Run it again".format(check))
        return None

    return seqoffset


@trycatch_tango_devfailed
def stop_ccn(node_tangopath, ccnif=None):
    """
    Stop the communication with cellnode on the specified fofbnode.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
    ccnif: list(int)
        List of the interface to stop.
        If None or empty, all the possible interface will be stopped.

    RETURNS
    -------
    bool: success
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    ccnif = _resolve_interfaces(prx, ccnif, "ccnpack", "control")

    logger.info("Stopping CCN on {}".format(node_tangopath))
    _write_ccn_control(prx, node_tangopath, ccnif, False)
    return True


@trycatch_tango_devfailed
def reset_ccn(node_tangopath, ccnif=None):
    """
    Reset the communication with cellnode on the specified fofbnode.

    The ethernet and GT resets are asserted, held for 2 seconds, released, and
    the errors are then acknowledged with ack_ccn.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
    ccnif: list(int)
        List of the interface to reset.
        If None or empty, all the possible interface will be reset.

    RETURNS
    -------
    bool: success
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    ccnif = _resolve_interfaces(prx, ccnif, "ccneth", "gt_reset")

    logger.info("Reset CCN on {}".format(node_tangopath))
    for n in ccnif:
        prx["ccneth{}_reset".format(n)] = 0x60000001  # impossible to write 0xE0000001
        prx["ccneth{}_gt_reset".format(n)] = 1

    time.sleep(2)

    for n in ccnif:
        prx["ccneth{}_gt_reset".format(n)] = 0
        prx["ccneth{}_reset".format(n)] = 0

    ack_ccn(node_tangopath, ccnif)
    return True


@trycatch_tango_devfailed
def start_ccn(node_tangopath, ccnif=None):
    """
    Start the communication with cellnode on the specified fofbnode.

    Errors are acknowledged with ack_ccn before the packers and unpackers are
    started.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
    ccnif: list(int)
        List of the interface to start.
        If None or empty, all the possible interface will be started.

    RETURNS
    -------
    bool: success
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    ccnif = _resolve_interfaces(prx, ccnif, "ccnpack", "control")

    logger.info("Starting CCN on {}".format(node_tangopath))
    ack_ccn(node_tangopath, ccnif)
    _write_ccn_control(prx, node_tangopath, ccnif, 1)
    return True


@trycatch_tango_devfailed
def ack_ccn(node_tangopath, ccnif=None):
    """
    Ack errors on the communication with cellnode on the specified fofbnode.

    The reset_error attributes of the packers and unpackers are raised, held
    for 1 second, then released.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
    ccnif: list(int)
        List of the interface to acknowledge.
        If None or empty, all the possible interface will be acknowledged.

    RETURNS
    -------
    bool: success
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    ccnif = _resolve_interfaces(prx, ccnif, "ccnpack", "control")

    logger.info("Ack CCN error on {}".format(node_tangopath))
    for n in ccnif:
        prx["ccnpack{}_reset_error".format(n)] = True
        prx["ccnunpack{}_reset_error".format(n)] = True

    time.sleep(1)

    for n in ccnif:
        prx["ccnpack{}_reset_error".format(n)] = False
        prx["ccnunpack{}_reset_error".format(n)] = False

    return True


###################################################################################################
#    OPERATIONS ON COMBPM
###################################################################################################

@trycatch_tango_devfailed
def stop_combpm(node_tangopath):
    """
    Stop the communication with bpm on the specified cellnode.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Stopping ComBPM on {}".format(node_tangopath))
    prx["combpm_reset"] = 1
    prx["combpm_gt_control"] = 0x5
    return True


@trycatch_tango_devfailed
def start_combpm(node_tangopath):
    """
    Start the communication with bpm on the specified cellnode.

    The reset is released, the GT control is written, and after 1 second the
    errors are acknowledged with ack_combpm.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Starting ComBpm on {}".format(node_tangopath))
    prx["combpm_reset"] = 0
    prx["combpm_gt_control"] = 0x1
    time.sleep(1)
    ack_combpm(node_tangopath)
    return True


@trycatch_tango_devfailed
def ack_combpm(node_tangopath):
    """
    Ack errors on the communication with bpm on the specified cellnode.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Ack ComBpm on {}".format(node_tangopath))
    prx["combpm_reset_error"] = True
    time.sleep(1)
    prx["combpm_reset_error"] = False
    return True


###################################################################################################
#    OPERATIONS ON COMLBP
###################################################################################################

@trycatch_tango_devfailed
def stop_comlbp(node_tangopath):
    """
    Stop the communication with LBP on the specified cellnode.

    The control attribute of the four interfaces is set to 1, held for
    1 second, then set to 0.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Stopping ComLBP on {}".format(node_tangopath))
    _write_comlbp_control(prx, 1)
    time.sleep(1)
    _write_comlbp_control(prx, 0)
    return True


@trycatch_tango_devfailed
def start_comlbp(node_tangopath):
    """
    Start the communication with LBP on the specified cellnode.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Starting ComLBP on {}".format(node_tangopath))
    _write_comlbp_control(prx, 0x10)
    return True


@trycatch_tango_devfailed
def reset_comlbp(node_tangopath):
    """
    Reset the communication with LBP on the specified cellnode.

    The control attribute of the four interfaces is set to 0x3, held for
    1 second, then set to 0x0.

    PARAMETERS
    ----------
    node_tangopath: str
        The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'

    RETURN
    ------
    success: boolean
    """
    prx = _get_proxy(node_tangopath)
    if prx is None:
        return False

    logger.info("Reset ComLBP on {}".format(node_tangopath))
    _write_comlbp_control(prx, 0x3)
    time.sleep(1)
    _write_comlbp_control(prx, 0x0)
    return True


###################################################################################################
#    OPERATIONS ON BPM ELECTRONICS
###################################################################################################

# Some local constants
ADDR_CCCFG = 0          # Register address of Libera Electron
ADDR_FAICFG = 0x2000    # Register address of Libera Electron


def electron_group_writefadata(group, listargs):
    """
    Run the WriteFAData command with listargs as a DevVarLongArray argument.

    PARAMETERS
    ----------
    group: tango.Group or tango.DeviceProxy
        Target of the command; only its command_inout method is used.
    listargs: list of int
        Values sent to WriteFAData.

    RETURNS
    -------
    r:
        Whatever group.command_inout returns.
    """
    args = tango.DeviceData()
    args.insert(tango.DevVarLongArray, listargs)
    return group.command_inout("WriteFAData", args)


def electron_init_fa(bpmlist):
    """
    Configure FA register of Libera Electron

    PARAMETERS
    ----------
    bpmlist: list of (bpmid, bpmpath)
        ID to apply and path of BPM tango device

    RETURNS
    -------
    success: boolean
        True on success, False if a BPM device cannot be reached.
    """
    # Init BPMs, put for each the ID number
    # Cannot do that with group
    logger.info("Initialize BPMs")
    for bpmid, bpmpath in bpmlist:
        logger.debug("Initialize BPM {}".format(bpmpath))
        # The proxy creation itself may fail, hence it belongs to the try block.
        # tango.DevFailed is caught, as everywhere else in this module.
        try:
            pbpm = tango.DeviceProxy(bpmpath)
            pbpm.ping()
        except tango.DevFailed:
            logger.error("Failed to connect to {}".format(bpmpath))
            return False

        # Configure : bpmid, tframe, mgtpower, mgtlb, bfrclr
        electron_group_writefadata(pbpm, [ADDR_CCCFG, 4, 5, bpmid, 6000, 0, 0, 6000])
        # Ack rise
        electron_group_writefadata(pbpm, [ADDR_FAICFG, 4, 1, 9])
        # Ack fall
        electron_group_writefadata(pbpm, [ADDR_FAICFG, 4, 1, 8])

    return True


def electron_sync_next_trigger(bpmlist):
    """
    Prepare Libera Electron to sync on next trigger

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success
    """
    logger.info("Prepare Libera Electron to sync at next trigger")
    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    replies = bpms.command_inout("settimeonnexttrigger")
    return _group_replies_ok(replies, "Failed to apply command SetTimeOnNextTrigger on bpm {}")


def electron_start_next_trigger(bpmlist):
    """
    Prepare Libera Electron to start on next trigger

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success
    """
    writefa_error = "Failed to apply command WriteFAData on bpm {}"

    logger.info("Prepare Libera Electron to start at next trigger")
    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    # Write start, tframelim=zero, no debug data
    replies = electron_group_writefadata(bpms, [ADDR_FAICFG, 4, 1, 8])
    if not _group_replies_ok(replies, writefa_error):
        return False

    # Write Enable ITech FAI
    replies = electron_group_writefadata(bpms, [ADDR_FAICFG+4, 4, 1, 1])
    if not _group_replies_ok(replies, writefa_error):
        return False

    return True


def electron_stop_com(bpmlist):
    """
    Stop Libera Electron FA communication

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success
    """
    writefa_error = "Failed to apply command WriteFAData on bpm {}"

    logger.info("Stop Libera Electron FA communication")
    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    # Write Disable ITech FAI
    replies = electron_group_writefadata(bpms, [ADDR_FAICFG+4, 4, 1, 0])
    if not _group_replies_ok(replies, writefa_error):
        return False

    # Soft stop
    replies = electron_group_writefadata(bpms, [ADDR_FAICFG+8, 4, 1, 0])
    if not _group_replies_ok(replies, writefa_error):
        return False

    # Reset User FAI
    replies = electron_group_writefadata(bpms, [ADDR_FAICFG, 4, 1, 0])
    if not _group_replies_ok(replies, writefa_error):
        return False

    return True


###################################################################################################
#    OPERATIONS FOR LBP and Electron SYNCHRONIZATION
###################################################################################################

@trycatch_tango_devfailed
def sync_bpm(bpmidlist, lbpevrx, timinglocal, timingcentral):
    """
    Synchronize all BPM electronics, Electron and Brillance Plus.
    This will use the timing system (central and local board).

    PARAMETERS:
    -----------
    bpmidlist: list of tuple
        list of tuple (ID, tangopath) to put on Libera Electron
    lbpevrx: list
        list of LBP Evrx tango path. Can be an empty array.
    timinglocal: list
        list of Timing local tango path to set event
    timingcentral: str
        Tango path of timing central

    RETURN
    ------
    success: boolean
        False as soon as one step fails, or if a tango.DevFailed is raised
        (caught by the decorator).
    """
    EVN = 240  # Event number
    trigevent_error = "Failed to set Event Number on local timing board {}, bpm.trigEvent"

    bpmlist = [b[1] for b in bpmidlist]

    tlocal = tango.Group('tlocal')
    tlocal.add(timinglocal)

    tcentral = tango.DeviceProxy(timingcentral)

    glbpevrx = tango.Group('lbpevrx')
    glbpevrx.add(lbpevrx)

    # ---------------------------------------------------------------------------------------------
    # Init BPMs, stop first and put for each the ID number
    if not electron_stop_com(bpmlist):
        return False
    if not electron_init_fa(bpmidlist):
        return False

    # ---------------------------------------------------------------------------------------------
    # Write event number
    # NOTE(review): on any failure below, the function returns without writing
    # the event number back to 3 - confirm this is the intended behaviour.
    logger.info("Set Event Number on local timing board, BpmTriggerEvent")
    replies = tlocal.write_attribute("bpm.trigEvent", EVN)
    if not _group_replies_ok(replies, trigevent_error):
        return False

    # ---------------------------------------------------------------------------------------------
    # Prepare bpm for trigger reception
    if not electron_sync_next_trigger(bpmlist):
        return False

    logger.info("Prepare Libera Brillance Plus to start on next trigger")
    replies = glbpevrx.write_attribute("synchronize", 0)
    if not _group_replies_ok(replies, "Failed to write synchronize on LBP EVRX {}"):
        return False

    # ---------------------------------------------------------------------------------------------
    # Wait 2 seconds and Fire the soft event
    _fire_soft_event(tcentral, EVN)

    # ---------------------------------------------------------------------------------------------
    # Start electron on next trigger
    if not electron_start_next_trigger(bpmlist):
        return False

    # ---------------------------------------------------------------------------------------------
    # Wait 2 seconds and Fire the soft event
    _fire_soft_event(tcentral, EVN)

    # ---------------------------------------------------------------------------------------------
    # Write event number back to 3
    logger.info("Set Event Number back to 3 on local timing board, BpmTriggerEvent")
    replies = tlocal.write_attribute("bpm.trigEvent", 3)
    if not _group_replies_ok(replies, trigevent_error):
        return False

    return True