Newer
Older
###################################################################################################
# OPERATION FUNCTIONS
###################################################################################################
#
# Contains functions to operate the FOFB application.
#
###################################################################################################
import tango
import logging
import numpy as np
import time
# Get the module logger
logger = logging.getLogger("FofbTool")
def trycatch_tango_devfailed(func):
    """
    Decorator turning a tango.DevFailed raised by the wrapped function into a False return.

    PARAMETERS
    ----------
    func: callable
        The function to wrap.

    RETURNS
    -------
    inner_function: callable
        Wrapper forwarding all positional and keyword arguments to func.
        It returns whatever func returns, or False if tango.DevFailed was raised.
    """
    # Local import so the decorator does not depend on the file-level import block
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def inner_function(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except tango.DevFailed as e:
            logger.error("DevFailed met during operation")
            logger.debug(e)
            return False
    return inner_function
###################################################################################################
# OPERATIONS ON CCN
###################################################################################################
@trycatch_tango_devfailed
Align FA sequence number on a cellnode. Returns the computed offset.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
comlbpif: int
which comlbp interface to use (0 to 3)
RETURNS
-------
seqoffset: int or None
On success, return the sequence offset found.
Return None on failure.
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
return None
logger.info("Trying to detect and align FA sequences")
logger.debug("Reset offset on comlbp")
prx["comlbp{}_seqoffset".format(comlbpif)]=0
stop_comlbp(node_tangopath)
stop_combpm(node_tangopath)
start_comlbp(node_tangopath)
start_combpm(node_tangopath)
# Loop until two consecutive measurements are identical
if len(N) > 7:
logger.error("Could not measure sequence offset.")
logger.debug("Latch two sequence numbers without offset")
prx.ccnpack0_control=1 # let it roll a bit to collect data
prx.ccnpack0_control=2 # latch it
time.sleep(2)
_N = prx.ccnpack0_latchedseq1-prx.ccnpack0_latchedseq2
# handle diff going from 17 bits to 16bits offset
if _N > 0x7FFF:
_N=_N-0x10000
if -_N > 0x8000:
_N=_N+0x10000
N.append(_N)
logger.debug("seq ({}, {}, {})".format(prx.ccnpack0_latchedseq1, prx.ccnpack0_latchedseq2, N[-1]))
N=N[-1]
if N in (-1, 0, 1):
logger.warning("Sequence offset measured = {}, something might be wrong".format(N))
logger.debug("Setting sequence offset to {}".format(N))
prx["comlbp{}_seqoffset".format(comlbpif)]=N
logger.debug("Perform autocheck")
logger.debug("Latch two sequence numbers without offset")
prx.ccnpack0_control=1
time.sleep(2)
prx.ccnpack0_control=2
time.sleep(2)
N=prx.ccnpack0_latchedseq2-prx.ccnpack0_latchedseq1
if not N in (-1, 1):
logger.warning("Corrected sequence offset measured = {}, something might be wrong. Run it again".format(N))
@trycatch_tango_devfailed
"""
Stop the communication with cellnode on the specified fofbnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
ccnif: list(int)
List of the interface to stop. If empty, all the possible interface will be stopped.
RETURNS
-------
bool: success
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
if len(ccnif) == 0:
ccnif = [int(a[7]) for a in prx.get_attribute_list() if "ccnpack" in a if "control" in a]
logger.info("Stopping CCN on {}".format(node_tangopath))
for n in ccnif:
if 'central' in node_tangopath:
prx["ccnunpack{}_control".format(n)] = False
prx["ccnpack{}_control".format(n)] = False
else:
prx["ccnpack{}_control".format(n)] = False
prx["ccnunpack{}_control".format(n)] = False
@trycatch_tango_devfailed
"""
Reset the communication with cellnode on the specified fofbnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
ccnif: list(int)
List of the interfaces to reset. If empty, all the possible interfaces will be reset.
RETURNS
-------
bool: success
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
if len(ccnif) == 0:
ccnif = [int(a[6]) for a in prx.get_attribute_list() if "ccneth" in a if "gt_reset" in a]
logger.info("Reset CCN on {}".format(node_tangopath))
for n in ccnif:
prx["ccneth{}_reset".format(n)] = 0x60000001 # impossible to write 0xE0000001
prx["ccneth{}_gt_reset".format(n)] = 1
time.sleep(2)
prx["ccneth{}_gt_reset".format(n)] = 0
prx["ccneth{}_reset".format(n)] = 0
@trycatch_tango_devfailed
"""
Start the communication with cellnode on the specified fofbnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
ccnif: list(int)
List of the interfaces to start. If empty, all the possible interfaces will be started.
RETURNS
-------
bool: success
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
if len(ccnif) == 0:
ccnif = [int(a[7]) for a in prx.get_attribute_list() if "ccnpack" in a if "control" in a]
logger.info("Starting CCN on {}".format(node_tangopath))
if 'central' in node_tangopath:
prx["ccnunpack{}_control".format(n)] = 1
prx["ccnpack{}_control".format(n)] = 1
else:
prx["ccnpack{}_control".format(n)] = 1
prx["ccnunpack{}_control".format(n)] = 1
@trycatch_tango_devfailed
"""
Ack errors on the communication with cellnode on the specified fofbnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
ccnif: list(int)
List of the interfaces to acknowledge. If empty, all the possible interfaces will be acknowledged.
RETURNS
-------
bool: success
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
if len(ccnif) == 0:
ccnif = [int(a[7]) for a in prx.get_attribute_list() if "ccnpack" in a if "control" in a]
logger.info("Ack CCN error on {}".format(node_tangopath))
for n in ccnif:
prx["ccnpack{}_reset_error".format(n)] = True
prx["ccnunpack{}_reset_error".format(n)] = True
time.sleep(1)
prx["ccnpack{}_reset_error".format(n)] = False
prx["ccnunpack{}_reset_error".format(n)] = False
###################################################################################################
# OPERATIONS ON COMBPM
###################################################################################################
@trycatch_tango_devfailed
"""
Stop the communication with bpm on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
logger.info("Stopping ComBPM on {}".format(node_tangopath))
prx["combpm_reset"] = 1
prx["combpm_gt_control"] = 0x5
@trycatch_tango_devfailed
"""
Start the communication with bpm on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
logger.info("Starting ComBpm on {}".format(node_tangopath))
prx["combpm_reset"] = 0
prx["combpm_gt_control"] = 0x1
time.sleep(1)
@trycatch_tango_devfailed
"""
Ack errors on the communication with bpm on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
prx["combpm_reset_error"] = True
time.sleep(1)
prx["combpm_reset_error"] = False
###################################################################################################
# OPERATIONS ON COMLBP
###################################################################################################
@trycatch_tango_devfailed
"""
Stop the communication with LBP on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
logger.info("Stopping ComLBP on {}".format(node_tangopath))
for n in range(4):
prx["comlbp{}_control".format(n)] = 1
time.sleep(1)
for n in range(4):
prx["comlbp{}_control".format(n)] = 0
@trycatch_tango_devfailed
"""
Start the communication with LBP on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
logger.info("Starting ComLBP on {}".format(node_tangopath))
for n in range(4):
prx["comlbp{}_control".format(n)] = 0x10
@trycatch_tango_devfailed
"""
Reset the communication with LBP on the specified cellnode.
PARAMETERS
----------
node_tangopath: str
The target fofbnode tango path, ie 'ans/dg/fofb-cellnode-c09'
RETURN
------
success: boolean
try:
prx=tango.DeviceProxy(node_tangopath)
prx.ping()
except tango.DevFailed:
logger.error("Failed to obtain tango proxy or to ping to {}".format(node_tangopath))
logger.info("Reset ComLBP on {}".format(node_tangopath))
for n in range(4):
prx["comlbp{}_control".format(n)] = 0x3
time.sleep(1)
for n in range(4):
prx["comlbp{}_control".format(n)] = 0x0
###################################################################################################
# OPERATIONS ON BPM ELECTRONICS
###################################################################################################
# Some local constants
ADDR_CCCFG=0 # Register address of Libera Electron
ADDR_FAICFG=0x2000 # Register address of Libera Electron
def electron_group_writefadata(group, listargs):
    """
    Send the WriteFAData command to a Libera Electron target.

    PARAMETERS
    ----------
    group: tango.Group or tango.DeviceProxy
        Target on which command_inout is called.
    listargs: list of int
        Values packed as a DevVarLongArray and passed as the command argument.

    RETURNS
    -------
    The value returned by group.command_inout, unchanged.
    """
    # Pack the argument list into a DeviceData container before sending
    payload = tango.DeviceData()
    payload.insert(tango.DevVarLongArray, listargs)
    return group.command_inout("WriteFAData", payload)
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def electron_init_fa(bpmlist):
    """
    Configure FA register of Libera Electron

    PARAMETERS
    ----------
    bpmlist: list of (bpmid, bpmpath)
        ID to apply and path of BPM tango device

    RETURNS
    -------
    success: boolean
        True on success, False as soon as one BPM device cannot be reached.
    """
    # Init BPMs, put for each the ID number
    # Cannot do that with group
    logger.info("Initialize BPMs")
    for bpmid, bpmpath in bpmlist:
        logger.debug("Initialize BPM {}".format(bpmpath))
        try:
            # Proxy creation can fail as well as ping, so both are guarded.
            # DevFailed is the base class of ConnectionFailed.
            pbpm = tango.DeviceProxy(bpmpath)
            pbpm.ping()
        except tango.DevFailed:
            logger.error("Failed to connect to {}".format(bpmpath))
            return False

        # Configure : bpmid, tframe, mgtpower, mgtlb, bfrclr
        electron_group_writefadata(pbpm, [ADDR_CCCFG, 4, 5, bpmid, 6000, 0, 0, 6000])
        # NOTE(review): writes 9 then 8 to FAICFG, presumably pulsing bit 0 -- confirm against register map
        electron_group_writefadata(pbpm, [ADDR_FAICFG, 4, 1, 9])
        electron_group_writefadata(pbpm, [ADDR_FAICFG, 4, 1, 8])

    # Callers test the result with "if not electron_init_fa(...)", so report success explicitly
    return True
def electron_sync_next_trigger(bpmlist):
    """
    Prepare Libera Electron to sync on next trigger

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success, False if the command failed on at least one device.
    """
    logger.info("Prepare Libera Electron to sync at next trigger")

    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    replies = bpms.command_inout("settimeonnexttrigger")
    for reply in replies:
        # Stop at the first device reporting a failure
        if reply.has_failed():
            logger.error("Failed to apply command SetTimeOnNextTrigger on bpm {}".format(reply.dev_name()))
            return False

    return True
def electron_start_next_trigger(bpmlist):
    """
    Prepare Libera Electron to start on next trigger

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success, False if a WriteFAData command failed on at least one device.
    """
    logger.info("Prepare Libera Electron to start at next trigger")

    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    # Register writes, applied in order; the sequence aborts on the first failure
    writes = (
        # Write start, tframelim=zero, no debug data
        [ADDR_FAICFG, 4, 1, 8],
        # Write Enable ITech FAI
        [ADDR_FAICFG+4, 4, 1, 1],
    )

    for listargs in writes:
        replies = electron_group_writefadata(bpms, listargs)
        for reply in replies:
            if reply.has_failed():
                logger.error("Failed to apply command WriteFAData on bpm {}".format(reply.dev_name()))
                return False

    return True
def electron_stop_com(bpmlist):
    """
    Stop Libera Electron FA communication

    PARAMETERS
    ----------
    bpmlist: list of str
        List of Tango paths to Libera devices

    RETURNS
    -------
    success: boolean
        True on success, False if a WriteFAData command failed on at least one device.
    """
    logger.info("Stop Libera Electron FA communication")

    bpms = tango.Group('bpms')
    bpms.add(bpmlist)

    # Register writes, applied in order; the sequence aborts on the first failure
    stop_sequence = (
        # Write Disable ITech FAI
        [ADDR_FAICFG+4, 4, 1, 0],
        # Soft stop
        [ADDR_FAICFG+8, 4, 1, 0],
        # Reset User FAI
        [ADDR_FAICFG, 4, 1, 0],
    )

    for listargs in stop_sequence:
        replies = electron_group_writefadata(bpms, listargs)
        for reply in replies:
            if reply.has_failed():
                logger.error("Failed to apply command WriteFAData on bpm {}".format(reply.dev_name()))
                return False

    return True
###################################################################################################
# OPERATIONS FOR LBP and Electron SYNCHRONIZATION
###################################################################################################
@trycatch_tango_devfailed
def sync_bpm(bpmidlist, lbpevrx, timinglocal, timingcentral):
"""
Synchronize all BPM electronics, Electron and Brillance Plus.
This will use the timing system (central and local board).
bpmidlist: list of tuple
list of tuple (ID, tangopath) to put on Libera Electron
list of LBP Evrx tango path. Can be an empty array.
timinglocal: list
list of local Timing tango paths on which to set the event
timingcentral: str
RETURN
------
success: boolean
bpmlist = [b[1] for b in bpmidlist]
tlocal = tango.Group('tlocal')
tlocal.add(timinglocal)
tcentral = tango.DeviceProxy(timingcentral)
glbpevrx = tango.Group('lbpevrx')
glbpevrx.add(lbpevrx)
# ---------------------------------------------------------------------------------------------------------------
# Init BPMs, stop first and put for each the ID number
if not electron_stop_com(bpmlist):
return False
if not electron_init_fa(bpmidlist):
return False
# ---------------------------------------------------------------------------------------------------------------
# Write event number
logger.info("Set Event Number on local timing board, BpmTriggerEvent")
r=tlocal.write_attribute("bpm.trigEvent", EVN)
for _r in r:
if _r.has_failed():
logger.error("Failed to set Event Number on local timing board {}, bpm.trigEvent".format(_r.dev_name()))
# ---------------------------------------------------------------------------------------------------------------
# Prepare bpm for trigger reception
if not electron_sync_next_trigger(bpmlist):
return False
logger.info("Prepare Libera Brillance Plus to start on next trigger")
r=glbpevrx.write_attribute("synchronize", 0)
for _r in r:
if _r.has_failed():
logger.error("Failed to write synchronize on LBP EVRX {}".format(_r.dev_name()))
# ---------------------------------------------------------------------------------------------------------------
# Wait 2 seconds and Fire the soft event
time.sleep(2)
logger.info("Fire the trigger")
tcentral.write_attribute("softEventAdress", EVN)
tcentral.firesoftevent()
time.sleep(2)
# ---------------------------------------------------------------------------------------------------------------
# Start electron on next trigger
if not electron_start_next_trigger(bpmlist):
return False
# ---------------------------------------------------------------------------------------------------------------
# Wait 2 seconds and Fire the soft event
time.sleep(2)
logger.info("Fire the trigger")
tcentral.write_attribute("softEventAdress", EVN)
tcentral.firesoftevent()
time.sleep(2)
# ---------------------------------------------------------------------------------------------------------------
# Write event number back to 3
logger.info("Set Event Number back to 3 on local timing board, BpmTriggerEvent")
r=tlocal.write_attribute("bpm.trigEvent", 3)
for _r in r:
if _r.has_failed():
logger.error("Failed to set Event Number on local timing board {}, bpm.trigEvent".format(_r.dev_name()))