# -*- coding: utf-8 -*-
"""
Module for plotting the data recorded by the monitor module during the
tracking.
"""
import random
import h5py as hp
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.stats import gmean
[docs]def plot_beamdata(filenames,
dataset="mean",
dimension="tau",
stat_var="mean",
x_var="time",
turn=None,
legend=None):
"""
Plot 2D data recorded by BeamMonitor.
Parameters
----------
filenames : str or list of str
Names of the HDF5 files to be plotted.
dataset : {"current","emit","mean","std","cs_invariant"}
HDF5 file's dataset to be plotted. The default is "mean".
dimension : str
The dimension of the dataset to plot:
for "emit", dimension = {"x","y","s"},
for "cs_invariant", dimension = {"x","y","s"},
for "mean" and "std", dimension = {"x","xp","y","yp","tau","delta"}.
not used if "current".
The default is "tau".
stat_var : {"mean", "std"}
Statistical value of the dimension.
x_var : {"time", "index"}
Choice of the horizontal axis:
"time" corresponds to turn number.
"index" corresponds to bunch index.
turn : int or float, optional
Choice of the turn to plot when x_var = "index".
If None, the last turn is plotted.
legend : list of str, optional
Legend to add for each file.
Return
------
fig : Figure
Figure object with the plot on it.
"""
if isinstance(filenames, str):
filenames = [filenames]
fig, ax = plt.subplots()
for filename in filenames:
file = hp.File(filename, "r")
time = np.array(file["Beam"]["time"])
data = np.array(file["Beam"][dataset])
if x_var == "time":
x = time
x_label = "Number of turns"
bunch_index = (file["Beam"]["current"][:, 0] != 0).nonzero()[0]
if dataset == "current":
y = np.nansum(data[bunch_index, :], 0) * 1e3
y_label = "Total current (mA)"
elif dataset == "emit":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = [
"$\\epsilon_{x}$ (m.rad)", "$\\epsilon_{y}$ (m.rad)",
"$\\epsilon_{s}$ (s)"
]
if stat_var == "mean":
y = np.nanmean(data[axis, bunch_index, :], 0)
elif stat_var == "std":
y = np.nanstd(data[axis, bunch_index, :], 0)
y_label = stat_var + " " + label[axis]
elif dataset == "cs_invariant":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = ['$J_x$ (m)', '$J_y$ (m)', '$J_s$ (s)']
if stat_var == "mean":
y = np.nanmean(data[axis, bunch_index, :], 0)
elif stat_var == "std":
y = np.nanstd(data[axis, bunch_index, :], 0)
y_label = stat_var + " " + label[axis]
elif dataset == "mean" or dataset == "std":
dimension_dict = {
"x": 0,
"xp": 1,
"y": 2,
"yp": 3,
"tau": 4,
"delta": 5
}
axis = dimension_dict[dimension]
scale = [1e6, 1e6, 1e6, 1e6, 1e12, 1]
label = [
"x (um)", "x' ($\\mu$rad)", "y (um)", "y' ($\\mu$rad)",
"$\\tau$ (ps)", "$\\delta$"
]
if stat_var == "mean":
y = np.nanmean(data[axis, bunch_index, :], 0) * scale[axis]
elif stat_var == "std":
y = np.nanstd(data[axis, bunch_index, :], 0) * scale[axis]
label_sup = {"mean": "mean of ", "std": "std of "}
y_label = label_sup[stat_var] + dataset + " " + label[axis]
elif x_var == "index":
h = len(file["Beam"]["mean"][0, :, 0])
x = np.arange(h)
x_label = "Bunch index"
if turn is None:
idx = -1
else:
idx = np.where(time == int(turn))[0]
if (idx.size == 0):
raise ValueError("Turn is not valid.")
if dataset == "current":
y = data[:, idx] * 1e3
y_label = "Bunch current (mA)"
elif dataset == "emit":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = [
"$\\epsilon_{x}$ (m.rad)", "$\\epsilon_{y}$ (m.rad)",
"$\\epsilon_{s}$ (s)"
]
y = data[axis, :, idx]
y_label = label[axis]
elif dataset == "cs_invariant":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = ['$J_x$ (m)', '$J_y$ (m)', '$J_s$ (s)']
y = data[axis, :, idx]
y_label = label[axis]
elif dataset == "mean" or dataset == "std":
dimension_dict = {
"x": 0,
"xp": 1,
"y": 2,
"yp": 3,
"tau": 4,
"delta": 5
}
axis = dimension_dict[dimension]
scale = [1e6, 1e6, 1e6, 1e6, 1e12, 1]
label = [
"x (um)", "x' ($\\mu$rad)", "y (um)", "y' ($\\mu$rad)",
"$\\tau$ (ps)", "$\\delta$"
]
y = data[axis, :, idx] * scale[axis]
y_label = dataset + " " + label[axis]
else:
raise ValueError("x_var should be time or index")
y = np.squeeze(y)
ax.plot(x, y)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if legend is not None:
plt.legend(legend)
file.close()
return fig
[docs]def streak_beamdata(filename, dataset="mean", dimension="tau", cm_lim=None):
"""
Plot 3D data recorded by BeamMonitor.
Parameters
----------
filename : str
Name of the HDF5 file that contains the data.
dataset : {"current","emit","mean","std","cs_invariant"}
HDF5 file's dataset to be plotted. The default is "mean".
dimension : str
The dimension of the dataset to plot:
for "emit", dimension = {"x","y","s"},
for "cs_invariant", dimension = {"x","y","s"},
for "mean" and "std", dimension = {"x","xp","y","yp","tau","delta"}.
not used if "current".
The default is "tau".
cm_lim : list [vmin, vmax], optional
Colormap scale for the "streak" plot.
Return
------
fig : Figure
Figure object with the plot on it.
"""
file = hp.File(filename, "r")
data = file["Beam"]
time = np.array(data["time"])
h = len(data["mean"][0, :, 0])
x = np.arange(h)
x_label = "Bunch index"
y = time
y_label = "Number of turns"
if dataset == "current":
z = (np.array(data["current"]) * 1e3).T
z_label = "Bunch current (mA)"
title = z_label
elif dataset == "emit":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = [
"$\\epsilon_{x}$ (m.rad)", "$\\epsilon_{y}$ (m.rad)",
"$\\epsilon_{s}$ (s)"
]
z = np.array(data["emit"][axis, :, :]).T
z_label = label[axis]
title = z_label
elif dataset == "cs_invariant":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis = dimension_dict[dimension]
label = ['$J_x$ (m)', '$J_y$ (m)', '$J_s$ (s)']
z = np.array(data["cs_invariant"][axis, :, :]).T
z_label = label[axis]
title = z_label
else:
dimension_dict = {
"x": 0,
"xp": 1,
"y": 2,
"yp": 3,
"tau": 4,
"delta": 5
}
axis = dimension_dict[dimension]
scale = [1e6, 1e6, 1e6, 1e6, 1e12, 1]
label = [
"x (um)", "x' ($\\mu$rad)", "y (um)", "y' ($\\mu$rad)",
"$\\tau$ (ps)", "$\\delta$"
]
z = np.array(data[dataset][axis, :, :]).T * scale[axis]
z_label = label[axis]
if dataset == "mean":
title = label[axis] + " CM"
elif dataset == "std":
title = label[axis] + " RMS"
fig, ax = plt.subplots()
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
ax.set_title(title)
if dataset == "mean":
cmap = mpl.cm.coolwarm # diverging
else:
cmap = mpl.cm.inferno # sequential
c = ax.imshow(z,
cmap=cmap,
origin='lower',
aspect='auto',
extent=[x.min(), x.max(), y.min(),
y.max()])
if cm_lim is not None:
c.set_clim(vmin=cm_lim[0], vmax=cm_lim[1])
cbar = fig.colorbar(c, ax=ax)
cbar.set_label(z_label)
file.close()
return fig
[docs]def plot_bunchdata(filenames,
bunch_number,
dataset,
dimension="x",
legend=None):
"""
Plot data recorded by BunchMonitor.
Parameters
----------
filenames : str or list of str
Names of the HDF5 files to be plotted.
bunch_number : int or list of int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'BunchMonitor' object.
dataset : {"current", "emit", "mean", "std", "cs_invariant"}
HDF5 file's dataset to be plotted.
dimension : str, optional
The dimension of the dataset to plot. Use "None" for "current",
otherwise use the following :
for "emit", dimension = {"x","y","s"},
for "cs_invariant", dimension = {"x","y","s"},
for "mean" and "std", dimension = {"x","xp","y","yp","tau","delta"},
for "action", dimension = {"x","y"}.
legend : list of str, optional
Legend to add for each file.
Return
------
fig : Figure
Figure object with the plot on it.
"""
if isinstance(filenames, str):
filenames = [filenames]
if isinstance(bunch_number, int):
ll = []
for i in range(len(filenames)):
ll.append(bunch_number)
bunch_number = ll
fig, ax = plt.subplots()
for i, filename in enumerate(filenames):
file = hp.File(filename, "r")
group = "BunchData_{0}".format(
bunch_number[i]) # Data group of the HDF5 file
if dataset == "current":
y_var = file[group][dataset][:] * 1e3
label = "current (mA)"
elif dataset == "emit":
dimension_dict = {"x": 0, "y": 1, "s": 2}
y_var = file[group][dataset][dimension_dict[dimension]]
if dimension == "x": label = "hor. emittance (m.rad)"
elif dimension == "y": label = "ver. emittance (m.rad)"
elif dimension == "s": label = "long. emittance (s)"
elif dataset == "mean" or dataset == "std":
dimension_dict = {
"x": 0,
"xp": 1,
"y": 2,
"yp": 3,
"tau": 4,
"delta": 5
}
scale = [1e6, 1e6, 1e6, 1e6, 1e12, 1]
axis_index = dimension_dict[dimension]
y_var = file[group][dataset][axis_index] * scale[axis_index]
if dataset == "mean":
label_list = [
"x ($\\mu$m)", "x' ($\\mu$rad)", "y ($\\mu$m)",
"y' ($\\mu$rad)", "$\\tau$ (ps)", "$\\delta$"
]
else:
label_list = [
"$\\sigma_x$ ($\\mu$m)", "$\\sigma_{x'}$ ($\\mu$rad)",
"$\\sigma_y$ ($\\mu$m)", "$\\sigma_{y'}$ ($\\mu$rad)",
"$\\sigma_{\\tau}$ (ps)", "$\\sigma_{\\delta}$"
]
label = label_list[axis_index]
elif dataset == "cs_invariant":
dimension_dict = {"x": 0, "y": 1, "s": 2}
axis_index = dimension_dict[dimension]
y_var = file[group][dataset][axis_index]
label_list = ['$J_x$ (m)', '$J_y$ (m)', '$J_s$ (s)']
label = label_list[axis_index]
x_axis = file[group]["time"][:]
xlabel = "Number of turns"
ax.plot(x_axis, y_var)
ax.set_xlabel(xlabel)
ax.set_ylabel(label)
if legend is not None:
plt.legend(legend)
file.close()
return fig
[docs]def plot_phasespacedata(filename,
bunch_number,
x_var,
y_var,
turn,
only_alive=True,
plot_size=1,
plot_kind='kde'):
"""
Plot data recorded by PhaseSpaceMonitor.
Parameters
----------
filename : str
Name of the HDF5 file that contains the data.
bunch_number : int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'PhaseSpaceMonitor' object.
x_var, y_var : str {"x", "xp", "y", "yp", "tau", "delta"}
If dataset is "particles", the variables to be plotted on the
horizontal and the vertical axes need to be specified.
turn : int
Turn at which the data will be extracted.
only_alive : bool, optional
When only_alive is True, only alive particles are plotted and dead
particles will be discarded.
plot_size : [0,1], optional
Number of macro-particles to plot relative to the total number
of macro-particles recorded. This option helps reduce processing time
when the data is big.
plot_kind : {'scatter', 'kde', 'hex', 'reg', 'resid'}, optional
The plot style. The default is 'kde'.
Return
------
fig : Figure
Figure object with the plot on it.
"""
file = hp.File(filename, "r")
group = "PhaseSpaceData_{0}".format(bunch_number)
dataset = "particles"
option_dict = {"x": 0, "xp": 1, "y": 2, "yp": 3, "tau": 4, "delta": 5}
scale = [1e3, 1e3, 1e3, 1e3, 1e12, 1]
label = [
"x (mm)", "x' (mrad)", "y (mm)", "y' (mrad)", "$\\tau$ (ps)",
"$\\delta$"
]
# find the index of "turn" in time array
turn_index = np.where(file[group]["time"][:] == turn)
if len(turn_index[0]) == 0:
raise ValueError("Turn {0} is not found. Enter turn from {1}.".format(
turn, file[group]["time"][:]))
path = file[group][dataset]
mp_number = path[:, 0, 0].size
if only_alive is True:
data = np.array(file[group]["alive"])
index = np.where(data[:, turn_index])[0]
else:
index = np.arange(mp_number)
if plot_size == 1:
samples = index
elif plot_size < 1:
samples_meta = random.sample(list(index), int(plot_size * mp_number))
samples = sorted(samples_meta)
else:
raise ValueError("plot_size must be in range [0,1].")
# format : sns.jointplot(x_axis, yaxis, kind)
x_axis = path[samples, option_dict[x_var], turn_index[0][0]]
y_axis = path[samples, option_dict[y_var], turn_index[0][0]]
fig = sns.jointplot(x_axis * scale[option_dict[x_var]],
y_axis * scale[option_dict[y_var]],
kind=plot_kind)
plt.xlabel(label[option_dict[x_var]])
plt.ylabel(label[option_dict[y_var]])
file.close()
return fig
[docs]def plot_profiledata(filename,
bunch_number,
dimension="tau",
start=0,
stop=None,
step=None,
profile_plot=True,
streak_plot=True):
"""
Plot data recorded by ProfileMonitor
Parameters
----------
filename : str
Name of the HDF5 file that contains the data.
bunch_number : int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'ProfileMonitor' object.
dimension : str, optional
Dimension to plot. The default is "tau"
start : int, optional
First turn to plot. The default is 0.
stop : int, optional
Last turn to plot. If None, the last turn of the record is selected.
step : int, optional
Plotting step. This has to be divisible by 'save_every' parameter in
'ProfileMonitor' object, i.e. step % save_every == 0. If None, step is
equivalent to save_every.
profile_plot : bool, optional
If Ture, bunch profile plot is plotted.
streak_plot : bool, optional
If True, strek plot is plotted.
Returns
-------
fig : Figure
Figure object with the plot on it.
"""
file = hp.File(filename, "r")
path = file['ProfileData_{0}'.format(bunch_number)]
l_bound = np.array(path["{0}_bin".format(dimension)])
data = np.array(path[dimension])
time = np.array(path["time"])
if stop is None:
stop = time[-1]
elif stop not in time:
raise ValueError("stop not found. Choose from {0}".format(time[:]))
if start not in time:
raise ValueError("start not found. Choose from {0}".format(time[:]))
save_every = time[1] - time[0]
if step is None:
step = save_every
if step % save_every != 0:
raise ValueError("step must be divisible by the recording step "
"which is {0}.".format(save_every))
dimension_dict = {"x": 0, "xp": 1, "y": 2, "yp": 3, "tau": 4, "delta": 5}
scale = [1e6, 1e6, 1e6, 1e6, 1e12, 1]
label = [
"x (um)", "x' ($\\mu$rad)", "y (um)", "y' ($\\mu$rad)", "$\\tau$ (ps)",
"$\\delta$"
]
num = int((stop-start) / step)
n_bin = len(data[:, 0])
start_index = np.where(time[:] == start)[0][0]
x_var = np.zeros((num + 1, n_bin))
turn_index_array = np.zeros((num + 1, ), dtype=int)
for i in range(num + 1):
turn_index = int(start_index + i*step/save_every)
turn_index_array[i] = turn_index
# construct an array of bin mids
x_var[i, :] = l_bound[:, turn_index]
if profile_plot is True:
fig, ax = plt.subplots()
for i in range(num + 1):
ax.plot(x_var[i] * scale[dimension_dict[dimension]],
data[:, turn_index_array[i]],
label="turn {0}".format(time[turn_index_array[i]]))
ax.set_xlabel(label[dimension_dict[dimension]])
ax.set_ylabel("number of macro-particles")
ax.legend()
if streak_plot is True:
turn = np.reshape(time[turn_index_array], (num + 1, 1))
y_var = np.ones((num + 1, n_bin)) * turn
z_var = np.transpose(data[:, turn_index_array])
fig2, ax2 = plt.subplots()
cmap = mpl.cm.inferno # sequential
c = ax2.imshow(z_var,
cmap=cmap,
origin='lower',
aspect='auto',
extent=[
x_var.min() * scale[dimension_dict[dimension]],
x_var.max() * scale[dimension_dict[dimension]],
y_var.min(),
y_var.max()
])
ax2.set_xlabel(label[dimension_dict[dimension]])
ax2.set_ylabel("Number of turns")
cbar = fig2.colorbar(c, ax=ax2)
cbar.set_label("Number of macro-particles")
file.close()
if profile_plot is True and streak_plot is True:
return fig, fig2
elif profile_plot is True:
return fig
elif streak_plot is True:
return fig2
[docs]def plot_wakedata(filename,
bunch_number,
wake_type="Wlong",
start=0,
stop=None,
step=None,
profile_plot=False,
streak_plot=True,
bunch_profile=False,
dipole=False):
"""
Plot data recorded by WakePotentialMonitor
Parameters
----------
filename : str
Name of the HDF5 file that contains the data.
bunch_number : int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'WakePotentialMonitor' object.
wake_type : str, optional
Wake type to plot: "Wlong", "Wxdip", ...
start : int, optional
First turn to plot. The default is 0.
stop : int, optional
Last turn to plot. If None, the last turn of the record is selected.
step : int, optional
Plotting step. This has to be divisible by 'save_every' parameter in
'WakePotentialMonitor' object, i.e. step % save_every == 0. If None,
step is equivalent to save_every.
profile_plot : bool, optional
If Ture, wake potential profile plot is plotted.
streak_plot : bool, optional
If True, strek plot is plotted.
bunch_profile : bool, optional.
If True, the bunch profile is plotted.
dipole : bool, optional
If True, the dipole moment is plotted.
Returns
-------
fig : Figure
Figure object with the plot on it.
"""
file = hp.File(filename, "r")
path = file['WakePotentialData_{0}'.format(bunch_number)]
time = np.array(path["time"])
if stop is None:
stop = time[-1]
elif stop not in time:
raise ValueError("stop not found. Choose from {0}".format(time[:]))
if start not in time:
raise ValueError("start not found. Choose from {0}".format(time[:]))
save_every = time[1] - time[0]
if step is None:
step = save_every
if step % save_every != 0:
raise ValueError("step must be divisible by the recording step "
"which is {0}.".format(save_every))
dimension_dict = {
"Wlong": 0,
"Wxdip": 1,
"Wydip": 2,
"Wxquad": 3,
"Wyquad": 4
}
scale = [1e-12, 1e-12, 1e-12, 1e-15, 1e-15]
label = [
"$W_p$ (V/pC)", "$W_{p,x}^D (V/pC)$", "$W_{p,y}^D (V/pC)$",
"$W_{p,x}^Q (V/pC/mm)$", "$W_{p,y}^Q (V/pC/mm)$"
]
if bunch_profile == True:
tau_name = "tau_" + wake_type
wake_type = "profile_" + wake_type
dimension_dict = {wake_type: 0}
scale = [1]
label = ["$\\rho$ (a.u.)"]
cmap = mpl.cm.inferno # sequential
elif dipole == True:
tau_name = "tau_" + wake_type
wake_type = "dipole_" + wake_type
dimension_dict = {wake_type: 0}
scale = [1]
label = ["Dipole moment (m)"]
cmap = mpl.cm.coolwarm # diverging
else:
tau_name = "tau_" + wake_type
cmap = mpl.cm.coolwarm # diverging
data = np.array(path[wake_type])
num = int((stop-start) / step)
n_bin = len(data[:, 0])
start_index = np.where(time[:] == start)[0][0]
x_var = np.zeros((num + 1, n_bin))
turn_index_array = np.zeros((num + 1, ), dtype=int)
for i in range(num + 1):
turn_index = int(start_index + i*step/save_every)
turn_index_array[i] = turn_index
# construct an array of bin mids
x_var[i, :] = np.array(path[tau_name])[:, turn_index]
if profile_plot is True:
fig, ax = plt.subplots()
for i in range(num + 1):
ax.plot(x_var[i] * 1e12,
data[:, turn_index_array[i]] *
scale[dimension_dict[wake_type]],
label="turn {0}".format(time[turn_index_array[i]]))
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel(label[dimension_dict[wake_type]])
ax.legend()
if streak_plot is True:
turn = np.reshape(time[turn_index_array], (num + 1, 1))
y_var = np.ones((num + 1, n_bin)) * turn
z_var = np.transpose(data[:, turn_index_array] *
scale[dimension_dict[wake_type]])
fig2, ax2 = plt.subplots()
c = ax2.imshow(z_var,
cmap=cmap,
origin='lower',
aspect='auto',
extent=[
x_var.min() * 1e12,
x_var.max() * 1e12,
y_var.min(),
y_var.max()
])
ax2.set_xlabel("$\\tau$ (ps)")
ax2.set_ylabel("Number of turns")
cbar = fig2.colorbar(c, ax=ax2)
cbar.set_label(label[dimension_dict[wake_type]])
file.close()
if profile_plot is True and streak_plot is True:
return fig, fig2
elif profile_plot is True:
return fig
elif streak_plot is True:
return fig2
[docs]def plot_bunchspectrum(filenames,
bunch_number,
dataset="incoherent",
dim="tau",
turns=None,
fs=None,
log_scale=True,
legend=None,
norm=False):
"""
Plot coherent and incoherent spectrum data.
Parameters
----------
filenames : str or list of str
Names of the HDF5 files to be plotted.
bunch_number : int or list of int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'BunchSpectrumMonitor' object.
dataset : {"mean_incoherent", "coherent", "incoherent"}
HDF5 file's dataset to be plotted.
The default is "incoherent".
dim : {"x","y","tau"}, optional
The dimension of the dataset to plot.
The default is "tau".
turns : array or None, optional
Numbers of the turns to plot.
If None, all turns are shown.
The default is None.
fs : float or None, optional
If not None, the frequency axis is noramlised by fs.
The default is None.
log_scale : bool, optional
If True, the spectrum plots are shown in y-log scale.
The default is True.
legend : list of str, optional
Legend to add for each file.
The default is None.
norm : bool, optional
If True, normalise the data of each spectrum by its geometric mean.
The default is False.
Return
------
fig : Figure
"""
if isinstance(filenames, str):
filenames = [filenames]
if isinstance(bunch_number, int):
ll = []
for i in range(len(filenames)):
ll.append(bunch_number)
bunch_number = ll
fig, ax = plt.subplots()
for i, filename in enumerate(filenames):
file = hp.File(filename, "r")
group = file["BunchSpectrum_{0}".format(bunch_number[i])]
time = np.array(group["time"])
freq = np.array(group["freq"])
dim_dict = {"x": 0, "y": 1, "tau": 2}
if dataset == "mean_incoherent":
y_var = group["mean_incoherent"][dim_dict[dim], :]
y_err = group["std_incoherent"][dim_dict[dim], :]
ax.errorbar(time, y_var, y_err)
xlabel = "Turn number"
ylabel = "Mean incoherent frequency [Hz]"
elif dataset == "incoherent" or dataset == "coherent":
if turns is None:
turn_index = np.where(time == time)[0]
else:
turn_index = []
for turn in turns:
idx = np.where(time == turn)[0][0]
turn_index.append(idx)
turn_index = np.array(turn_index)
if fs is None:
x_var = freq
xlabel = "Frequency [Hz]"
else:
x_var = freq / fs
xlabel = r"$f/f_{s}$"
for idx in turn_index:
y_var = group[dataset][dim_dict[dim], :, idx]
if norm is True:
y_var = y_var / gmean(y_var)
ax.plot(x_var, y_var)
if log_scale is True:
ax.set_yscale('log')
ylabel = "FFT amplitude [a.u.]"
if dataset == "incoherent":
ax.set_title("Incoherent spectrum")
elif dataset == "coherent":
ax.set_title("Coherent spectrum")
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
if legend is not None:
plt.legend(legend)
file.close()
return fig
[docs]def streak_bunchspectrum(filename,
bunch_number,
dataset="incoherent",
dim="tau",
fs=None,
log_scale=True,
fmin=None,
fmax=None,
turns=None,
norm=False,
ylim=None):
"""
Plot 3D data recorded by the BunchSpectrumMonitor.
Parameters
----------
filenames : str
Name of the HDF5 file to be plotted.
bunch_number : int
Bunch to plot. This has to be identical to 'bunch_number' parameter in
'BunchSpectrumMonitor' object.
dataset : {"coherent", "incoherent"}
HDF5 file's dataset to be plotted.
The default is "incoherent".
dim : {"x","y","tau"}, optional
The dimension of the dataset to plot.
The default is "tau".
fs : float or None, optional
If not None, the frequency axis is noramlised by fs.
log_scale : bool, optional
If True, the spectrum plots are shown in y-log scale.
The default is True.
fmin : float, optional
If not None, the plot is limitted to values bigger than fmin.
fmax : float, optional
If not None, the plot is limitted to values smaller than fmax.
turns : array, optional
If not None, only the turn numbers in the turns array are plotted.
norm : bool, optional
If True, normalise the data of each spectrum by its geometric mean.
The default is False.
ylim : array, optional
If not None, should be array like in the form [ymin, ymax] where ymin
and ymax are the minimum and maxmimum values used in the y axis.
Returns
-------
fig : Figure
"""
file = hp.File(filename, "r")
group = file["BunchSpectrum_{0}".format(bunch_number)]
time = np.array(group["time"])
freq = np.array(group["freq"])
dim_dict = {"x": 0, "y": 1, "tau": 2}
if turns is None:
turn_index = np.where(time == time)[0]
if ylim is None:
tmin = time.min()
tmax = time.max()
else:
tmin = ylim[0]
tmax = ylim[1]
else:
tmin = turns.min()
tmax = turns.max()
turn_index = []
for turn in turns:
idx = np.where(time == turn)[0][0]
turn_index.append(idx)
turn_index = np.array(turn_index)
data = group[dataset][dim_dict[dim], :, turn_index]
if log_scale is True:
option = mpl.colors.LogNorm()
else:
option = None
if fs is None:
x_var = freq
xlabel = "Frequency [Hz]"
else:
x_var = freq / fs
xlabel = r"$f/f_{s}$"
if fmin is None:
fmin = x_var.min()
if fmax is None:
fmax = x_var.max()
ind = (x_var > fmin) & (x_var < fmax)
x_var = x_var[ind]
data = data[ind, :]
if norm is True:
data = data / gmean(data)
if ylim is None:
ylabel = "Turn number"
else:
ylabel = ""
fig, ax = plt.subplots()
if dataset == "incoherent":
ax.set_title("Incoherent spectrum")
elif dataset == "coherent":
ax.set_title("Coherent spectrum")
cmap = mpl.cm.inferno # sequential
c = ax.imshow(data.T,
cmap=cmap,
origin='lower',
aspect='auto',
extent=[x_var.min(), x_var.max(), tmin, tmax],
norm=option,
interpolation="none")
cbar = fig.colorbar(c, ax=ax)
cbar.ax.set_ylabel("FFT amplitude [a.u.]", rotation=270)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
return fig
[docs]def plot_beamspectrum(filenames,
dim="tau",
turns=None,
f0=None,
log_scale=True,
legend=None,
norm=False):
"""
Plot coherent beam spectrum data.
Parameters
----------
filenames : str or list of str
Names of the HDF5 files to be plotted.
dim : {"x","y","tau"}, optional
The dimension of the dataset to plot.
The default is "tau".
turns : array or None, optional
Numbers of the turns to plot.
If None, all turns are shown.
The default is None.
f0 : float or None, optional
If not None, the frequency axis is noramlised by f0.
The default is None.
log_scale : bool, optional
If True, the spectrum plots are shown in y-log scale.
The default is True.
legend : list of str, optional
Legend to add for each file.
The default is None.
norm : bool, optional
If True, normalise the data of each spectrum by its geometric mean.
The default is False.
Return
------
fig : Figure
"""
if isinstance(filenames, str):
filenames = [filenames]
fig, ax = plt.subplots()
for i, filename in enumerate(filenames):
file = hp.File(filename, "r")
group = file["BeamSpectrum"]
dataset = "coherent"
time = np.array(group["time"])
freq = np.array(group["freq"])
dim_dict = {"x": 0, "y": 1, "tau": 2}
if turns is None:
turn_index = np.where(time == time)[0]
else:
turn_index = []
for turn in turns:
idx = np.where(time == turn)[0][0]
turn_index.append(idx)
turn_index = np.array(turn_index)
if f0 is None:
x_var = freq
xlabel = "Frequency [Hz]"
else:
x_var = freq / f0
xlabel = r"$f/f_{0}$"
for idx in turn_index:
y_var = group[dataset][dim_dict[dim], :, idx]
if norm is True:
y_var = y_var / gmean(y_var)
ax.plot(x_var, y_var)
if log_scale is True:
ax.set_yscale('log')
ylabel = "FFT amplitude [a.u.]"
ax.set_title("Beam coherent spectrum")
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
if legend is not None:
plt.legend(legend)
file.close()
return fig
[docs]def streak_beamspectrum(filename,
dim="tau",
f0=None,
log_scale=True,
fmin=None,
fmax=None,
turns=None,
norm=False,
ylim=None):
"""
Plot 3D data recorded by the BeamSpectrumMonitor.
Parameters
----------
filenames : str
Name of the HDF5 file to be plotted.
dim : {"x","y","tau"}, optional
The dimension of the dataset to plot.
The default is "tau".
f0 : float or None, optional
If not None, the frequency axis is noramlised by f0.
log_scale : bool, optional
If True, the spectrum plots are shown in y-log scale.
The default is True.
fmin : float, optional
If not None, the plot is limitted to values bigger than fmin.
fmax : float, optional
If not None, the plot is limitted to values smaller than fmax.
turns : array, optional
If not None, only the turn numbers in the turns array are plotted.
norm : bool, optional
If True, normalise the data of each spectrum by its geometric mean.
The default is False.
ylim : array, optional
If not None, should be array like in the form [ymin, ymax] where ymin
and ymax are the minimum and maxmimum values used in the y axis.
Returns
-------
fig : Figure
"""
file = hp.File(filename, "r")
group = file["BeamSpectrum"]
dataset = "coherent"
time = np.array(group["time"])
freq = np.array(group["freq"])
dim_dict = {"x": 0, "y": 1, "tau": 2}
if turns is None:
turn_index = np.where(time == time)[0]
if ylim is None:
tmin = time.min()
tmax = time.max()
else:
tmin = ylim[0]
tmax = ylim[1]
else:
tmin = turns.min()
tmax = turns.max()
turn_index = []
for turn in turns:
idx = np.where(time == turn)[0][0]
turn_index.append(idx)
turn_index = np.array(turn_index)
data = group[dataset][dim_dict[dim], :, turn_index]
if log_scale is True:
option = mpl.colors.LogNorm()
else:
option = None
if f0 is None:
x_var = freq
xlabel = "Frequency [Hz]"
else:
x_var = freq / f0
xlabel = r"$f/f_{0}$"
if fmin is None:
fmin = x_var.min()
if fmax is None:
fmax = x_var.max()
ind = (x_var > fmin) & (x_var < fmax)
x_var = x_var[ind]
data = data[ind, :]
if norm is True:
data = data / gmean(data)
if ylim is None:
ylabel = "Turn number"
else:
ylabel = ""
fig, ax = plt.subplots()
ax.set_title("Beam coherent spectrum")
cmap = mpl.cm.inferno # sequential
c = ax.imshow(data.T,
cmap=cmap,
origin='lower',
aspect='auto',
extent=[x_var.min(), x_var.max(), tmin, tmax],
norm=option,
interpolation="none")
cbar = fig.colorbar(c, ax=ax)
cbar.ax.set_ylabel("FFT amplitude [a.u.]", rotation=270)
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
return fig
[docs]def plot_cavitydata(filename,
cavity_name,
phasor="cavity",
plot_type="bunch",
bunch_number=0,
turn=None,
cm_lim=None,
show_objective=False):
"""
Plot data recorded by CavityMonitor.
Parameters
----------
filename : str
Name of the HDF5 file that contains the data.
cavity_name : str
Name of the CavityResonator object.
phasor : str, optional
Type of the phasor to plot. Can be "beam", "cavity", "generator" or
"ig".
plot_type : str, optional
Type of plot:
- "bunch" plots the phasor voltage and angle versus time for a
given bunch.
- "turn" plots the phasor voltage and ange versus bunch index for
a given turn.
- "streak_amplitude" plots the phasor amplitude versus bunch index
and time.
- "streak_angle" plots the phasor angle versus bunch index and
time.
- "detune" or "psi" plots the detuning or tuning angle versus time.
- "power" plots the generator, cavity, beam and reflected power
versus time. Needs BeamMonitor data with same save_every as
CavityMonitor.
- "mean" plots the mean phasor voltage and angle versus time for
non empty bunches. Needs BeamMonitor data with same save_every as
CavityMonitor.
- "tuner_diff" plots the tuner difference from optimal tuning
condition (for a given bunch) and the tuning angle versus time.
bunch_number : int, optional
Bunch number to select. The default is 0.
turn : int, optional
Turn to plot. The default is None.
cm_lim : list [vmin, vmax], optional
Colormap scale for the "streak" plots.
show_objective : bool, optional
If True, shows the Cavity voltage and phase objective value (Vc and
theta) in plot type "mean" and "bunch" for "cavity" phasor.
Default is False.
Returns
-------
fig : Figure
Figure object with the plot on it.
"""
file = hp.File(filename, "r")
cavity_data = file[cavity_name]
time = np.array(cavity_data["time"])
ph = {"cavity": 0, "beam": 1, "generator": 2, "ig": 3}
labels = ["Cavity", "Beam", "Generator", "Generator"]
units = [" voltage [MV]", " voltage [MV]", " voltage [MV]", " current [A]"]
units_val = [1e-6, 1e-6, 1e-6, 1]
if (plot_type == "bunch") or (plot_type == "mean"):
if plot_type == "bunch":
data = [
cavity_data["cavity_phasor_record"][bunch_number, :],
cavity_data["beam_phasor_record"][bunch_number, :],
cavity_data["generator_phasor_record"][bunch_number, :],
cavity_data["ig_phasor_record"][bunch_number, :]
]
elif plot_type == "mean":
try:
bunch_index = (file["Beam"]["current"][:, 0] != 0).nonzero()[0]
except:
ValueError("Beam monitor is needed to show mean voltage.")
data = [
np.mean(cavity_data["cavity_phasor_record"][bunch_index, :],
0),
np.mean(cavity_data["beam_phasor_record"][bunch_index, :], 0),
np.mean(cavity_data["generator_phasor_record"][bunch_index, :],
0),
np.mean(cavity_data["ig_phasor_record"][bunch_index, :], 0)
]
ylabel1 = labels[ph[phasor]] + units[ph[phasor]]
ylabel2 = labels[ph[phasor]] + " phase [rad]"
fig, ax = plt.subplots()
twin = ax.twinx()
p1, = ax.plot(time,
np.abs(data[ph[phasor]]) * units_val[ph[phasor]],
color="r",
label=ylabel1)
p2, = twin.plot(time,
np.angle(data[ph[phasor]]),
color="b",
label=ylabel2)
if show_objective:
o1, = ax.plot(time,
np.array(cavity_data["Vc"]) * 1e-6,
"r--",
label="Cavity voltage objective value [MV]")
o2, = twin.plot(time,
np.array(cavity_data["theta"]),
"b--",
label="Cavity phase objective value [rad]")
ax.set_xlabel("Turn number")
ax.set_ylabel(ylabel1)
twin.set_ylabel(ylabel2)
if show_objective:
plots = [p1, o1, p2, o2]
else:
plots = [p1, p2]
ax.legend(handles=plots, loc="best")
ax.yaxis.label.set_color("r")
twin.yaxis.label.set_color("b")
if plot_type == "turn":
index = np.array(time) == turn
if (index.size == 0):
raise ValueError("Turn is not valid.")
data = [
cavity_data["cavity_phasor_record"][:, index],
cavity_data["beam_phasor_record"][:, index],
cavity_data["generator_phasor_record"][:, index],
cavity_data["ig_phasor_record"][:, index]
]
h = len(data[0])
x = np.arange(h)
ylabel1 = labels[ph[phasor]] + units[ph[phasor]]
ylabel2 = labels[ph[phasor]] + " phase [rad]"
fig, ax = plt.subplots()
twin = ax.twinx()
p1, = ax.plot(x,
np.abs(data[ph[phasor]]) * units_val[ph[phasor]],
color="r",
label=ylabel1)
p2, = twin.plot(x,
np.angle(data[ph[phasor]]),
color="b",
label=ylabel2)
ax.set_xlabel("Bunch index")
ax.set_ylabel(ylabel1)
twin.set_ylabel(ylabel2)
plots = [p1, p2]
ax.legend(handles=plots, loc="best")
ax.yaxis.label.set_color("r")
twin.yaxis.label.set_color("b")
if plot_type == "streak_amplitude" or plot_type == "streak_phase":
data = [
cavity_data["cavity_phasor_record"][:, :],
cavity_data["beam_phasor_record"][:, :],
cavity_data["generator_phasor_record"][:, :],
cavity_data["ig_phasor_record"][:, :]
]
if plot_type == "streak_amplitude":
data = np.transpose(
np.abs(data[ph[phasor]]) * units_val[ph[phasor]])
ylabel = labels[ph[phasor]] + units[ph[phasor]]
cmap = mpl.cm.coolwarm # diverging
elif plot_type == "streak_phase":
data = np.transpose(np.angle(data[ph[phasor]]))
ylabel = labels[ph[phasor]] + " phase [rad]"
cmap = mpl.cm.coolwarm # diverging
fig, ax = plt.subplots()
c = ax.imshow(data, cmap=cmap, origin='lower', aspect='auto')
if cm_lim is not None:
c.set_clim(vmin=cm_lim[0], vmax=cm_lim[1])
ax.set_xlabel("Bunch index")
ax.set_ylabel("Number of turns")
cbar = fig.colorbar(c, ax=ax)
cbar.set_label(ylabel)
if plot_type == "detune" or plot_type == "psi":
fig, ax = plt.subplots()
if plot_type == "detune":
data = np.array(cavity_data["detune"]) * 1e-3
ylabel = r"Detuning $\Delta f$ [kHz]"
elif plot_type == "psi":
data = np.array(cavity_data["psi"])
ylabel = r"Tuning angle $\psi$"
ax.plot(time, data)
ax.set_xlabel("Number of turns")
ax.set_ylabel(ylabel)
if plot_type == "power":
Vc = np.mean(np.abs(cavity_data["cavity_phasor_record"]), 0)
theta = np.mean(np.angle(cavity_data["cavity_phasor_record"]), 0)
try:
bunch_index = (file["Beam"]["current"][:, 0] != 0).nonzero()[0]
I0 = np.nansum(file["Beam"]["current"][bunch_index, :], 0)
except:
print("Beam monitor is needed to compute power.")
Rs = np.array(cavity_data["Rs"])
Pc = Vc**2 / (2*Rs)
Pb = I0 * Vc * np.cos(theta)
Pg = np.array(cavity_data["Pg"])
Pr = Pg - Pb - Pc
fig, ax = plt.subplots()
ax.plot(time, Pg * 1e-3, label="Generator power $P_g$ [kW]")
ax.plot(time, Pb * 1e-3, label="Beam power $P_b$ [kW]")
ax.plot(time, Pc * 1e-3, label="Dissipated cavity power $P_c$ [kW]")
ax.plot(time, Pr * 1e-3, label="Reflected power $P_r$ [kW]")
ax.set_xlabel("Number of turns")
ax.set_ylabel("Power [kW]")
plt.legend()
if plot_type == "tuner_diff":
data0 = np.angle(cavity_data["cavity_phasor_record"][bunch_number, :])
data1 = np.array(cavity_data["psi"])
data2 = np.array(cavity_data["theta_g"])
ylabel1 = "Tuner diff. from optimal [rad]"
ylabel2 = r"Tuning angle $\psi$ [rad]"
fig, ax = plt.subplots()
twin = ax.twinx()
p1, = ax.plot(time, data0 - data2 + data1, color="r", label=ylabel1)
p2, = twin.plot(time, data1, color="b", label=ylabel2)
ax.set_xlabel("Turn number")
ax.set_ylabel(ylabel1)
twin.set_ylabel(ylabel2)
plots = [p1, p2]
ax.legend(handles=plots, loc="best")
ax.yaxis.label.set_color("r")
twin.yaxis.label.set_color("b")
file.close()
return fig