Newer
Older
# -*- coding: utf-8 -*-
#
# This file is part of the DG_PY_FOFBTool project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.
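"""
Tango device server DG_PY_FOFBTool.

Wraps the FofbTool package in a Tango device: configuration, start/stop,
synchronisation and FA alignment of the FOFB nodes are exposed as commands.
"""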
# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
#----- PROTECTED REGION ID(DG_PY_FOFBTool.additionnal_import) ENABLED START -----#
import FofbTool.Operation
import FofbTool.Configuration
import logging
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#----- PROTECTED REGION END -----# // DG_PY_FOFBTool.additionnal_import
__all__ = ["DG_PY_FOFBTool", "main"]
class DG_PY_FOFBTool(Device):
"""
**Properties:**
- Device Property
combpm_bpmfilter
- Each line gives the BPM id allowed in the combpm engine of the cellnode.\nOne line per cellnode, in the order of the tangopath_cellnode property.
- Type:'DevVarStringArray'
tangopath_cellnode
- Tango path of the cellnodes
- Type:'DevVarStringArray'
comcorr_pscid
- Line by line PSCID to program on cellnode interface.\nOne line per cellnode
- Type:'DevVarStringArray'
tangopath_watcher
- Tango path of FofbWatcher
- Type:'DevString'
corr_pscid
- PSCID of the inv matrix lines
- Type:'DevString'
ccn_npsc
- Number of PSC ID to send in centralnode frames.\nIdentical for all interfaces
- Type:'DevULong'
ccn_nbpm
- Number of BPMID to pack in each cellnode.\nOne value per cellnode
- Type:'DevVarLongArray'
logfilepath
- Type:'DevString'
tangopath_centraltiming
- Type:'DevString'
tangopath_centralnode
- Type:'DevString'
k1_x
- Type:'DevVarLongArray'
k2_x
- Type:'DevVarLongArray'
k1_y
- Type:'DevVarLongArray'
k2_y
- Type:'DevVarLongArray'
"""
# PROTECTED REGION ID(DG_PY_FOFBTool.class_variable) ENABLED START #
# Status fragments keyed by operation name ("configure", "synchronize",
# "align"); my_status() joins the values into a single report.
# NOTE(review): defined at class level, so the dict is shared by every
# instance of this device class.
d_status = {}
"""
def dev_status(self):
return my_status()
"""
def my_status(self):
    """Build a status report from the entries stored in ``d_status``.

    :return: the string form of every ``d_status`` value, separated by a
        blank line and terminated by a newline. If the report cannot be
        built, the raw ``d_status`` is sent to the debug stream and a fixed
        "Status failure" line is returned instead.
    """
    try:
        report = "\n\n".join(map(str, self.d_status.values()))
    except Exception:
        self.debug_stream(str(self.d_status))
        return "Status failure\n"
    return report + "\n"
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# PROTECTED REGION END # // DG_PY_FOFBTool.class_variable
# -----------------
# Device Properties
# -----------------
# BPM ids allowed in the combpm engine, one whitespace-separated line per
# cellnode; configure() pairs the lines with tangopath_cellnode in order.
combpm_bpmfilter = device_property(
dtype='DevVarStringArray',
mandatory=True
)
# Tango paths of the cellnodes.
tangopath_cellnode = device_property(
dtype='DevVarStringArray',
mandatory=True
)
# PSC ids to program on the cellnode interface, one whitespace-separated line
# per cellnode (paired with tangopath_cellnode in configure()).
comcorr_pscid = device_property(
dtype='DevVarStringArray',
mandatory=True
)
# Tango path of FofbWatcher.
tangopath_watcher = device_property(
dtype='DevString',
mandatory=True
)
# PSC ids of the inv matrix lines, given as one whitespace-separated string
# (split and converted to int in configure()).
corr_pscid = device_property(
dtype='DevString',
mandatory=True
)
# Number of PSC ids to send in centralnode frames, identical for all interfaces.
ccn_npsc = device_property(
dtype='DevULong',
mandatory=True
)
# Number of BPM ids to pack in each cellnode, one value per cellnode.
ccn_nbpm = device_property(
dtype='DevVarLongArray',
mandatory=True
)
# Path of the log file: opened in init_device() and read back by read_logs().
logfilepath = device_property(
dtype='DevString',
mandatory=True
)
# Passed to FofbTool.Operation.sync_bpm() by the sync command.
# NOTE(review): the only property declared without mandatory=True - confirm
# that this is intended.
tangopath_centraltiming = device_property(
dtype='DevString',
)
# Centralnode target used by the configure and start commands.
tangopath_centralnode = device_property(
dtype='DevString',
mandatory=True
)
# k1_x, k2_x, k1_y, k2_y are handed as-is to
# FofbTool.Configuration.centralnode_configure_corr() by configure().
k1_x = device_property(
dtype='DevVarLongArray',
mandatory=True
)
k2_x = device_property(
dtype='DevVarLongArray',
mandatory=True
)
k1_y = device_property(
dtype='DevVarLongArray',
mandatory=True
)
k2_y = device_property(
dtype='DevVarLongArray',
mandatory=True
)
# Name of a logging level; init_device() upper-cases it and looks it up on the
# logging module.
loglevel = device_property(
dtype='DevString',
mandatory=True
)
# ----------
# Attributes
# ----------
# Read/write boolean backed by self._include_lbp; start() starts comlbp only
# when it is true.
includeLBP = attribute(
dtype='DevBoolean',
access=AttrWriteType.READ_WRITE,
)
# String attribute served by read_FofbToolVersion() (FofbTool.__version__).
FofbToolVersion = attribute(
dtype='DevString',
)
# String spectrum (at most 1024 elements) served by read_logs() from the file
# at logfilepath.
logs = attribute(
dtype=('DevString',),
max_dim_x=1024,
)
# ---------------
# General methods
# ---------------
def init_device(self):
    """Initialises the attributes and properties of the DG_PY_FOFBTool.

    After the base-class initialisation this:

    - reports the FofbTool version on the info stream;
    - attaches a file handler writing to ``logfilepath`` to the "FofbTool"
      logger (any handler attached by a previous Init of this device is
      removed first, so log lines are not duplicated);
    - sets the logger level from the ``loglevel`` property;
    - creates ``_include_lbp`` (False) the first time only, so that a
      re-Init keeps the value previously written.

    If the log file cannot be opened, a warning is logged and the device
    keeps running without file logging.

    :raises AttributeError: if ``loglevel`` is not the name of a level
        defined by the ``logging`` module.
    """
    Device.init_device(self)
    #----- PROTECTED REGION ID(DG_PY_FOFBTool.init_device) ENABLED START -----#
    self.info_stream("FofbTool {}".format(FofbTool.__version__))

    # Same logger name as the one referenced in delete_device.
    logger = logging.getLogger("FofbTool")

    # init_device also runs on the Tango Init command: drop the handler left
    # by a previous run before attaching a new one.
    previous = getattr(self, "filehandler", None)
    if previous is not None:
        logger.removeHandler(previous)
        previous.close()
    self.filehandler = None

    try:
        fh = logging.FileHandler(self.logfilepath)
    except OSError:
        # Covers a missing directory as well as permission problems.
        logger.warning("Not logging to file, could not open location {}".format(self.logfilepath))
    else:
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter("{asctime} {levelname:8}: {message}", style='{'))
        # Without this call the handler is configured but never receives records.
        logger.addHandler(fh)
        self.filehandler = fh

    logger.setLevel(getattr(logging, self.loglevel.upper()))

    if not hasattr(self, "_include_lbp"):
        self._include_lbp = False
    #----- PROTECTED REGION END -----# // DG_PY_FOFBTool.init_device
def always_executed_hook(self):
    """Hook run before any TANGO command is executed.

    Nothing is done here for this device.
    """
    #----- PROTECTED REGION ID(DG_PY_FOFBTool.always_executed_hook) ENABLED START -----#
    #----- PROTECTED REGION END -----# // DG_PY_FOFBTool.always_executed_hook
def delete_device(self):
    """Release what init_device allocated.

    Called by the device destructor and by the device Init command. It
    currently performs no action: the handler clean-up below is disabled.
    """
    #----- PROTECTED REGION ID(DG_PY_FOFBTool.delete_device) ENABLED START -----#
    # Disabled clean-up, kept for reference:
    #logger = logging.getLogger("FofbTool")
    #logger.removeHandler(self.filehandler)
    #----- PROTECTED REGION END -----# // DG_PY_FOFBTool.delete_device
# ------------------
# Attributes methods
# ------------------
def read_includeLBP(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.includeLBP_read) ENABLED START #
    """Read callback of the includeLBP attribute.

    :return: the flag currently held in ``self._include_lbp``
    """
    include_lbp = self._include_lbp
    return include_lbp
    # PROTECTED REGION END # // DG_PY_FOFBTool.includeLBP_read
def write_includeLBP(self, value):
    # PROTECTED REGION ID(DG_PY_FOFBTool.includeLBP_write) ENABLED START #
    """Set the includeLBP attribute.

    Stores the written value in ``self._include_lbp``, which is what
    read_includeLBP returns and what start() tests to decide whether comlbp
    is started.

    :param value: new state of the flag
    :return: None
    """
    # The value must be recorded here, otherwise the flag keeps the False
    # set by init_device forever.
    self._include_lbp = bool(value)
    # PROTECTED REGION END # // DG_PY_FOFBTool.includeLBP_write
def read_FofbToolVersion(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.FofbToolVersion_read) ENABLED START #
    """Read callback of the FofbToolVersion attribute.

    :return: the ``__version__`` value of the imported FofbTool package
    """
    version = FofbTool.__version__
    return version
    # PROTECTED REGION END # // DG_PY_FOFBTool.FofbToolVersion_read
def read_logs(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.logs_read) ENABLED START #
    """Return the logs attribute.

    Reads the file at ``logfilepath`` and returns its last lines, newest
    first, with the line terminators removed. At most 99 lines are
    returned. Only that many lines are held in memory while the file is
    scanned, whatever the size of the log file.

    :return: list of str, most recent log line first
    :raises OSError: if the log file cannot be opened
    """
    from collections import deque

    max_lines = 99  # same count as the former ``lines[:-100:-1]`` slice
    with open(self.logfilepath, 'r') as fp:
        # A bounded deque keeps only the tail while the file is iterated.
        tail = deque(fp, maxlen=max_lines)
    return [line.replace('\n', '') for line in reversed(tail)]
    # PROTECTED REGION END # // DG_PY_FOFBTool.logs_read
# --------
# Commands
# --------
@command(
)
@DebugIt()
def configure(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.configure) ENABLED START #
    """
    Apply the device properties to the cellnodes and the centralnode.

    Steps, in order: COMBPM filter of every cellnode, COMCORR of every
    cellnode, CCN of every cellnode, CCN of the centralnode, then the
    corrector of the centralnode. The per-cellnode properties are paired
    with ``tangopath_cellnode`` by position.

    A failing step is reported on the error stream and the remaining steps
    are still attempted. ``d_status["configure"]`` is "Configure: pending"
    while running and ends as "Configure: success" or "Configure: failed".

    :return:None
    """
    success = True
    self.d_status["configure"] = "Configure: pending"

    for cn, bpm in zip(self.tangopath_cellnode, self.combpm_bpmfilter):
        bpmid = [int(b) for b in bpm.split()]
        self.debug_stream("Set {} to {}".format(cn, bpmid))
        s = FofbTool.Configuration.cellnode_configure_combpm(cn, bpmid)
        if not s:
            success = False
            self.error_stream("Failed to configure COMBPM on {}".format(cn))

    self.info_stream("Configure COMCORR")
    for cn, psc in zip(self.tangopath_cellnode, self.comcorr_pscid):
        pscid = [int(b) for b in psc.split()]
        self.debug_stream("Set {} to {}".format(cn, pscid))
        s = FofbTool.Configuration.cellnode_configure_comcorr(cn, pscid, True)
        if not s:
            success = False
            self.error_stream("Failed to configure COMCORR on {}".format(cn))

    self.info_stream("Configure CCN")
    for cn, nbpm in zip(self.tangopath_cellnode, self.ccn_nbpm):
        self.debug_stream("Set {} to {} bpm and {} psc".format(cn, nbpm, self.ccn_npsc))
        # The result must be captured here: testing the ``s`` left over from
        # the COMCORR loop would hide a CCN failure or report a wrong one.
        s = FofbTool.Configuration.cellnode_configure_ccn(cn, nbpm, self.ccn_npsc)
        if not s:
            success = False
            self.error_stream("Failed to configure CCN on {}".format(cn))

    s = FofbTool.Configuration.centralnode_configure_ccn(self.tangopath_centralnode, self.ccn_nbpm, self.ccn_npsc)
    if not s:
        success = False
        self.error_stream("Failed to configure CCN on {}".format(self.tangopath_centralnode))

    # Builtin sum: no dependency on a numpy import for a plain integer total.
    numbpm = int(sum(self.ccn_nbpm))
    self.debug_stream("Summed {} bpm for corrector".format(numbpm))
    # NOTE(review): the result of this call is not taken into account in the
    # final status - confirm whether it reports success like the others.
    FofbTool.Configuration.centralnode_configure_corr(self.tangopath_centralnode,
            numbpm, [int(p) for p in self.corr_pscid.split()],
            self.k1_x, self.k1_y, self.k2_x, self.k2_y)

    if success:
        self.d_status["configure"] = "Configure: success"
    else:
        self.d_status["configure"] = "Configure: failed"
    # PROTECTED REGION END # // DG_PY_FOFBTool.configure
@command(
)
@DebugIt()
def stop(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.stop) ENABLED START #
    """
    Stop the communication blocks of every cellnode.

    First pass: combpm then comlbp are stopped on each cellnode.
    Second pass: ccn is stopped then reset on each cellnode.

    :return:None
    """
    operation = FofbTool.Operation

    self.debug_stream("Stopping combpm and comlbp")
    for node in self.tangopath_cellnode:
        operation.stop_combpm(node)
        operation.stop_comlbp(node)

    self.debug_stream("Stopping ccn")
    for node in self.tangopath_cellnode:
        operation.stop_ccn(node)
        operation.reset_ccn(node)
    # PROTECTED REGION END # // DG_PY_FOFBTool.stop
@command(
)
@DebugIt()
def start(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.start) ENABLED START #
    """
    Start the communication blocks.

    comlbp is started on every cellnode only when the includeLBP flag is
    set. combpm and then ccn are started on every cellnode, and finally ccn
    is started on the centralnode.

    :return:None
    """
    operation = FofbTool.Operation

    if not self._include_lbp:
        self.debug_stream("Skipping comlbp")
    else:
        self.debug_stream("Starting comlbp")
        for node in self.tangopath_cellnode:
            operation.start_comlbp(node)

    self.debug_stream("Starting combpm")
    for node in self.tangopath_cellnode:
        operation.start_combpm(node)

    self.debug_stream("Starting ccn")
    for node in self.tangopath_cellnode:
        operation.start_ccn(node)
    operation.start_ccn(self.tangopath_centralnode)
    # PROTECTED REGION END # // DG_PY_FOFBTool.start
@command(
)
@DebugIt()
def sync(self):
    # PROTECTED REGION ID(DG_PY_FOFBTool.sync) ENABLED START #
    """
    Run FofbTool.Operation.sync_bpm with lists read from the Tango database.

    The "FOFB" free properties are read: ``bpmlist`` entries containing
    "LIBERA" give (int(field 0), field 2) pairs, ``TimingBoardList`` entries
    containing "LOCAL" give their field 2, fields being separated by ':'.
    ``LBPEVRX`` is passed through unchanged. ``d_status["synchronize"]``
    goes from "Synchronize: starting" to "Synchronize: done".

    :return:None
    """
    db = tango.Database()
    self.d_status["synchronize"] = "Synchronize: starting"
    self.debug_stream("Building list form FREE PROPERTIES")

    bpmidlist = []
    for entry in db.get_property("FOFB", "bpmlist")['bpmlist']:
        if 'LIBERA' not in entry:
            continue
        fields = entry.split(':')
        bpmidlist.append((int(fields[0]), fields[2]))

    tlocal = []
    for entry in db.get_property("FOFB", 'TimingBoardList')['TimingBoardList']:
        if "LOCAL" in entry:
            tlocal.append(entry.split(':')[2])

    lbpevrx = db.get_property("FOFB", 'LBPEVRX')['LBPEVRX']

    FofbTool.Operation.sync_bpm(bpmidlist, lbpevrx, tlocal, self.tangopath_centraltiming)
    self.d_status["synchronize"] = "Synchronize: done"
    # PROTECTED REGION END # // DG_PY_FOFBTool.sync
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
@command(
    dtype_in='DevLong',
    doc_in="cellnode",
)
@DebugIt()
def align_fa(self, argin):
    # PROTECTED REGION ID(DG_PY_FOFBTool.align_fa) ENABLED START #
    """
    Run the ccn alignment on one cellnode and apply the offset to all.

    The cellnode at position ``argin`` of ``tangopath_cellnode`` is passed
    to FofbTool.Operation.align_ccn. A result of None, -1, 0 or 1 is treated
    as a failure: it is reported on the error stream and recorded in
    ``d_status["align"]``. Any other result is handed to
    cellnode_configure_comlbp for every cellnode.

    :param argin: 'DevLong'
        cellnode
    :return:None
    """
    target = self.tangopath_cellnode[argin]
    self.debug_stream("Launch align FA on {}".format(target))
    seqoffset = FofbTool.Operation.align_ccn(target, 0)

    aligned = seqoffset is not None and seqoffset not in (-1, 0, 1)
    if not aligned:
        self.error_stream("Could not align all ccn")
        self.d_status["align"] = "FA Align: failed"
        return

    for node in self.tangopath_cellnode:
        FofbTool.Configuration.cellnode_configure_comlbp(node, seqoffset)
    self.d_status["align"] = "FA Align: OK"
    # PROTECTED REGION END # // DG_PY_FOFBTool.align_fa
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
    """Run the device server with DG_PY_FOFBTool as its only device class.

    :param args: forwarded to ``tango.server.run`` as ``args``
    :param kwargs: forwarded unchanged to ``tango.server.run``
    :return: whatever ``tango.server.run`` returns
    """
    # PROTECTED REGION ID(DG_PY_FOFBTool.main) ENABLED START #
    device_classes = (DG_PY_FOFBTool,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END # // DG_PY_FOFBTool.main
if __name__ == '__main__':
main()