Skip to content
Snippets Groups Projects
Commit a83e4ca1 authored by Watanyu Foosang's avatar Watanyu Foosang Committed by Gamelin Alexis
Browse files

Add deconvolution methods, restructure wakefield module, and add new wake potential class

- New Wakefunction class to store a wake function object.
- New deconvolve_wp and deconvolve_imp methods were added in Wakefunction class.
- New  get-impedance method was added in Impedance class.
- New Resonator class to calculate the analytical resonator wake function.
- New Wakepotential2 class to take in wake function objects, then compute the wake potential and do the tracking.
parent 32190c71
No related branches found
No related tags found
No related merge requests found
...@@ -17,9 +17,10 @@ from mbtrack2.tracking.element import Element ...@@ -17,9 +17,10 @@ from mbtrack2.tracking.element import Element
from copy import deepcopy from copy import deepcopy
from scipy.integrate import trapz from scipy.integrate import trapz
from scipy.interpolate import interp1d from scipy.interpolate import interp1d
from scipy.constants import c
import scipy as sc
from mpl_toolkits.axes_grid1.inset_locator import inset_axes from mpl_toolkits.axes_grid1.inset_locator import inset_axes
class ComplexData: class ComplexData:
""" """
Define a general data structure for a complex function based on a pandas Define a general data structure for a complex function based on a pandas
...@@ -198,7 +199,166 @@ class WakeFunction(ComplexData): ...@@ -198,7 +199,166 @@ class WakeFunction(ComplexData):
variable=np.array([-1e15, 1e15]), variable=np.array([-1e15, 1e15]),
function=np.array([0, 0]), wake_type='long'): function=np.array([0, 0]), wake_type='long'):
super().__init__(variable, function) super().__init__(variable, function)
pass self._wake_type = wake_type
self.data.index.name = "time [s]"
def add(self, structure_to_add, method='zero'):
"""
Method to add two WakeFunction objects. The two structures are
compared so that the addition is self-consistent.
"""
if isinstance(structure_to_add, (int, float, complex)):
result = super().add(structure_to_add, method=method,
index_name="time [s]")
elif isinstance(structure_to_add, WakeFunction):
if (self.wake_type == structure_to_add.wake_type):
result = super().add(structure_to_add, method=method,
index_name="time [s]")
else:
warnings.warn(('The two WakeFunction objects do not have the '
'same coordinates or plane or type. '
'Returning initial WakeFunction object.'),
UserWarning)
result = self
wake_to_return = WakeFunction(
result.data.index,
result.data.real.values,
self.wake_type)
return wake_to_return
def __radd__(self, structure_to_add):
return self.add(structure_to_add)
def __add__(self, structure_to_add):
return self.add(structure_to_add)
def multiply(self, factor):
"""
Multiply a WakeFunction object with a float or an int.
If the multiplication is done with something else, throw a warning.
"""
result = super().multiply(factor)
wake_to_return = WakeFunction(
result.data.index,
result.data.real.values,
self.wake_type)
return wake_to_return
def __mul__(self, factor):
return self.multiply(factor)
def __rmul__(self, factor):
return self.multiply(factor)
@property
def wake_type(self):
return self._wake_type
@wake_type.setter
def wake_type(self, value):
self._wake_type = value
def deconvolve_wp(self, Wp_filename, bunchlength=1e-3, nout=None,
stopfreq=200e9):
"""
Compute wake function from wake potential by deconvolution method.
Parameters
----------
Wp_filename : str
Text file that contains wake potential data.
bunchlength : float, optional
Spatial bunch length [m].
nout : int, optional
Length of the transformed axis of the output. For n output points,
n//2+1 input points are necessary. If the input is longer than this,
it is cropped. If it is shorter than this, it is padded with zeros.
If n is not given, it is taken to be 2*(m-1) where m is the length
of the input along the axis specified by axis.
stopfreq : float, optional
The maximum frequency for calculating the impedance.
Returns
-------
Wake function object indexed with time.
"""
# FT of wake potential
dist, wp0 = np.loadtxt(Wp_filename, unpack=True)
tau_wp = dist*1e-3/c
wp = wp0*1e12
sampling_wp = tau_wp[1] - tau_wp[0]
freq_wp = sc.fft.rfftfreq(len(tau_wp), sampling_wp)
dtau_wp = (tau_wp[-1]-tau_wp[0])/len(tau_wp)
dft_wp = sc.fft.rfft(wp) * dtau_wp
# FT of analytical bunch profile and analytical impedance
sigma = bunchlength/c
mu = tau_wp[0]
bunch_charge = 1.44e-9
index_lim = freq_wp < stopfreq
freq_wp_trun = freq_wp[index_lim]
dft_wp_trun = dft_wp[index_lim]
dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_wp_trun)**2 + \
1j*mu*2*np.pi*freq_wp_trun)*bunch_charge
imp = dft_wp_trun/dft_rho_eq_trun*-1*bunch_charge
# Wake function from impedance
if nout is None:
nout = len(imp)
fs = (freq_wp_trun[-1] - freq_wp_trun[0]) / len(freq_wp_trun)
time_array = sc.fft.fftfreq(nout, fs)
Wlong_raw = sc.fft.irfft(imp, n=nout) * nout * fs
time = sc.fft.fftshift(time_array)
Wlong = sc.fft.fftshift(Wlong_raw)
super().__init__(variable=time, function=Wlong)
self.data.index.name = "time [s]"
def deconvolve_imp(self, imp_obj, divided_by=1, nout=None):
"""
Compute wake function from impedance by deconvolution method.
Parameters
----------
imp_obj : impedance object
divided_by : int, optional
Total number of elements that generate the impedance.
nout : int, optional
Length of the transformed axis of the output. For n output points,
n//2+1 input points are necessary. If the input is longer than this,
it is cropped. If it is shorter than this, it is padded with zeros.
If n is not given, it is taken to be 2*(m-1) where m is the length of
the input along the axis specified by axis.
Returns
-------
Wake function object indexed with time.
"""
Z = (imp_obj.data['real'] + imp_obj.data['imag']*1j) / divided_by
fs = (Z.index[-1]-Z.index[0]) / len(Z.index)
sampling = Z.index[1] - Z.index[0]
if nout is not None:
Wlong_raw = sc.fft.irfft(np.array(Z), n=nout, axis=0) * nout * fs
time_array = sc.fft.fftfreq(nout, sampling)
else:
Wlong_raw = sc.fft.irfft(np.array(Z), n=Z.size, axis=0) *Z.size*fs
time_array = sc.fft.fftfreq(Z.size, sampling)
Wlong = sc.fft.fftshift(Wlong_raw)
time = sc.fft.fftshift(time_array)
super().__init__(variable=time, function=Wlong)
self.data.index.name = "time [s]"
class Impedance(ComplexData): class Impedance(ComplexData):
""" """
...@@ -368,6 +528,67 @@ class Impedance(ComplexData): ...@@ -368,6 +528,67 @@ class Impedance(ComplexData):
return kloss return kloss
def get_impedance(self, Wp_filename, axis0='dist', axis0_scale=1e-3,
axis1_scale=1e-12, bunch_length=1e-3, bunch_charge=1.44e-9,
stopfreq=200e9):
"""
Compute impedance from wake potential data.
Parameters
----------
Wp_filename : str
Text file that contains wake potential data.
axis0 : {'dist', 'time'}, optional
Viariable of the data file's first column. Use 'dist' for spacial
distance. Otherwise, 'time' for temporal distance.
axis0_scale : float, optional
Scale of the first column with respect to meter if axis0 = 'dist',
or second if axis0 = 'time'.
axis1_scale : float, optional
Scale of the wake potential (in second column) with respect to V/C.
bunch_length : float, optional
Electron bunch lenth [m].
bunch_charge : float, optional
Total bunch charge [C].
stopfreq : float, optional
The maximum frequency for calculating the impedance.
Returns
-------
Impedance object indexed with frequency.
"""
s, wp0 = np.loadtxt(Wp_filename, unpack=True)
if axis0 == 'dist':
tau = s*axis0_scale/c
elif axis0 == 'time':
tau = s*axis0_scale
else:
raise TypeError('Type of axis 0 not recognized.')
wp = wp0 * axis1_scale
# FT of wake potential
sampling = tau[1] - tau[0]
freq = sc.fft.rfftfreq(len(tau), sampling)
dtau = (tau[-1]-tau[0])/len(tau)
dft = sc.fft.rfft(wp) * dtau
# FT of analytical bunch profile and analytical impedance
sigma = bunch_length/c
mu = tau[0]
index_lim = freq < stopfreq
freq_trun = freq[index_lim]
dft_trun = dft[index_lim]
dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_trun)**2 + \
1j*mu*2*np.pi*freq_trun)*bunch_charge
imp = dft_trun/dft_rho_eq_trun*-1*bunch_charge
super().__init__(variable=freq_trun, function=imp)
self.data.index.name = "frequency [Hz]"
class Wakefield: class Wakefield:
""" """
Defines a Wakefield which corresponds to a single physical element which Defines a Wakefield which corresponds to a single physical element which
...@@ -418,7 +639,7 @@ class Wakefield: ...@@ -418,7 +639,7 @@ class Wakefield:
else: else:
self.__setattr__(attribute_name, structure_to_add) self.__setattr__(attribute_name, structure_to_add)
elif isinstance(structure_to_add, WakeFunction): elif isinstance(structure_to_add, WakeFunction):
attribute_name = "W" + structure_to_add.impedance_type attribute_name = "W" + structure_to_add.wake_type
if attribute_name in list_of_attributes: if attribute_name in list_of_attributes:
raise ValueError("There is already a component of the type " raise ValueError("There is already a component of the type "
"{} in this element.".format(attribute_name)) "{} in this element.".format(attribute_name))
...@@ -518,6 +739,88 @@ class Wakefield: ...@@ -518,6 +739,88 @@ class Wakefield:
else: else:
raise ValueError("Error in input.") raise ValueError("Error in input.")
class Resonator(Wakefield):
"""
Compute analytical wake function of a resonator and return a Resonator object.
Parameter
---------
ring : Synchrotron object.
Q_factor : float, optional
Resonator quality factor. The default value is 1.
f_res : float, optional
Resonator resonance frequency in [Hz]. The default value is 10e9 Hz.
R_shunt : float, optional
Resonator shunt impedance in [Ohm]. The default value is 100 Ohm.
"""
def __init__(self, ring, Q_factor=1, f_res=10e9, R_shunt=100):
super().__init__()
self.ring = ring
self.Q_factor = Q_factor
self.omega_res = 2*np.pi*f_res
self.R_shunt = R_shunt
if Q_factor >= 0.5:
self.Q_factor_p = np.sqrt(self.Q_factor**2 - 0.25)
self.omega_res_p = (self.omega_res*self.Q_factor_p)/self.Q_factor
else:
self.Q_factor_pp = np.sqrt(0.25 - self.Q_factor**2)
self.omega_res_p = (self.omega_res*self.Q_factor_pp)/self.Q_factor
def init_timestop(self):
"""
Calculate an approximate position where the wake field depletes.
"""
self.timestop = round(np.log(1000)/self.omega_res*2*self.Q_factor, 12)
def time_array(self):
self.tau = np.linspace(start=-30e-12, stop=self.timestop, num=1000)
def long_wakefunction(self):
tau_neg = self.tau[np.where(self.tau<0)[0]]
tau_pos = self.tau[np.where(self.tau>=0)[0]]
wlong_tau_neg = np.zeros((len(tau_neg),))
if self.Q_factor >= 0.5:
wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\
np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\
(np.cos(self.omega_res_p*tau_pos)-np.sin(self.omega_res_p*tau_pos)/ \
(2*self.Q_factor_p))
elif self.Q_factor < 0.5:
wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\
np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\
(np.cosh(self.omega_res_p*tau_pos)-np.sinh(self.omega_res_p*tau_pos)/ \
(2*self.Q_factor_pp))
self.wlong = np.concatenate((wlong_tau_neg, wlong_tau_pos))
def check_wake_tail(self):
"""
Checking whether the full wake function is obtained by the calculated
initial timestop. If not, extend the timestop by 50 ps.
"""
ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1])
while any(ratio < 1000):
self.timestop += 50e-12
self.time_array()
self.long_wakefunction()
ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1])
def get_wakefunction(self):
self.init_timestop()
self.time_array()
self.long_wakefunction()
self.check_wake_tail()
wake_func = WakeFunction(variable=self.tau, function=self.wlong)
super().append_to_model(wake_func)
class ImpedanceModel(Element): class ImpedanceModel(Element):
""" """
...@@ -1305,4 +1608,3 @@ def double_sided_impedance(impedance): ...@@ -1305,4 +1608,3 @@ def double_sided_impedance(impedance):
all_data = impedance.data.append(negative_data) all_data = impedance.data.append(negative_data)
all_data = all_data.sort_index() all_data = all_data.sort_index()
impedance.data = all_data impedance.data = all_data
\ No newline at end of file
...@@ -209,7 +209,8 @@ class WakePotential(Element): ...@@ -209,7 +209,8 @@ class WakePotential(Element):
self.W_long = np.array(w_list) self.W_long = np.array(w_list)
def wake_potential(self): def wake_potential(self):
self.W_p = signal.convolve(self.W_long*1e-12, self.rho, mode="same") dtau = (self.tau[-1]-self.tau[0]) / len(self.tau)
self.W_p = signal.convolve(self.W_long, self.rho, mode="same") * dtau
def plot(self, var, plot_rho=True): def plot(self, var, plot_rho=True):
""" """
...@@ -242,7 +243,7 @@ class WakePotential(Element): ...@@ -242,7 +243,7 @@ class WakePotential(Element):
elif var == "W_long": elif var == "W_long":
ax.plot(self.tau*1e12, self.W_long*1e-12) ax.plot(self.tau*1e12, self.W_long*1e-12)
ax.set_xlabel("$\\tau$ (ps)") ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel("W$_{||}$ ($\\Omega$/ps)") ax.set_ylabel("W$_{||}$ (V/ps)")
if plot_rho is True: if plot_rho is True:
rho_array = np.array(self.rho) rho_array = np.array(self.rho)
...@@ -296,6 +297,127 @@ class WakePotential(Element): ...@@ -296,6 +297,127 @@ class WakePotential(Element):
bunch["delta"] += self.wp * bunch.charge / self.ring.E0 bunch["delta"] += self.wp * bunch.charge / self.ring.E0
class WakePotential2(Element):
"""
Compute a wake potential from a wake function by performing a convolution
with a bunch charge profile.
Parameters
----------
wake_obj : WakeFunction object
ring : Synchrotron object
n_bin : int, optional
Number of bins for constructing the longitudinal bunch profile.
bunch : Bunch or Beam object
Attributes
----------
rho : array of shape (n_bin, )
Bunch charge density profile.
Wlong : array
Wake function profile.
Wp : array
Wake potential profile.
Wp_interp : array of shape (mp_number, )
Wake potential, obtained from interpolating Wp, exerted on each macro-particle.
Methods
-------
charge_density(bunch)
Calculate bunch charge density [A/C]
plot(self, var, plot_rho=True)
Plotting wakefunction or wake potential. If plot_rho=True, the bunch
charge density is overlayed.
track(bunch)
Tracking method for the element.
"""
def __init__(self, wake_obj, ring, n_bin=65):
self.wake_obj = wake_obj
self.tau = self.wake_obj.data.index
self.Wlong = self.wake_obj.data['real']
self.ring = ring
self.n_bin = n_bin
def charge_density(self, bunch):
self.bins = bunch.binning(n_bin=self.n_bin)
self.bin_data = self.bins[2]
self.bin_size = self.bins[0].length
self.rho = bunch.charge_per_mp*self.bin_data/(self.bin_size*bunch.charge)
def get_wakepotential(self):
interp_func_rho = interp1d(self.bins[0].mid, self.rho, fill_value=0,
bounds_error=False)
rho_interp = interp_func_rho(self.tau)
peak = np.where(rho_interp == max(rho_interp))[0]
rho_interp_shift = np.roll(rho_interp, len(rho_interp)//2-peak)
dtau = (self.tau[-1]-self.tau[0]) / len(self.tau)
self.Wp = signal.convolve(self.Wlong*-1, rho_interp_shift, mode="same") * dtau
def plot(self, var, plot_rho=True):
"""
Plotting wakefunction or wake potential.
Parameters
----------
var : {'Wp', 'Wlong'}
If 'Wp', the wake potential is plotted.
If 'Wlong', the wakefunction is plotted.
plot_rho : bool, optional
Overlay the bunch charge density profile on the plot.
Returns
-------
fig : Figure
Figure object with the plot on it.
"""
fig, ax = plt.subplots()
if var == "Wp":
ax.plot(self.tau*1e12, self.Wp*1e-12)
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel("W$_p$ (V/pC)")
elif var == "Wlong":
ax.plot(self.tau*1e12, self.Wlong*1e-12)
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel("W$_{||}$ (V/ps)")
if plot_rho is True:
rho_array = np.array(self.rho)
rho_rescaled = rho_array/max(rho_array)*max(self.Wp)
ax.plot(self.bins[0].mid*1e12, rho_rescaled*1e-12)
return fig
@Element.parallel
def track(self, bunch):
"""
Tracking method for the element.
No bunch to bunch interaction, so written for Bunch objects and
@Element.parallel is used to handle Beam objects.
Parameters
----------
bunch : Bunch or Beam object.
"""
self.charge_density(bunch)
self.get_wakepotential()
f = interp1d(self.tau, self.Wp, fill_value = 0, bounds_error = False)
self.Wp_interp = f(bunch["tau"])
bunch["delta"] += self.Wp_interp * bunch.charge / self.ring.E0
class SynchrotronRadiation(Element): class SynchrotronRadiation(Element):
""" """
Element to handle synchrotron radiation, radiation damping and quantum Element to handle synchrotron radiation, radiation damping and quantum
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment