Select Git revision

BRONES Romain authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
daqcapt 3.30 KiB
#!/usr/bin/env python3
import time
import argparse
import numpy as np
import os, mmap, ctypes
import glob
import deviceaccess
# DAQ memory zone: shared read/write mapping of the first 0x40000000 bytes
# of /dev/ddrpl.
# mmap works on its own duplicate of the descriptor it is given, so the
# descriptor opened here is closed as soon as the mapping exists instead of
# being leaked for the lifetime of the process.
_ddr_fd = os.open("/dev/ddrpl", os.O_RDWR | os.O_SYNC)
try:
    daqmem = mmap.mmap(_ddr_fd, 0x40000000, mmap.MAP_SHARED,
                       (mmap.PROT_READ | mmap.PROT_WRITE))
finally:
    os.close(_ddr_fd)

# DAQ device: registers are reached through the "APPUIO" entry of the
# device map file below.
deviceaccess.setDMapFilePath("/opt/fofb/map/devices.dmap")
appdev = deviceaccess.Device("APPUIO")
appdev.open()

# Number of Bytes Per Sample
NBPS = 8
def configure(ena, N):
    """
    Configure DAQ regions.

    Writes, in order, TAB_SEL and SAMPLES for both regions (index 0 and 1
    of each register), then the ENABLE register of APP.daq_0.

    ena: int
        bitsel for enabling region (one bit per region)
    N: int, list(int)
        Number of samples. If int, same number for both region.
        Otherwise N[0] is used for region 0 and N[1] for region 1.
    """
    # np.int was a plain alias of the builtin int and was removed in
    # NumPy 1.24; the builtin is used directly so this runs on any version.
    if isinstance(N, (int, np.integer)):
        samples = (N, N)
    else:
        samples = (N[0], N[1])

    # Select tab 1 for both regions
    appdev.write("APP.daq_0.TAB_SEL", int(1), 0)
    appdev.write("APP.daq_0.TAB_SEL", int(1), 1)

    # Set N samples for both regions
    appdev.write("APP.daq_0.SAMPLES", int(samples[0]), 0)
    appdev.write("APP.daq_0.SAMPLES", int(samples[1]), 1)

    # Enable regions
    appdev.write("APP.daq_0.ENABLE", int(ena))
def trigger():
    """Trigger the DAQ by writing 1 and then 0 to APP.DAQ_CONTROL."""
    # Builtin int instead of np.int: the alias was removed in NumPy 1.24.
    appdev.write("APP.DAQ_CONTROL", int(1))
    appdev.write("APP.DAQ_CONTROL", int(0))
def stop(b=True):
    """
    Write the stop value to APP.DAQ_CONTROL.

    b: bool
        When True (default) the value 3 is written, when False the value 0
        is written (the register receives 3*b).
    """
    # Builtin int instead of np.int: the alias was removed in NumPy 1.24.
    appdev.write("APP.DAQ_CONTROL", int(3*b))
def status():
    """
    Print the APP.daq_0 status registers.

    ENABLE and ACTIVE_BUF are printed as a single value each; the other
    registers are read as a pair and printed as two columns.
    """
    u4 = np.dtype('u4')
    single_regs = ("ENABLE", "ACTIVE_BUF")
    paired_regs = ("TAB_SEL", "STROBE_CNT", "SAMPLES", "FIFO_STATUS",
                   "SENT_BURST_CNT", "TRG_CNT_BUF0", "TRG_CNT_BUF1")

    for name in single_regs:
        value = appdev.read("APP.daq_0."+name, u4)
        print("{:>20} {:10}".format(name, value))

    for name in paired_regs:
        # The read result is unpacked into exactly two values.
        first, second = appdev.read("APP.daq_0."+name, u4)
        print("{:>20} {:10} {:10}".format(name, first, second))
def write_capture(path="./", region=(0, 1)):
    """
    Dump the captured DAQ regions from the mapped memory to binary files.

    One file named DAQCAPT_<mmdd_HHMMSS>_R<region>.bin is written per
    selected region. Region 0 is read from offset 0 and region 1 from
    offset 0x20000000 of the DAQ memory map; the number of bytes copied is
    NBPS times the region's value in the APP.daq_0.SAMPLES register.

    path: str
        Prefix prepended verbatim to the file name (no separator is
        inserted, so a directory must end with "/").
    region: sequence of int
        Regions to dump, among 0 and 1. Default is both.

    Returns the name of the last file actually written, or None when no
    region was selected.
    """
    stamp = time.strftime("%m%d_%H%M%S")
    sizes = NBPS*appdev.read("APP.daq_0.SAMPLES", np.dtype('u4'))

    last_written = None
    for r, nbytes, offset in zip((0, 1), sizes, (0, 0x20000000)):
        if r not in region:
            continue
        fname = path+"DAQCAPT_{}_R{}.bin".format(stamp, r)
        # Plain Python int keeps the slice arithmetic out of fixed-width
        # unsigned integer types.
        nbytes = int(nbytes)
        with open(fname, 'wb') as fp:
            fp.write(daqmem[offset:offset+nbytes])
        # Remember the file that really exists, rather than rebuilding a
        # name from the loop variable after the loop has moved on.
        last_written = fname
    return last_written
if __name__ == "__main__":
    # Command-line front end. Actions run in a fixed order: configure,
    # stop (which ends the program), trigger, capture, status.
    parser = argparse.ArgumentParser(description="DAQ capture and status")
    parser.add_argument("--configure", action="store_true", help="Configure")
    parser.add_argument("--stop", action="store_true", help="Send stop signal")
    parser.add_argument("-t", action="store_true", help="Trigger DAQ")
    parser.add_argument("-c", action="store_true", help="Dump memory to file")
    parser.add_argument("-R", type=int, choices=(0,1), help="Region selector, default is both")
    parser.add_argument("-N", type=int, default=1024, help="Number of samples to capture")
    parser.add_argument("--path", type=str, default="./", help="Output path")
    parser.add_argument("-s", action="store_true", help="Print status (after other actions)")
    args = parser.parse_args()

    # No -R on the command line means both regions.
    both_regions = args.R is None

    if args.configure:
        # Enable mask: 3 for both regions, otherwise R+1 (1 or 2).
        enable_mask = 3 if both_regions else int(args.R)+1
        configure(enable_mask, args.N)

    if args.stop:
        # Send the stop value and quit without running any later action.
        stop()
        exit(0)
    # Not stopping: write 0 to the control register before going on.
    stop(False)

    if args.t:
        trigger()

    if args.c:
        if both_regions:
            fn = write_capture(args.path)
        else:
            fn = write_capture(args.path, [args.R, ])
        print(fn)

    if args.s:
        status()