diff --git a/collective_effects/wakefield.py b/collective_effects/wakefield.py index 971c759acf5dc1be131194c9033bd578a71dad19..6003ba6364325763c3ec1fb80efdc14958a09c54 100644 --- a/collective_effects/wakefield.py +++ b/collective_effects/wakefield.py @@ -17,6 +17,8 @@ from mbtrack2.tracking.element import Element from copy import deepcopy from scipy.integrate import trapz from scipy.interpolate import interp1d +from scipy.constants import c +import scipy as sc class ComplexData: """ @@ -196,8 +198,167 @@ class WakeFunction(ComplexData): variable=np.array([-1e15, 1e15]), function=np.array([0, 0]), wake_type='long'): super().__init__(variable, function) - pass + self._wake_type = wake_type + self.data.index.name = "time [s]" + + def add(self, structure_to_add, method='zero'): + """ + Method to add two WakeFunction objects. The two structures are + compared so that the addition is self-consistent. + """ + + if isinstance(structure_to_add, (int, float, complex)): + result = super().add(structure_to_add, method=method, + index_name="time [s]") + elif isinstance(structure_to_add, WakeFunction): + if (self.wake_type == structure_to_add.wake_type): + result = super().add(structure_to_add, method=method, + index_name="time [s]") + else: + warnings.warn(('The two WakeFunction objects do not have the ' + 'same coordinates or plane or type. ' + 'Returning initial WakeFunction object.'), + UserWarning) + result = self + + wake_to_return = WakeFunction( + result.data.index, + result.data.real.values, + self.wake_type) + return wake_to_return + + def __radd__(self, structure_to_add): + return self.add(structure_to_add) + + def __add__(self, structure_to_add): + return self.add(structure_to_add) + + def multiply(self, factor): + """ + Multiply a WakeFunction object with a float or an int. + If the multiplication is done with something else, throw a warning. + """ + result = super().multiply(factor) + wake_to_return = WakeFunction( + result.data.index, + result.data.real.values, + self.wake_type) + return wake_to_return + + def __mul__(self, factor): + return self.multiply(factor) + + def __rmul__(self, factor): + return self.multiply(factor) + + @property + def wake_type(self): + return self._wake_type + + @wake_type.setter + def wake_type(self, value): + self._wake_type = value + + def deconvolve_wp(self, Wp_filename, bunchlength=1e-3, nout=None, + stopfreq=200e9): + """ + Compute wake function from wake potential by deconvolution method. + + Parameters + ---------- + Wp_filename : str + Text file that contains wake potential data. + bunchlength : float, optional + Spatial bunch length [m]. + nout : int, optional + Length of the transformed axis of the output. For n output points, + n//2+1 input points are necessary. If the input is longer than this, + it is cropped. If it is shorter than this, it is padded with zeros. + If n is not given, it is taken to be 2*(m-1) where m is the length + of the input along the axis specified by axis. + stopfreq : float, optional + The maximum frequency for calculating the impedance. + + Returns + ------- + Wake function object indexed with time. + + """ + + # FT of wake potential + dist, wp0 = np.loadtxt(Wp_filename, unpack=True) + tau_wp = dist*1e-3/c + wp = wp0*1e12 + sampling_wp = tau_wp[1] - tau_wp[0] + freq_wp = sc.fft.rfftfreq(len(tau_wp), sampling_wp) + dtau_wp = (tau_wp[-1]-tau_wp[0])/len(tau_wp) + dft_wp = sc.fft.rfft(wp) * dtau_wp + + # FT of analytical bunch profile and analytical impedance + sigma = bunchlength/c + mu = tau_wp[0] + bunch_charge = 1.44e-9 + + index_lim = freq_wp < stopfreq + freq_wp_trun = freq_wp[index_lim] + dft_wp_trun = dft_wp[index_lim] + + dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_wp_trun)**2 + \ + 1j*mu*2*np.pi*freq_wp_trun)*bunch_charge + imp = dft_wp_trun/dft_rho_eq_trun*-1*bunch_charge + + # Wake function from impedance + if nout is None: + nout = len(imp) + + fs = (freq_wp_trun[-1] - freq_wp_trun[0]) / len(freq_wp_trun) + time_array = sc.fft.fftfreq(nout, fs) + Wlong_raw = sc.fft.irfft(imp, n=nout) * nout * fs + + time = sc.fft.fftshift(time_array) + Wlong = sc.fft.fftshift(Wlong_raw) + + super().__init__(variable=time, function=Wlong) + self.data.index.name = "time [s]" + + def deconvolve_imp(self, imp_obj, divided_by=1, nout=None): + """ + Compute wake function from impedance by deconvolution method. + + Parameters + ---------- + imp_obj : impedance object + divided_by : int, optional + Total number of elements that generate the impedance. + nout : int, optional + Length of the transformed axis of the output. For n output points, + n//2+1 input points are necessary. If the input is longer than this, + it is cropped. If it is shorter than this, it is padded with zeros. + If n is not given, it is taken to be 2*(m-1) where m is the length of + the input along the axis specified by axis. + + Returns + ------- + Wake function object indexed with time. + """ + + Z = (imp_obj.data['real'] + imp_obj.data['imag']*1j) / divided_by + fs = (Z.index[-1]-Z.index[0]) / len(Z.index) + sampling = Z.index[1] - Z.index[0] + if nout is not None: + Wlong_raw = sc.fft.irfft(np.array(Z), n=nout, axis=0) * nout * fs + time_array = sc.fft.fftfreq(nout, sampling) + else: + Wlong_raw = sc.fft.irfft(np.array(Z), n=Z.size, axis=0) *Z.size*fs + time_array = sc.fft.fftfreq(Z.size, sampling) + + Wlong = sc.fft.fftshift(Wlong_raw) + time = sc.fft.fftshift(time_array) + + super().__init__(variable=time, function=Wlong) + self.data.index.name = "time [s]" + class Impedance(ComplexData): """ Define an Impedance object based on a ComplexData object. @@ -366,6 +527,67 @@ class Impedance(ComplexData): return kloss + def get_impedance(self, Wp_filename, axis0='dist', axis0_scale=1e-3, + axis1_scale=1e-12, bunch_length=1e-3, bunch_charge=1.44e-9, + stopfreq=200e9): + """ + Compute impedance from wake potential data. + + Parameters + ---------- + Wp_filename : str + Text file that contains wake potential data. + axis0 : {'dist', 'time'}, optional + Viariable of the data file's first column. Use 'dist' for spacial + distance. Otherwise, 'time' for temporal distance. + axis0_scale : float, optional + Scale of the first column with respect to meter if axis0 = 'dist', + or second if axis0 = 'time'. + axis1_scale : float, optional + Scale of the wake potential (in second column) with respect to V/C. + bunch_length : float, optional + Electron bunch lenth [m]. + bunch_charge : float, optional + Total bunch charge [C]. + stopfreq : float, optional + The maximum frequency for calculating the impedance. + + Returns + ------- + Impedance object indexed with frequency. + + """ + s, wp0 = np.loadtxt(Wp_filename, unpack=True) + if axis0 == 'dist': + tau = s*axis0_scale/c + elif axis0 == 'time': + tau = s*axis0_scale + else: + raise TypeError('Type of axis 0 not recognized.') + + wp = wp0 * axis1_scale + + # FT of wake potential + sampling = tau[1] - tau[0] + freq = sc.fft.rfftfreq(len(tau), sampling) + dtau = (tau[-1]-tau[0])/len(tau) + dft = sc.fft.rfft(wp) * dtau + + # FT of analytical bunch profile and analytical impedance + sigma = bunch_length/c + mu = tau[0] + + index_lim = freq < stopfreq + freq_trun = freq[index_lim] + dft_trun = dft[index_lim] + + dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_trun)**2 + \ + 1j*mu*2*np.pi*freq_trun)*bunch_charge + imp = dft_trun/dft_rho_eq_trun*-1*bunch_charge + + super().__init__(variable=freq_trun, function=imp) + self.data.index.name = "frequency [Hz]" + class Wakefield: """ Defines a Wakefield which corresponds to a single physical element which @@ -416,7 +638,7 @@ class Wakefield: else: self.__setattr__(attribute_name, structure_to_add) elif isinstance(structure_to_add, WakeFunction): - attribute_name = "W" + structure_to_add.impedance_type + attribute_name = "W" + structure_to_add.wake_type if attribute_name in list_of_attributes: raise ValueError("There is already a component of the type " "{} in this element.".format(attribute_name)) @@ -515,7 +737,89 @@ class Wakefield: return wake_sum else: raise ValueError("Error in input.") + +class Resonator(Wakefield): + """ + Compute analytical wake function of a resonator and return a Resonator object. + + Parameter + --------- + ring : Synchrotron object. + Q_factor : float, optional + Resonator quality factor. The default value is 1. + f_res : float, optional + Resonator resonance frequency in [Hz]. The default value is 10e9 Hz. + R_shunt : float, optional + Resonator shunt impedance in [Ohm]. The default value is 100 Ohm. + """ + + def __init__(self, ring, Q_factor=1, f_res=10e9, R_shunt=100): + super().__init__() + self.ring = ring + self.Q_factor = Q_factor + self.omega_res = 2*np.pi*f_res + self.R_shunt = R_shunt + + if Q_factor >= 0.5: + self.Q_factor_p = np.sqrt(self.Q_factor**2 - 0.25) + self.omega_res_p = (self.omega_res*self.Q_factor_p)/self.Q_factor + else: + self.Q_factor_pp = np.sqrt(0.25 - self.Q_factor**2) + self.omega_res_p = (self.omega_res*self.Q_factor_pp)/self.Q_factor + + def init_timestop(self): + """ + Calculate an approximate position where the wake field depletes. + + """ + self.timestop = round(np.log(1000)/self.omega_res*2*self.Q_factor, 12) + + def time_array(self): + self.tau = np.linspace(start=-30e-12, stop=self.timestop, num=1000) + + def long_wakefunction(self): + tau_neg = self.tau[np.where(self.tau<0)[0]] + tau_pos = self.tau[np.where(self.tau>=0)[0]] + + wlong_tau_neg = np.zeros((len(tau_neg),)) + + if self.Q_factor >= 0.5: + wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\ + np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\ + (np.cos(self.omega_res_p*tau_pos)-np.sin(self.omega_res_p*tau_pos)/ \ + (2*self.Q_factor_p)) + + elif self.Q_factor < 0.5: + wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\ + np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\ + (np.cosh(self.omega_res_p*tau_pos)-np.sinh(self.omega_res_p*tau_pos)/ \ + (2*self.Q_factor_pp)) + + self.wlong = np.concatenate((wlong_tau_neg, wlong_tau_pos)) + + def check_wake_tail(self): + """ + Checking whether the full wake function is obtained by the calculated + initial timestop. If not, extend the timestop by 50 ps. + + """ + + ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1]) + while any(ratio < 1000): + self.timestop += 50e-12 + self.time_array() + self.long_wakefunction() + ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1]) + + def get_wakefunction(self): + self.init_timestop() + self.time_array() + self.long_wakefunction() + self.check_wake_tail() + wake_func = WakeFunction(variable=self.tau, function=self.wlong) + super().append_to_model(wake_func) + class ImpedanceModel(Element): """ @@ -1280,4 +1584,3 @@ def double_sided_impedance(impedance): all_data = impedance.data.append(negative_data) all_data = all_data.sort_index() impedance.data = all_data - \ No newline at end of file diff --git a/tracking/element.py b/tracking/element.py index 81f506cac6478144ed78ee60a5c588fb972ce863..881e693d53e873fc7cfff3631f03dd8ddb7c48a4 100644 --- a/tracking/element.py +++ b/tracking/element.py @@ -208,8 +208,9 @@ class WakePotential(Element): self.W_long = np.array(w_list) - def wake_potential(self): - self.W_p = signal.convolve(self.W_long*1e-12, self.rho, mode="same") + def wake_potential(self): + dtau = (self.tau[-1]-self.tau[0]) / len(self.tau) + self.W_p = signal.convolve(self.W_long, self.rho, mode="same") * dtau def plot(self, var, plot_rho=True): """ @@ -242,7 +243,7 @@ class WakePotential(Element): elif var == "W_long": ax.plot(self.tau*1e12, self.W_long*1e-12) ax.set_xlabel("$\\tau$ (ps)") - ax.set_ylabel("W$_{||}$ ($\\Omega$/ps)") + ax.set_ylabel("W$_{||}$ (V/ps)") if plot_rho is True: rho_array = np.array(self.rho) @@ -296,6 +297,127 @@ class WakePotential(Element): bunch["delta"] += self.wp * bunch.charge / self.ring.E0 +class WakePotential2(Element): + """ + Compute a wake potential from a wake function by performing a convolution + with a bunch charge profile. + + Parameters + ---------- + wake_obj : WakeFunction object + ring : Synchrotron object + n_bin : int, optional + Number of bins for constructing the longitudinal bunch profile. + bunch : Bunch or Beam object + + Attributes + ---------- + rho : array of shape (n_bin, ) + Bunch charge density profile. + Wlong : array + Wake function profile. + Wp : array + Wake potential profile. + Wp_interp : array of shape (mp_number, ) + Wake potential, obtained from interpolating Wp, exerted on each macro-particle. + + Methods + ------- + charge_density(bunch) + Calculate bunch charge density [A/C] + plot(self, var, plot_rho=True) + Plotting wakefunction or wake potential. If plot_rho=True, the bunch + charge density is overlayed. + track(bunch) + Tracking method for the element. + + """ + + def __init__(self, wake_obj, ring, n_bin=65): + self.wake_obj = wake_obj + self.tau = self.wake_obj.data.index + self.Wlong = self.wake_obj.data['real'] + + self.ring = ring + self.n_bin = n_bin + + def charge_density(self, bunch): + self.bins = bunch.binning(n_bin=self.n_bin) + self.bin_data = self.bins[2] + self.bin_size = self.bins[0].length + + self.rho = bunch.charge_per_mp*self.bin_data/(self.bin_size*bunch.charge) + + def get_wakepotential(self): + interp_func_rho = interp1d(self.bins[0].mid, self.rho, fill_value=0, + bounds_error=False) + rho_interp = interp_func_rho(self.tau) + peak = np.where(rho_interp == max(rho_interp))[0] + rho_interp_shift = np.roll(rho_interp, len(rho_interp)//2-peak) + dtau = (self.tau[-1]-self.tau[0]) / len(self.tau) + self.Wp = signal.convolve(self.Wlong*-1, rho_interp_shift, mode="same") * dtau + + def plot(self, var, plot_rho=True): + """ + Plotting wakefunction or wake potential. + + Parameters + ---------- + var : {'Wp', 'Wlong'} + If 'Wp', the wake potential is plotted. + If 'Wlong', the wakefunction is plotted. + plot_rho : bool, optional + Overlay the bunch charge density profile on the plot. + + Returns + ------- + fig : Figure + Figure object with the plot on it. + + """ + + fig, ax = plt.subplots() + + if var == "Wp": + ax.plot(self.tau*1e12, self.Wp*1e-12) + + ax.set_xlabel("$\\tau$ (ps)") + ax.set_ylabel("W$_p$ (V/pC)") + + elif var == "Wlong": + ax.plot(self.tau*1e12, self.Wlong*1e-12) + ax.set_xlabel("$\\tau$ (ps)") + ax.set_ylabel("W$_{||}$ (V/ps)") + + if plot_rho is True: + rho_array = np.array(self.rho) + rho_rescaled = rho_array/max(rho_array)*max(self.Wp) + + ax.plot(self.bins[0].mid*1e12, rho_rescaled*1e-12) + + return fig + + @Element.parallel + def track(self, bunch): + """ + Tracking method for the element. + No bunch to bunch interaction, so written for Bunch objects and + @Element.parallel is used to handle Beam objects. + + Parameters + ---------- + bunch : Bunch or Beam object. + + """ + + self.charge_density(bunch) + self.get_wakepotential() + + f = interp1d(self.tau, self.Wp, fill_value = 0, bounds_error = False) + self.Wp_interp = f(bunch["tau"]) + + bunch["delta"] += self.Wp_interp * bunch.charge / self.ring.E0 + class SynchrotronRadiation(Element): """ Element to handle synchrotron radiation, radiation damping and quantum