Skip to content
Snippets Groups Projects
Commit c31f223a authored by Alexis GAMELIN's avatar Alexis GAMELIN
Browse files

Add beam current scan and a few utility functions

parent 22c5703b
Branches
Tags
No related merge requests found
......@@ -27,3 +27,10 @@ from pythoncontrol.matlab import to_matlab
from pythoncontrol.growth_damp_utils import (FBTPostmortemViewer,
GrowthDampDataAnalyzer)
from pythoncontrol.misc import (get_time_str,
get_fbt_device,
check_feedback_stoped,)
from pythoncontrol.beam_current_scan import (beam_current_scan_loop,
beam_current_scan_step,)
\ No newline at end of file
# -*- coding: utf-8 -*-
import numpy as np
import h5py as hp
import matplotlib.pyplot as plt
import os
from time import sleep
from tango import DeviceProxy
from pythoncontrol.injection import inject_beam
from pythoncontrol.chro import chroma_measurement, chro_analysis
from pythoncontrol.misc import check_feedback_stoped, get_fbt_device
def beam_current_scan_loop(goal_current,
booster_cav,
emitH_max=None,
emitV_max=None,
stop_at_max_emit=True,
FBT_on_off=False,
FBT_gain=None,
FBT_plane="V",
N_current_increase=5,
file=None):
"""
Scan beam current until either:
- curent goal is met
- emittance blow-up is measured.
- current does not increase any more for max. recored value for
N_current_increase injections.
Two basics usages:
- to find threshold current for emittance blow-up.
- to find threshold current for current loss (by injecting small amount
of current w/ FBT ON and then switching it OFF).
Injection setup is supposed to already be set in SPM/LPM to scan single/multi-
bunch current.
Parameters
----------
goal_current : float
Goal beam current in [mA].
booster_cav : {1, 2}
Booster cavity number to be used.
emitH_max : float, optional
Maximum H emittance for loop exit condition in [nm.rad].
The default is None.
emitV_max : float, optional
Maximum V emittance for loop exit condition in [pm.rad].
The default is None.
stop_at_max_emit : bool, optional
If True, the loop scan stops if the measurement H/V emittance is larger
than emitH_max/emitV_max values.
If False, no condition on maximum H/V emittance.
The default is True.
FBT_on_off : bool, optional
If True, the FBT is switched on before each injection step and switched
off just after.
The default is False.
FBT_gain : int, optional
FBT gain value used for injection.
The default is None.
FBT_plane : {"V", "H"}
Plane in which the FBT is switched on/off during injection.
The default is "V".
N_current_increase : int, optional
Maximum number of injection trials during which the maximum current
during the scan has not increased before the scan is stopped.
The default is 10.
file : h5py.File, optional
HDF5 file where the data will be saved.
The default is None.
Returns
-------
None.
"""
# Init devices
dcct = DeviceProxy("ANS/DG/DCCT-CTRL")
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneY = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
phc_C02 = DeviceProxy("ans-c02/dg/phc-emit")
phc_C16 = DeviceProxy("ans-c16/dg/phc-emit")
FBT = get_fbt_device()
# Init arrays
dcct_current = []
phc_C02_emitH = []
phc_C02_emitV = []
phc_C16_emitH = []
phc_C16_emitV = []
tune_X = []
tune_X_freq = []
tune_X_amp = []
tune_X_phase = []
tune_Y = []
tune_Y_freq = []
tune_Y_amp = []
tune_Y_phase = []
# Various check
check_feedback_stoped()
if stop_at_max_emit:
if (emitH_max is None) or (emitV_max is None):
raise ValueError('emitH_max and emitV_max need to be specified.')
if FBT_on_off and (FBT_gain is None):
raise ValueError('FBT_gain need to be specified.')
if FBT_on_off:
if FBT_plane == "V":
ADCgain = "gainADC0"
elif FBT_gain == "H":
ADCgain = "gainADC4"
else:
raise ValueError('FBT_plane should be either "H" or "V".')
# Measurement loop
count = 0
max_current = 0
while dcct.current < goal_current:
if FBT_on_off:
setattr(FBT, ADCgain, FBT_gain)
sleep(1)
print("beam injection")
inject_beam(booster_cav)
inject_beam(booster_cav)
sleep(3)
if FBT_on_off:
setattr(FBT, ADCgain, 0)
sleep(2)
dcct_current.append(dcct.current)
phc_C02_emitH.append(phc_C02.emittanceh)
phc_C02_emitV.append(phc_C02.emittancev)
phc_C16_emitH.append(phc_C16.emittanceh)
phc_C16_emitV.append(phc_C16.emittancev)
tune_X.append(tuneX.nu)
tune_X_freq.append(tuneX.FFTabs)
tune_X_amp.append(tuneX.FFTord)
tune_X_phase.append(tuneX.FFTphase)
tune_Y.append(tuneY.nu)
tune_Y_freq.append(tuneY.FFTabs)
tune_Y_amp.append(tuneY.FFTord)
tune_Y_phase.append(tuneY.FFTphase)
if stop_at_max_emit:
if (phc_C02.emittanceh > emitH_max) or (phc_C16.emittanceh > emitH_max):
print("exit: Emit H max")
break
if (phc_C02.emittancev > emitV_max) or (phc_C16.emittancev > emitV_max):
print("exit: Emit V max")
break
if dcct.current < max_current:
count += 1
else:
count = 0
max_current = max(max_current, dcct.current)
if count > N_current_increase:
print("exit: DCCT current do not increase")
break
# Set back FBT gain
if FBT_on_off:
setattr(FBT, ADCgain, FBT_gain)
# Record to file
dcct_current = np.array(dcct_current)
phc_C02_emitH = np.array(phc_C02_emitH)
phc_C02_emitV = np.array(phc_C02_emitV)
phc_C16_emitH = np.array(phc_C16_emitH)
phc_C16_emitV = np.array(phc_C16_emitV)
tune_X = np.array(tune_X)
tune_X_freq = np.array(tune_X_freq)
tune_X_amp = np.array(tune_X_amp)
tune_X_phase = np.array(tune_X_phase)
tune_Y = np.array(tune_Y)
tune_Y_freq = np.array(tune_Y_freq)
tune_Y_amp = np.array(tune_Y_amp)
tune_Y_phase = np.array(tune_Y_phase)
if file is not None:
file["dcct_current"] = dcct_current
file["phc_C02_emitH"] = phc_C02_emitH
file["phc_C02_emitV"] = phc_C02_emitV
file["phc_C16_emitH"] = phc_C16_emitH
file["phc_C16_emitV"] = phc_C16_emitV
file["tune_X"] = tune_X
file["tune_X_freq"] = tune_X_freq
file["tune_X_amp"] = tune_X_amp
file["tune_X_phase"] = tune_X_phase
file["tune_Y"] = tune_Y
file["tune_Y_freq"] = tune_Y_freq
file["tune_Y_amp"] = tune_Y_amp
file["tune_Y_phase"] = tune_Y_phase
def beam_current_scan_step(file,
goal_current,
booster_cav,
chro_measurement=True,
fill_pattern_measurement=False,
**kwargs,
):
"""
Adds chromaticity and fill pattern measurements to beam current scan
(beam_current_scan_loop).
Can typically be used as a step function for an instability threhsold vs
chromaticity scan.
Parameters
----------
file : h5py.File
HDF5 file/group where the data for this step will be saved.
goal_current : float
Goal beam current in [mA].
booster_cav : {1, 2}
Booster cavity number to be used.
chro_measurement : bool, optional
If True, measure chromaticity before the beam current scan.
The default is True.
fill_pattern_measurement : bool, optional
If True, record filling pattern after beam current scan.
The default is False.
**kwargs
See beam_current_scan_loop.
Returns
-------
None.
"""
if chro_measurement:
delta, NuX, NuZ = chroma_measurement(N_step_delta=5, N_meas_tune=1, Pause_meas=2)
chro = chro_analysis(delta, NuX, NuZ, method="lin", N_step_delta=5, plot=False)
file["chro_delta"] = delta
file["chro_NuX"] = NuX
file["chor_NuY"] = NuZ
file["chro"] = chro
beam_current_scan_loop(goal_current,
booster_cav,
**kwargs)
if fill_pattern_measurement:
remplissage = DeviceProxy("ANS/DG/REMPLISSAGE")
file["filling_pattern"] = remplissage.bunchescurrent
print('Threshold current is: ' + str(np.max(file['dcct_current'][:])))
file.close()
#%% results
def print_thresholds_from_files():
for a in os.listdir():
if a[0:5] == 'SB_FB':
print(a)
f = hp.File(a)
print(np.max(f['dcct_current'][:]))
f.close()
else:
pass
def plot_threshold_vs_chroma():
I = []
chro = []
for a in os.listdir():
if a[0:5] == 'SB_FB':
f = hp.File(a)
I.append(np.max(f['dcct_current'][:]))
chro.append(f['chro'][1])
# print(f["dcct_current"][-1])
f.close()
else:
pass
I = np.array(I)
chro = np.array(chro)
I = I[np.argsort(chro)]
chro = chro[np.argsort(chro)]
plt.figure()
# plt.plot(chro, I, marker='.', linestyle="dashed", label="Classical instability (=current loss)")
plt.errorbar(chro, I, xerr=0.1, yerr=0.1, marker='.', linestyle="dashed", label="Head-Tail instability (=current loss)")
I = []
chro = []
for a in os.listdir():
if a[0:5] == 'SB_ch':
f = hp.File(a)
I.append(np.max(f['dcct_current'][:]))
chro.append(f['chro'][1])
# print(f["dcct_current"][-1])
f.close()
else:
pass
I = np.array(I)
chro = np.array(chro)
I = I[np.argsort(chro)]
chro = chro[np.argsort(chro)]
idx = chro < 0.17
plt.errorbar(chro[~idx], I[~idx], xerr=0.1, yerr=0.1, marker='x', linestyle="dashed", label="'kicker' instability (=V excitation)")
I_TMCI = I[idx]
chro_TMCI = chro[idx]
plt.errorbar(chro_TMCI, I_TMCI, xerr=0.1, yerr=0.1, marker='x', linestyle="dashed", label=" TMCI (=current loss)")
plt.legend()
plt.xlabel("V chromaticity")
plt.ylabel("Threshold current [mA]")
\ No newline at end of file
......@@ -11,6 +11,7 @@ import matplotlib.pyplot as plt
from time import sleep
from tango import DeviceProxy
from scipy.optimize import curve_fit
from pythoncontrol.misc import check_feedback_stoped
#%% chroma measurement
......@@ -52,8 +53,6 @@ def chroma_measurement(N_step_delta=5,
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneZ = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
RF = DeviceProxy('ANS/RF/MasterClock')
service_locker = DeviceProxy("ANS/CA/SERVICE-LOCKER-MATLAB")
fofb_watcher = DeviceProxy("ANS/DG/FOFB-WATCHER")
# Parameters
alphac = 4.1819e-04
......@@ -66,14 +65,7 @@ def chroma_measurement(N_step_delta=5,
if delta_change_max > 0.004:
raise ValueError("Maximum energy variation should be lower or equal to 0.4 % = 0.004.")
if service_locker.sofb:
raise ValueError("Stop the SOFB")
if fofb_watcher.fofbrunning_x or fofb_watcher.fofbrunning_y:
raise ValueError("Stop the FOFB")
if service_locker.isTunexFBRunning or service_locker.isTunezFBRunning:
raise ValueError("Stop the tune FB")
check_feedback_stoped()
print("starting chroma measurement")
......
......@@ -12,9 +12,9 @@ import numpy as np
import h5py as hp
from pythoncontrol.fitting import fit_risetime
from scipy.signal import hilbert
from time import gmtime, strftime
from time import sleep
from pythoncontrol.chro import chroma_measurement, chro_analysis
from pythoncontrol.misc import check_feedback_stoped, get_fbt_device, get_time_str
def take_data_gd(window_width,
window_delay=10,
......@@ -73,7 +73,7 @@ def take_data_gd(window_width,
# File setup
if file_name is None:
time = strftime("%H_%M_%S",gmtime())
time = get_time_str()
file_name = "growth_damp_" + str(total_current) + "mA_" + time + ".hdf5"
file = hp.File(file_name, "a")
......@@ -153,27 +153,11 @@ def growth_damp(window_width,
# Needed devices
delay = DeviceProxy("ANS-C09/RF/FINE_DELAY_GENERATOR")
service_locker = DeviceProxy("ANS/CA/SERVICE-LOCKER-MATLAB")
fofb_watcher = DeviceProxy("ANS/DG/FOFB-WATCHER")
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneZ = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
switch_FBT = DeviceProxy("ans/rf/switch_fbt")
if switch_FBT.fbt_number1 is True:
fbt_name = "ANS/RF/BBFV3-1"
elif switch_FBT.fbt_number2 is True:
fbt_name = "ANS/RF/BBFV3-2"
else:
ValueError("No FBT ?")
fbt = DeviceProxy(fbt_name)
if service_locker.sofb:
raise ValueError("Stop the SOFB")
if fofb_watcher.fofbrunning_x or fofb_watcher.fofbrunning_y:
raise ValueError("Stop the FOFB")
if service_locker.isTunexFBRunning or service_locker.isTunezFBRunning:
raise ValueError("Stop the tune FB")
fbt = get_fbt_device()
check_feedback_stoped()
if datasize != 4:
# Change data size
......@@ -192,7 +176,7 @@ def growth_damp(window_width,
# File setup
if file_name is None:
time = strftime("%H_%M_%S",gmtime())
time = get_time_str()
file_name = "growth_damp_" + time + ".hdf5"
file = hp.File(file_name, "a")
file["window_delay"] = window_delay
......
misc.py 0 → 100644
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May 15 17:56:04 2025
@author: gamelina
"""
from time import gmtime, strftime
from tango import DeviceProxy
def get_time_str():
"Return a string with a time stamp."
time_str = strftime("%d_%m_%Y_%H_%M_%S_",gmtime())
return time_str
def get_fbt_device():
"""
Returns currently used FBT Tango device.
Returns
-------
fbt : tango.DeviceProxy
Currently used FBT device.
"""
switch_FBT = DeviceProxy("ans/rf/switch_fbt")
if switch_FBT.fbt_number1 is True:
fbt_name = "ANS/RF/BBFV3-1"
elif switch_FBT.fbt_number2 is True:
fbt_name = "ANS/RF/BBFV3-2"
else:
ValueError("No FBT ?")
fbt = DeviceProxy(fbt_name)
return fbt
def check_feedback_stoped(SOFB=True,
FOFB=True,
TuneFB=True):
"""
Check if the feedback are turned off.
Parameters
----------
SOFB : bool, optional
If True, check if SOFB is off.
The default is True.
FOFB : bool, optional
If True, check if FOFB is off.
The default is True.
TuneFB : bool, optional
If True, check if TuneFB is off.
The default is True.
Raises
------
ValueError
Returns
-------
None.
"""
service_locker = DeviceProxy("ANS/CA/SERVICE-LOCKER-MATLAB")
fofb_watcher = DeviceProxy("ANS/DG/FOFB-WATCHER")
if service_locker.sofb:
raise ValueError("Stop the SOFB")
if fofb_watcher.fofbrunning_x or fofb_watcher.fofbrunning_y:
raise ValueError("Stop the FOFB")
if service_locker.isTunexFBRunning or service_locker.isTunezFBRunning:
raise ValueError("Stop the tune FB")
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment