Something went wrong on our end
-
Alexis GAMELIN authored
Fix TD calculation for transverse dipolar wakes due to sign change in the 31/08/2021 commit.
Alexis GAMELIN authoredFix TD calculation for transverse dipolar wakes due to sign change in the 31/08/2021 commit.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
wakepotential.py 20.32 KiB
# -*- coding: utf-8 -*-
"""
This module defines the WakePotential class which deals with the single bunch
effects, both longitudinal and transverse.
@author: Alexis Gamelin, Watanyu Foosang
@date: 25/09/2020
"""
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import signal
from scipy.interpolate import interp1d
from mbtrack2.tracking.element import Element
from mbtrack2.collective_effects.utilities import gaussian_bunch
class WakePotential(Element):
"""
Compute a wake potential from uniformly sampled wake functions by
performing a convolution with a bunch charge profile.
Two different time bases are used. The first one is controled by the n_bin
parameter and is used to compute the bunch profile. Then the bunch profile
is interpolated on the wake function time base which is used to perform the
convolution to get the wake potential.
Parameters
----------
ring : Synchrotron object
wakefield : Wakefield object
Wakefield object which contains the wake functions to be used. The wake
functions must be uniformly sampled!
n_bin : int, optional
Number of bins for constructing the longitudinal bunch profile.
Attributes
----------
rho : array of shape (n_bin, )
Bunch charge density profile in the unit [1/s].
Wp : array
Wake potential profile.
Wp_interp : array of shape (mp_number, )
Wake potential, obtained from interpolating Wp, exerted on each macro-particle.
Methods
-------
charge_density(bunch)
Compute bunch charge density profile in [1/s].
dipole_moment(bunch, plane, tau0)
Return the dipole moment of the bunch computed on the same time array
as the wake function.
prepare_wakefunction(wake_type)
Prepare the wake function of a given wake_type to be used for the wake
potential computation.
get_wakepotential(bunch, wake_type)
Return the wake potential computed on the wake function time array
limited to the bunch profile.
track(bunch)
Tracking method for the element.
plot_last_wake(wake_type)
Plot the last wake potential of a given type computed during the last
call of the track method.
reference_loss(bunch)
Calculate the loss factor and kick factor from the wake potential and
compare it to a reference value assuming a Gaussian bunch computed in
the frequency domain.
check_sampling()
Check if the wake function sampling is uniform.
reduce_sampling(factor)
Reduce wake function samping by an integer factor.
"""
def __init__(self, ring, wakefield, n_bin=65):
self.wakefield = wakefield
self.types = self.wakefield.wake_components
self.n_types = len(self.wakefield.wake_components)
self.ring = ring
self.n_bin = n_bin
self.check_sampling()
def charge_density(self, bunch):
"""
Compute bunch charge density profile in [1/s].
Parameters
----------
bunch : Bunch object
"""
# Get binning data
a, b, c, d = bunch.binning(n_bin=self.n_bin)
self.bins = a
self.sorted_index = b
self.profile = c
self.center = d
self.bin_size = self.bins[1] - self.bins[0]
# Compute charge density
self.rho = bunch.charge_per_mp*self.profile/(self.bin_size*bunch.charge)
self.rho = np.array(self.rho)
# Compute time array
self.tau = np.array(self.center)
self.dtau = self.tau[1] - self.tau[0]
# Add N values before and after rho and tau
if self.n_bin % 2 == 0:
N = int(self.n_bin/2)
self.tau = np.arange(self.tau[0] - self.dtau*N,
self.tau[-1] + self.dtau*N,
self.dtau)
self.rho = np.append(self.rho, np.zeros(N))
self.rho = np.insert(self.rho, 0, np.zeros(N))
else:
N = int(np.floor(self.n_bin/2))
self.tau = np.arange(self.tau[0] - self.dtau*N,
self.tau[-1] + self.dtau*(N+1),
self.dtau)
self.rho = np.append(self.rho, np.zeros(N))
self.rho = np.insert(self.rho, 0, np.zeros(N+1))
if len(self.tau) != len(self.rho):
self.tau = np.append(self.tau, self.tau[-1] + self.dtau)
self.tau_mean = np.mean(self.tau)
self.tau -= self.tau_mean
def dipole_moment(self, bunch, plane, tau0):
"""
Return the dipole moment of the bunch computed on the same time array
as the wake function.
Parameters
----------
bunch : Bunch object
plane : str
Plane on which the dipole moment is computed, "x" or "y".
tau0 : array
Time array on which the dipole moment will be interpolated, in [s].
Returns
-------
dipole : array
Dipole moment of the bunch.
"""
dipole = np.zeros((self.n_bin - 1,))
for i in range(self.n_bin - 1):
dipole[i] = bunch[plane][self.sorted_index == i].sum()
dipole = dipole/self.profile
dipole[np.isnan(dipole)] = 0
# Add N values to get same size as tau/profile
if self.n_bin % 2 == 0:
N = int(self.n_bin/2)
dipole = np.append(dipole, np.zeros(N))
dipole = np.insert(dipole, 0, np.zeros(N))
else:
N = int(np.floor(self.n_bin/2))
dipole = np.append(dipole, np.zeros(N))
dipole = np.insert(dipole, 0, np.zeros(N+1))
# Interpole on tau0 to get the same size as W0
interp_dipole = interp1d(self.tau, dipole, fill_value=0,
bounds_error=False)
dipole0 = interp_dipole(tau0)
setattr(self, "dipole_" + plane, dipole0)
return dipole0
def prepare_wakefunction(self, wake_type, tau, save_data=True):
"""
Prepare the wake function of a given wake_type to be used for the wake
potential computation.
The new time array keeps the same sampling time as given in the
WakeFunction definition but is restricted to the bunch profile time
array.
Parameters
----------
wake_type : str
Type of the wake function to prepare: "Wlong", "Wxdip", ...
tau : array
Time domain array of the bunch profile in [s].
save_data : bool, optional
If True, the results are saved as atributes.
Returns
-------
tau0 : array
Time base of the wake function in [s].
dtau0 : float
Difference between two points of the wake function time base in
[s].
W0 : array
Wake function array in [V/C] or [V/C/m].
"""
tau0 = np.array(getattr(self.wakefield, wake_type).data.index)
dtau0 = tau0[1] - tau0[0]
W0 = np.array(getattr(self.wakefield, wake_type).data["real"])
# Keep only the wake function on the rho window
ind = np.all([min(tau[0], 0) < tau0, max(tau[-1], 0) > tau0],
axis=0)
tau0 = tau0[ind]
W0 = W0[ind]
# Check the wake function window for assymetry
assym = (np.abs(tau0[-1]) - np.abs(tau0[0])) / dtau0
n_assym = int(np.floor(assym))
if np.floor(assym) > 1:
# add at head
if np.abs(tau0[-1]) > np.abs(tau0[0]):
tau0 = np.arange(tau0[0] - dtau0*n_assym,
tau0[-1] + dtau0,
dtau0)
n_to_add = len(tau0) - len(W0)
W0 = np.insert(W0, 0, np.zeros(n_to_add))
# add at tail
elif np.abs(tau0[0]) > np.abs(tau0[-1]):
tau0 = np.arange(tau0[0],
tau0[-1] + dtau0*(n_assym+1),
dtau0)
n_to_add = len(tau0) - len(W0)
W0 = np.insert(W0, 0, np.zeros(n_to_add))
# Check is the wf is shorter than rho then add zeros
if (tau0[0] > tau[0]) or (tau0[-1] < tau[-1]):
n = max(int(np.ceil((tau0[0] - tau[0])/dtau0)),
int(np.ceil((tau[-1] - tau0[-1])/dtau0)))
tau0 = np.arange(tau0[0] - dtau0*n,
tau0[-1] + dtau0*(n+1),
dtau0)
W0 = np.insert(W0, 0, np.zeros(n))
n_to_add = len(tau0) - len(W0)
W0 = np.insert(W0, len(W0), np.zeros(n_to_add))
if save_data:
setattr(self, "tau0_" + wake_type, tau0)
setattr(self, "dtau0_" + wake_type, dtau0)
setattr(self, "W0_" + wake_type, W0)
return (tau0, dtau0, W0)
def get_wakepotential(self, bunch, wake_type):
"""
Return the wake potential computed on the wake function time array
limited to the bunch profile.
Parameters
----------
bunch : Bunch object
wake_type : str
Wake function type: "Wlong", "Wxdip", ...
Returns
-------
Wp : array
Wake potential.
"""
(tau0, dtau0, W0) = self.prepare_wakefunction(wake_type, self.tau)
interp_profile = interp1d(self.tau, self.rho, fill_value=0,
bounds_error=False)
profile0 = interp_profile(tau0)
if wake_type == "Wlong" or wake_type == "Wxquad" or wake_type == "Wyquad":
Wp = signal.convolve(profile0, W0*-1, mode='same')*dtau0
elif wake_type == "Wxdip":
dipole0 = self.dipole_moment(bunch, "x", tau0)
Wp = signal.convolve(profile0*dipole0, W0, mode='same')*dtau0
elif wake_type == "Wydip":
dipole0 = self.dipole_moment(bunch, "y", tau0)
Wp = signal.convolve(profile0*dipole0, W0, mode='same')*dtau0
else:
raise ValueError("This type of wake is not taken into account.")
setattr(self,"profile0_" + wake_type, profile0)
setattr(self, wake_type, Wp)
return tau0, Wp
@Element.parallel
def track(self, bunch):
"""
Tracking method for the element.
No bunch to bunch interaction, so written for Bunch objects and
@Element.parallel is used to handle Beam objects.
Parameters
----------
bunch : Bunch or Beam object.
"""
if len(bunch) != 0:
self.charge_density(bunch)
for wake_type in self.types:
tau0, Wp = self.get_wakepotential(bunch, wake_type)
f = interp1d(tau0 + self.tau_mean, Wp, fill_value = 0, bounds_error = False)
Wp_interp = f(bunch["tau"])
if wake_type == "Wlong":
bunch["delta"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wxdip":
bunch["xp"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wydip":
bunch["yp"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wxquad":
bunch["xp"] += (bunch["x"] * Wp_interp * bunch.charge
/ self.ring.E0)
elif wake_type == "Wyquad":
bunch["yp"] += (bunch["y"] * Wp_interp * bunch.charge
/ self.ring.E0)
def plot_last_wake(self, wake_type, plot_rho=True, plot_dipole=False,
plot_wake_function=True):
"""
Plot the last wake potential of a given type computed during the last
call of the track method.
Parameters
----------
wake_type : str
Type of the wake to plot: "Wlong", "Wxdip", ...
plot_rho : bool, optional
Plot the normalised bunch profile. The default is True.
plot_dipole : bool, optional
Plot the normalised dipole moment. The default is False.
plot_wake_function : bool, optional
Plot the normalised wake function. The default is True.
Returns
-------
fig : figure
"""
labels = {"Wlong" : r"$W_{p,long}$ (V/pC)",
"Wxdip" : r"$W_{p,x}^{D} (V/pC)$",
"Wydip" : r"$W_{p,y}^{D} (V/pC)$",
"Wxquad" : r"$W_{p,x}^{Q} (V/pC/m)$",
"Wyquad" : r"$W_{p,y}^{Q} (V/pC/m)$"}
Wp = getattr(self, wake_type)
tau0 = getattr(self, "tau0_" + wake_type)
fig, ax = plt.subplots()
ax.plot(tau0*1e12, Wp*1e-12, label=labels[wake_type])
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel(labels[wake_type])
if plot_rho is True:
profile0 = getattr(self,"profile0_" + wake_type)
profile_rescaled = profile0/max(profile0)*max(np.abs(Wp))
rho_rescaled = self.rho/max(self.rho)*max(np.abs(Wp))
ax.plot(tau0*1e12, profile_rescaled*1e-12, label=r"$\rho$ interpolated (a.u.)")
ax.plot((self.tau + self.tau_mean)*1e12, rho_rescaled*1e-12, label=r"$\rho$ (a.u.)", linestyle='dashed')
plt.legend()
if plot_wake_function is True:
W0 = getattr(self, "W0_" + wake_type)
W0_rescaled = W0/max(W0)*max(np.abs(Wp))
ax.plot(tau0*1e12, W0_rescaled*1e-12, label=r"$W_{function}$ (a.u.)")
plt.legend()
if plot_dipole is True:
dipole = getattr(self, "dipole_" + wake_type[1])
dipole_rescaled = dipole/max(dipole)*max(np.abs(Wp))
ax.plot(tau0*1e12, dipole_rescaled*1e-12, label=r"Dipole moment (a.u.)")
plt.legend()
return fig
def get_gaussian_wakepotential(self, sigma, wake_type, dipole=1e-3):
"""
Return the wake potential computed using a perfect gaussian profile.
Parameters
----------
sigma : float
RMS bunch length in [s].
wake_type : str
Wake function type: "Wlong", "Wxdip", ...
dipole : float, optional
Dipole moment to consider in [m], (uniform dipole moment).
Returns
-------
tau0 : array
Time base in [s].
W0 : array
Wake function.
Wp : array
Wake potential.
profile0 : array
Gaussian bunch profile.
dipole0 : array
Dipole moment.
"""
tau = np.linspace(-10*sigma,10*sigma, int(1e3))
(tau0, dtau0, W0) = self.prepare_wakefunction(wake_type, tau, False)
profile0 = gaussian_bunch(tau0, sigma)
dipole0 = np.ones_like(profile0)*dipole
if wake_type == "Wlong" or wake_type == "Wxquad" or wake_type == "Wyquad":
Wp = signal.convolve(profile0, W0*-1, mode='same')*dtau0
elif wake_type == "Wxdip":
Wp = signal.convolve(profile0*dipole0, W0*-1, mode='same')*dtau0
elif wake_type == "Wydip":
Wp = signal.convolve(profile0*dipole0, W0*-1, mode='same')*dtau0
else:
raise ValueError("This type of wake is not taken into account.")
return tau0, W0, Wp, profile0, dipole0
def plot_gaussian_wake(self, sigma, wake_type, dipole=1e-3, plot_rho=True,
plot_dipole=False, plot_wake_function=True):
"""
Plot the wake potential of a given type for a perfect gaussian bunch.
Parameters
----------
sigma : float
RMS bunch length in [s].
wake_type : str
Type of the wake to plot: "Wlong", "Wxdip", ...
dipole : float, optional
Dipole moment to consider in [m], (uniform dipole moment).
plot_rho : bool, optional
Plot the normalised bunch profile. The default is True.
plot_dipole : bool, optional
Plot the normalised dipole moment. The default is False.
plot_wake_function : bool, optional
Plot the normalised wake function. The default is True.
Returns
-------
fig : figure
"""
labels = {"Wlong" : r"$W_{p,long}$ (V/pC)",
"Wxdip" : r"$W_{p,x}^{D} (V/pC)$",
"Wydip" : r"$W_{p,y}^{D} (V/pC)$",
"Wxquad" : r"$W_{p,x}^{Q} (V/pC/m)$",
"Wyquad" : r"$W_{p,y}^{Q} (V/pC/m)$"}
tau0, W0, Wp, profile0, dipole0 = self.get_gaussian_wakepotential(sigma, wake_type, dipole)
fig, ax = plt.subplots()
ax.plot(tau0*1e12, Wp*1e-12, label=labels[wake_type])
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel(labels[wake_type])
if plot_rho is True:
profile_rescaled = profile0/max(profile0)*max(np.abs(Wp))
ax.plot(tau0*1e12, profile_rescaled*1e-12, label=r"$\rho$ (a.u.)")
plt.legend()
if plot_wake_function is True:
W0_rescaled = W0/max(W0)*max(np.abs(Wp))
ax.plot(tau0*1e12, W0_rescaled*1e-12, label=r"$W_{function}$ (a.u.)")
plt.legend()
if plot_dipole is True:
dipole_rescaled = dipole0/max(dipole0)*max(np.abs(Wp))
ax.plot(tau0*1e12, dipole_rescaled*1e-12, label=r"Dipole moment (a.u.)")
plt.legend()
return fig
def reference_loss(self, bunch):
"""
Calculate the loss factor and kick factor from the wake potential and
compare it to a reference value assuming a Gaussian bunch computed in
the frequency domain.
Parameters
----------
bunch : Bunch object
Returns
-------
loss_data : DataFrame
An output showing the loss/kick factors compared to the reference
values.
"""
loss = []
loss_0 = []
delta_loss = []
index = []
for wake_type in self.types:
tau0, Wp = self.get_wakepotential(bunch, wake_type)
profile0 = getattr(self, "profile0_" + wake_type)
factorTD = np.trapz(Wp*profile0, tau0)
if wake_type == "Wlong":
factorTD *= -1
if wake_type == "Wxdip":
factorTD /= bunch["x"].mean()
if wake_type == "Wydip":
factorTD /= bunch["y"].mean()
Z = getattr(self.wakefield, "Z" + wake_type[1:])
sigma = bunch['tau'].std()
factorFD = Z.loss_factor(sigma)
loss.append(factorTD)
loss_0.append(factorFD)
delta_loss.append( (factorTD - factorFD) / factorFD *100 )
if wake_type == "Wlong":
index.append("Wlong [V/C]")
else:
index.append(wake_type + " [V/C/m]")
column = ['TD factor', 'FD factor', 'Relative error [%]']
loss_data = pd.DataFrame(np.array([loss, loss_0, delta_loss]).T,
columns=column,
index=index)
return loss_data
def check_sampling(self):
"""
Check if the wake function sampling is uniform.
Raises
------
ValueError
"""
for wake_type in self.types:
idx = getattr(self.wakefield, wake_type).data.index
diff = idx[1:]-idx[:-1]
result = np.all(np.isclose(diff, diff[0], atol=1e-15))
if result is False:
raise ValueError("The wake function must be uniformly sampled.")
def reduce_sampling(self, factor):
"""
Reduce wake function samping by an integer factor.
Used to reduce computation time for long bunches.
Parameters
----------
factor : int
"""
for wake_type in self.types:
idx = getattr(self.wakefield, wake_type).data.index[::factor]
getattr(self.wakefield, wake_type).data = getattr(self.wakefield, wake_type).data.loc[idx]
self.check_sampling()