Skip to content
Snippets Groups Projects
Select Git revision
  • a0858b6ccf72d35bafd5fc6b94def8fa725a4fe5
  • main default protected
  • pck_cnt
  • upgrade_yocto
  • dev
  • test_chtk
  • feat-xvc-legacy
7 results

daqcapt

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    daqcapt 3.30 KiB
    #!/usr/bin/env python3
    import time
    import argparse
    import numpy as np
    import os, mmap, ctypes
    import glob
    import deviceaccess
    
    # DAQ memory zone
    daqmem = mmap.mmap(os.open("/dev/ddrpl", os.O_RDWR | os.O_SYNC),
                0x40000000, mmap.MAP_SHARED, (mmap.PROT_READ | mmap.PROT_WRITE))
    
    # DAQ device
    deviceaccess.setDMapFilePath("/opt/fofb/map/devices.dmap")
    appdev = deviceaccess.Device("APPUIO")
    appdev.open()
    
    # Number of Bytes Per Sample
    NBPS=8
    
    
    def configure(ena, N):
        """
        Configure DAQ regions.
    
        ena: int
            bitsel for enabling region (one bit per region)
        N: int, list(int)
            Number of samples. If int, same number for both region.
        """
    
        # Select tab 1 for both regions
        appdev.write("APP.daq_0.TAB_SEL", np.int(1), 0)
        appdev.write("APP.daq_0.TAB_SEL", np.int(1), 1)
    
        # Set N samples for both regions
        if type(N) == int:
            appdev.write("APP.daq_0.SAMPLES", np.int(N), 0)
            appdev.write("APP.daq_0.SAMPLES", np.int(N), 1)
        else:
            appdev.write("APP.daq_0.SAMPLES", np.int(N[0]), 0)
            appdev.write("APP.daq_0.SAMPLES", np.int(N[1]), 1)
    
        # Enable regions
        appdev.write("APP.daq_0.ENABLE", np.int(ena))
    
    def trigger():
        appdev.write("APP.DAQ_CONTROL", np.int(1))
        appdev.write("APP.DAQ_CONTROL", np.int(0))
    
    def stop(b=True):
        appdev.write("APP.DAQ_CONTROL", np.int(3*b))
    
    def status():
    
        for r in ["ENABLE", "ACTIVE_BUF"]:
            print("{:>20} {:10}".format(r,
                appdev.read("APP.daq_0."+r, np.dtype('u4'))))
    
        for r in ["TAB_SEL", "STROBE_CNT", "SAMPLES", "FIFO_STATUS", "SENT_BURST_CNT", "TRG_CNT_BUF0", "TRG_CNT_BUF1"]:
            r0, r1 =appdev.read("APP.daq_0."+r, np.dtype('u4'))
            print("{:>20} {:10} {:10}".format(r, r0, r1))
    
    def write_capture(path="./", region=[0,1]):
        d=time.strftime("%m%d_%H%M%S")
    
        for r, b, o in zip((0,1), NBPS*appdev.read("APP.daq_0.SAMPLES", np.dtype('u4')), [0,0x20000000]):
            if r in region:
                with open(path+"DAQCAPT_{}_R{}.bin".format(d, r), 'wb') as fp:
                    fp.write(daqmem[o:o+b])
    
        return path+"DAQCAPT_{}_R{}.bin".format(d, r)
    
    if __name__ == "__main__":
        parser = argparse.ArgumentParser(description="DAQ capture and status")
    
        parser.add_argument("--configure", action="store_true", help="Configure")
        parser.add_argument("--stop", action="store_true", help="Send stop signal")
        parser.add_argument("-t", action="store_true", help="Trigger DAQ")
        parser.add_argument("-c", action="store_true", help="Dump memory to file")
        parser.add_argument("-R", type=int, choices=(0,1), help="Region selector, default is both")
    
        parser.add_argument("-N", type=int, default=1024, help="Number of samples to capture")
        parser.add_argument("--path", type=str, default="./", help="Output path")
    
        parser.add_argument("-s", action="store_true", help="Print status (after other actions)")
    
        args=parser.parse_args()
    
    
        if args.configure:
            if args.R is None:
                configure(3, args.N)
            else:
                configure(int(args.R)+1, args.N)
    
        if args.stop:
            stop()
            exit(0)
        else:
            stop(False)
    
        if args.t:
            trigger()
    
        if args.c:
            if args.R is None:
                fn=write_capture(args.path)
            else:
                fn=write_capture(args.path, [args.R, ])
            print(fn)
    
        if args.s:
            status()