Skip to content
Snippets Groups Projects
Commit e82aa7d9 authored by Alexis GAMELIN's avatar Alexis GAMELIN
Browse files

Add growth damp and chromaticity scripts

parent a7520315
No related branches found
No related tags found
No related merge requests found
chro.py 0 → 100644
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 9 15:23:24 2024
@author: gamelina
"""
import numpy as np
import matplotlib.pyplot as plt
from time import sleep
from tango import DeviceProxy
from scipy.optimize import curve_fit
#%% Init
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneZ = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
RF = DeviceProxy('ANS/RF/MasterClock')
service_locker = DeviceProxy("ANS/CA/SERVICE-LOCKER-MATLAB")
#%% chroma measurement
def chroma_measurement(N_step_delta=5,
delta_change_max=0.002,
N_meas_tune=1,
sleep_time_tune_meas=4):
"""
Record data for chromaticity measurement.
Parameters
----------
N_step_delta : int, optional
Number of steps to use in chroma measurements.
Must be an odd number.
The default is 5.
delta_change_max : float, optional
Maximum relative energy (delta) variation for the chroma measurement.
Maximum allowed is 0.004 = 0.4 % which is default value in MML.
The default is 0.002.
N_meas_tune : TYPE, optional
Number of chroma measurement to do.
The default is 1.
sleep_time_tune_meas : float, optional
Time to wait before the tune measurement is done.
The default is 4.
Returns
-------
delta : array of float
Relative energy (delta) variation steps done.
NuX : array of float
Horizontal tune measured.
NuZ : array of float
Vertical tune measured.
"""
# Parameters
alphac = 4.1819e-04
sleep_time_RF_change = 5 # second
# Check before measurements
if N_step_delta % 2 != 1:
raise ValueError("N_step_delta must be an odd number.")
if delta_change_max > 0.004:
raise ValueError("Maximum energy variation should be lower or equal to 0.4 % = 0.004.")
if service_locker.sofb or service_locker.fofb:
raise ValueError("Stop the SOFB and FOFB")
if service_locker.isTunexFBRunning or service_locker.isTunezFBRunning:
raise ValueError("Stop the tune FB")
print("starting chroma measurement")
delta = np.linspace(-delta_change_max, delta_change_max, N_step_delta)
fRF_0 = RF.frequency
print("Initial fRF = ", RF.frequency)
delta_fRF_list = delta * alphac * fRF_0
NuX = np.zeros((N_step_delta, N_meas_tune))
NuZ = np.zeros((N_step_delta, N_meas_tune))
for i, delta_fRF in enumerate(delta_fRF_list):
RF.frequency = fRF_0 + delta_fRF
sleep(sleep_time_RF_change)
print("fRF = ", RF.frequency)
while round(RF.frequency,5) != round(fRF_0 + delta_fRF,5):
sleep(sleep_time_RF_change)
print("fRF = ", RF.frequency)
for j in range(N_meas_tune):
sleep(sleep_time_tune_meas)
NuX[i,j] = tuneX.Nu
NuZ[i,j] = tuneZ.Nu
RF.frequency = fRF_0
print("chroma measurement done")
print("Final fRF = ", RF.frequency)
return (delta, NuX, NuZ)
#%% chroma analysis
def chro_analysis(delta,
NuX,
NuZ,
method="lin",
plot=True):
"""
Compute chromaticity from measurement data.
Parameters
----------
delta : array of float
Relative energy (delta) variation steps done.
NuX : array of float
Horizontal tune measured.
NuZ : array of float
Vertical tune measured.
method : {"lin" or "quad"}, optional
"lin" uses a linear fit and "quad" a 2nd order polynomial.
The default is "lin".
plot : bool, optional
If True, plot the fit.
The default is True.
Returns
-------
chro : array
Array with horizontal and veritical chromaticity.
"""
delta = -delta
chro = []
N_step_delta = len(delta)
for i, Nu in enumerate([NuX, NuZ]):
tune0 = np.mean(Nu[N_step_delta//2,:])
dtune = np.mean(Nu[:,:],1) - tune0
# dtune_min = np.min(Nu[:,:],1) - tune0
# dtune_max = np.max(Nu[:,:],1) - tune0
if method=="lin":
def linear_fit(x, a, b):
return a*x + b
popt_lin, _ = curve_fit(linear_fit, delta, dtune)
chro.append(popt_lin[0])
elif method=="quad":
def quad_fit(x, a, b, c):
return a*x**2 + b*x + c
popt_quad, _ = curve_fit(quad_fit, delta, dtune)
chro.append(popt_quad[1])
if plot:
plt.figure()
# plt.errorbar(x=delta, y=dtune, yerr=[dtune_min, dtune_max])
plt.scatter(delta, dtune)
if method=="lin":
plt.plot(delta, linear_fit(delta, popt_lin[0], popt_lin[1]), '--',
label="fit: {0}x+{1}".format(round(popt_lin[0],4), round(popt_lin[1],8)))
elif method=="quad":
plt.plot(delta, quad_fit(delta, popt_quad[0], popt_quad[1], popt_quad[2]), '--',
label="fit: {0}x2+{1}x+{2}".format(round(popt_quad[0],4), round(popt_quad[1],4), round(popt_quad[2],4)))
plt.xlabel('$\\delta$')
if i == 0:
plt.title("Horizontal Chromaticity")
plt.ylabel('$\\delta nuX$')
else:
plt.title("Vertical Chromaticity")
plt.ylabel('$\\delta nuZ$')
plt.legend()
print(f"chro is {chro}")
return np.array(chro)
import numpy as np
def get_envelope(x, y, window_size=256):
'''Return rolling maximum of signal y along time x.'''
xm = np.zeros(len(x) // window_size)
ym = np.zeros(len(x) // window_size)
for i in range(0, len(x) - window_size, window_size):
xm[i // window_size] = i + np.argmax(y[i:i+window_size])
ym[i // window_size] = np.max(y[i:i+window_size])
return xm, ym
def fit_risetime(signal, smoothing_window_size=50, min_level=8e-5,
until=None, start_from_0=False, min_points=3,
min_n_risetimes=1.5, matplotlib_axis=None):
'''Fit the exponential rise time of a positive signal.'''
if start_from_0:
start = 0
else:
try:
start = np.where(signal > min_level)[0][0]
except:
return np.nan
to_be_fit = signal[start:until]
t, x = get_envelope(np.arange(len(to_be_fit)), to_be_fit,
window_size=smoothing_window_size)
ddt = np.gradient(x, t)
# dddt = np.gradient(ddt, t)
until = len(x)
exponent = 0
while exponent < min_n_risetimes / t[until-1]:
# extend fitting region until covers at least min_n_risetimes rise times
if start_from_0:
fit_start = 0
else:
try:
fit_start = np.where(
(ddt[:-2] > 0) & (ddt[1:-1] > 0) & (ddt[2:] > 0))[0][0]
except IndexError:
return np.nan
try:
until = np.where(
(ddt[:-2] < 0) & (ddt[1:-1] < 0) & (ddt[2:] < 0) &
(np.arange(len(ddt)-2) > fit_start + min_points)
)[0][0]
# until_from_curvature = np.where(
# (dddt[:-2] < 0)& (dddt[1:-1] < 0) & (dddt[2:] < 0) &
# (np.arange(len(dddt)-2) > fit_start + min_points)
# )[0][0]
# until = min(until, until_from_curvature)
except IndexError:
until = len(x)
if until == len(t):
until -= 1
exponent, amplitude = np.polyfit(t[fit_start:until],
np.log(x[fit_start:until]), 1)
# cannot extend anymore if all region is covered already:
if until >= len(x) - 1:
break
else:
min_points += 1
if matplotlib_axis:
matplotlib_axis.plot(signal[:int(t[until] + start)])
tplot = np.linspace(t[fit_start], t[until], 100)
matplotlib_axis.plot(start + tplot,
np.exp(amplitude + exponent * tplot),
color='darkorange', ls='--')
matplotlib_axis.axvline(start + t[fit_start], 0, 1, color='red')
matplotlib_axis.axvline(start + t[until], 0, 1, color='red')
matplotlib_axis.set_title(1 / exponent)
return 1 / exponent
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed May 17 14:28:52 2023
@author: operateur
"""
from tango import DeviceProxy
import matplotlib.pyplot as plt
import numpy as np
import h5py as hp
from fitting import fit_risetime
from scipy.signal import hilbert
from time import gmtime, strftime
from time import sleep
from chro import chroma_measurement, chro_analysis
def take_data_gd(window_width,
window_delay=10,
datasize=4,
file_name=None,
N_record=1,
chro_measurement=False):
"""
Growth damp measurement with some additonal data saved.
Parameters
----------
window_width : float
Delay generator window width in [us].
Window during which the FBT is off.
window_delay : float, optional
Delay generator window delay in [ns].
Delay of the window during which the FBT is off.
The default is 10.
datasize : int, optional
Data size of the BBFV3 device.
Increasing this value allows to save more turns.
Be careful when increasing this value, ~10 is a good value.
The default is 4.
file_name : str, optional
File name where the data is saved.
If None, is 'growth_damp_{time}.hdf5'
The default is None.
N_record : int, optional
Number of growth damp measurements to save.
The default is 1.
chro_measurement : bool, optional
If True, make de chromaticity measurement before the growth damp one.
The default is False.
Returns
-------
file_name : str
File name where the data is saved.
"""
if chro_measurement:
(delta, NuX, NuZ) = chroma_measurement()
chro_lin = chro_analysis(delta, NuX, NuZ, method="lin", plot=False)
chro_quad = chro_analysis(delta, NuX, NuZ, method="quad", plot=False)
print(f"Chro lin: {chro_lin}")
print(f"Chro quad: {chro_quad}")
remplissage = DeviceProxy("ANS/DG/REMPLISSAGE")
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneZ = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
phc_C02 = DeviceProxy("ans-c02/dg/phc-emit")
phc_C16 = DeviceProxy("ans-c16/dg/phc-emit")
total_current = np.round(np.sum(remplissage.bunchescurrent),1)
# File setup
if file_name is None:
time = strftime("%H_%M_%S",gmtime())
file_name = "growth_damp_" + str(total_current) + "mA_" + time + ".hdf5"
file = hp.File(file_name, "a")
file["filling_pattern_before"] = remplissage.bunchescurrent
file["tuneX_before"] = tuneX.Nu
file["tuneZ_before"] = tuneZ.Nu
file["emitX_C02_before"] = phc_C02.EmittanceH
file["emitX_C16_before"] = phc_C16.EmittanceH
file["emitZ_C02_before"] = phc_C02.EmittanceV
file["emitZ_C16_before"] = phc_C16.EmittanceV
if chro_measurement:
file["chro_delta"] = delta
file["chro_NuX"] = NuX
file["chro_NuZ"] = NuZ
file["chro_lin"] = chro_lin
file["chro_quad"] = chro_quad
file.close()
growth_damp(window_width=window_width,
window_delay=window_delay,
datasize=datasize,
file_name=file_name,
N_record=N_record)
file = hp.File(file_name, "a")
file["filling_pattern_after"] = remplissage.bunchescurrent
file["tuneX_after"] = tuneX.Nu
file["tuneZ_after"] = tuneZ.Nu
file["emitX_C02_after"] = phc_C02.EmittanceH
file["emitX_C16_after"] = phc_C16.EmittanceH
file["emitZ_C02_after"] = phc_C02.EmittanceV
file["emitZ_C16_after"] = phc_C16.EmittanceV
file.close()
return file_name
def growth_damp(window_width,
window_delay=10,
datasize=4,
file_name=None,
N_record=1):
"""
Growth damp core function.
Parameters
----------
window_width : float
Delay generator window width in [us].
Window during which the FBT is off.
window_delay : float, optional
Delay generator window delay in [ns].
Delay of the window during which the FBT is off.
The default is 10.
datasize : int, optional
Data size of the BBFV3 device.
Increasing this value allows to save more turns.
Be careful when increasing this value, ~10 is a good value.
The default is 4.
file_name : str, optional
File name where the data is saved.
If None, is 'growth_damp_{time}.hdf5'
The default is None.
N_record : int, optional
Number of growth damp measurements to save.
The default is 1.
Returns
-------
file_name : str
File name where the data is saved.
"""
# Measurement params
sleep_time_acquisition_on_off = 5 # second
sleep_time_delay_generator_change = 1 # second
h=416 # harmonic
# Needed devices
delay = DeviceProxy("ANS-C09/RF/FINE_DELAY_GENERATOR")
service_locker = DeviceProxy("ANS/CA/SERVICE-LOCKER-MATLAB")
tuneX = DeviceProxy("ans/rf/bbfdataviewer.1-tunex")
tuneZ = DeviceProxy("ans/rf/bbfdataviewer.2-tunez")
switch_FBT = DeviceProxy("ans/rf/switch_fbt")
if switch_FBT.fbt_number1 is True:
fbt_name = "ANS/RF/BBFV3-1"
elif switch_FBT.fbt_number2 is True:
fbt_name = "ANS/RF/BBFV3-2"
else:
ValueError("No FBT ?")
fbt = DeviceProxy(fbt_name)
if datasize != 4:
# Change data size
fbt.automaticacquisitionoff()
sleep(sleep_time_acquisition_on_off)
fbt.datasize = datasize
fbt.automaticacquisitionon()
sleep(sleep_time_acquisition_on_off)
# Activate delay channel A
delay.activateChannelA = True
# Stop MNO
if (service_locker.isTunexFBRunning) or (service_locker.isTunezFBRunning):
raise ValueError("Tune FB is running, MNO can not be stopped.")
else:
selMNOsource_initial = fbt.selMNOsource
fbt.selMNOsource = 0 # no MNO
# File setup
if file_name is None:
time = strftime("%H_%M_%S",gmtime())
file_name = "growth_damp_" + time + ".hdf5"
file = hp.File(file_name, "a")
file["window_delay"] = window_delay
file["window_width"] = window_width
file["datasize"] = datasize
file["N_record"] = N_record
N_data = len(np.float64(fbt.selectedBunchDataADC0))
for j in range(N_record):
file.create_dataset(f"raw_{j}", (h, N_data), dtype="float")
# Measurment
delay.pulseDelayChannelA = window_delay
delay.pulseWidthChannelA = window_width
sleep(sleep_time_delay_generator_change)
for j in range(N_record):
if j > 0:
fbt.AutomaticAcquisitionON()
sleep(sleep_time_acquisition_on_off)
fbt.automaticacquisitionoff()
sleep(sleep_time_acquisition_on_off)
for i in range(h):
fbt.selectedBunchNumberADC0 = i
data = np.float64(fbt.selectedBunchDataADC0)
file[f"raw_{j}"][i,:] = data
# Get back to nominal
delay.activateChannelA = False
fbt.selMNOsource = selMNOsource_initial
if datasize != 4:
fbt.datasize = 4 # default value for operation
fbt.AutomaticAcquisitionON()
sleep(sleep_time_acquisition_on_off)
# Tune devices do not work for datasize != 4, so init is needed
tuneZ.init()
tuneX.init()
file.close()
return file_name
def _avg_mean(signal, window):
N=len(signal)
data = np.zeros((N-window,))
for i in range(N-window):
data[i] = np.mean(signal[i:window+i])
return data
def growth_damp_analysis(file_name,
dataset_name="rise_time",
avg_window=30,
min_level_select=50,
smoothing_window_size=10,
min_level_factor=1.8,
min_points=3,
min_n_risetimes=0.5,
save_steps=False,
bunch_list=None):
file = hp.File(file_name, "a")
h=416
if save_steps:
N_data = len(file["raw"][0,:])
file.create_dataset("mean_removed", (h, N_data), dtype="float")
file.create_dataset("hilbert", (h, N_data), dtype="float")
file.create_dataset("data_win", (h, N_data-avg_window), dtype="float")
file.create_dataset(dataset_name, (h,), dtype="float")
if bunch_list is None:
bunch_list = range(h)
for i in bunch_list:
data = np.array(file["raw"][i,:])
mean_removed = data - np.mean(data)
hilbert_transfo = np.abs(hilbert(mean_removed))
data_win = _avg_mean(hilbert_transfo, avg_window)
if save_steps:
file["mean_removed"][i,:] = mean_removed
file["hilbert"][i,:] = hilbert_transfo
file["data_win"][i,:] = data_win
min_level = np.min([np.mean(data_win[:min_level_select]),np.mean(data_win[-min_level_select:])])
exp = fit_risetime(signal=data_win,
smoothing_window_size=smoothing_window_size,
min_level=min_level*min_level_factor,
until=None,
start_from_0=False,
min_points=min_points,
min_n_risetimes=min_n_risetimes,
matplotlib_axis=None)
file[dataset_name][i] = exp
file.close()
def plot_raw(file_name,
bunch_list=np.arange(10)):
file = hp.File(file_name, "a")
plt.figure()
plt.grid()
for i in bunch_list:
plt.plot(file["raw"][i,:],label="bunch = "+str(i))
plt.legend()
file.close()
def plot_risetime(file_name,
dataset_name="rise_time",
max_rise=5e4):
file = hp.File(file_name, "a")
h=416
rise_time = np.array(file[dataset_name])
file.close()
idx = rise_time < 0
rise_time[idx] = np.nan
idx = rise_time > max_rise
rise_time[idx] = np.nan
plt.figure()
plt.grid()
plt.plot(np.arange(h),rise_time)
plt.xlabel("Bunch index")
plt.ylabel("Rise time [turns]")
plt.title(f"mean={np.nanmean(rise_time)}, std={np.nanstd(rise_time)}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment