# -*- coding: utf-8 -*- """ This module defines general classes to describes wakefields, impedances and wake functions. Based on impedance library by David Amorim. @author: David Amorin, Alexis Gamelin @date: 14/01/2020 """ import warnings import re import pandas as pd import numpy as np import matplotlib.pyplot as plt from scipy.interpolate import interp1d from mbtrack2.tracking.element import Element from copy import deepcopy from scipy.integrate import trapz from scipy.interpolate import interp1d from scipy.constants import c import scipy as sc from mpl_toolkits.axes_grid1.inset_locator import inset_axes class ComplexData: """ Define a general data structure for a complex function based on a pandas DataFrame. Parameters ---------- variable : list, numpy array contains the function variable values function : list or numpy array of complex numbers contains the values taken by the complex function """ def __init__(self, variable=np.array([-1e15, 1e15]), function=np.array([0, 0])): self.data = pd.DataFrame({'real': np.real(function), 'imag': np.imag(function)}, index=variable) self.data.index.name = 'variable' def add(self, structure_to_add, method='zero', interp_kind='cubic', index_name="variable"): """ Method to add two structures. If the data don't have the same length, different cases are handled. Parameters ---------- structure_to_add : ComplexData object, int, float or complex. structure to be added. method : str, optional manage how the addition is done, possibilties are: -common: the common range of the index (variable) from the two structures is used. The input data are cross-interpolated so that the result data contains the points at all initial points in the common domain. -extrapolate: extrapolate the value of both ComplexData. -zero: outside of the common range, the missing values are zero. In the common range, behave as "common" method. interp_kind : str, optional interpolation method which is passed to pandas and to scipy.interpolate.interp1d. index_name : str, optional name of the dataframe index passed to the method Returns ------- ComplexData Contains the sum of the two inputs. """ # Create first two new DataFrames merging the variable # from the two input data if isinstance(structure_to_add, (int, float, complex)): structure_to_add = ComplexData(variable=self.data.index, function=(structure_to_add * np.ones(len(self.data.index)))) data_to_concat = structure_to_add.data.index.to_frame().set_index(index_name) initial_data = pd.concat([self.data, data_to_concat], sort=True) initial_data = initial_data[~initial_data.index.duplicated(keep='first')] initial_data = initial_data.sort_index() data_to_add = pd.concat( [structure_to_add.data, self.data.index.to_frame().set_index(index_name)], sort=True) data_to_add = data_to_add[~data_to_add.index.duplicated(keep='first')] data_to_add = data_to_add.sort_index() if method == 'common': max_variable = min(structure_to_add.data.index.max(), self.data.index.max()) min_variable = max(structure_to_add.data.index.min(), self.data.index.min()) initial_data = initial_data.interpolate(method=interp_kind) mask = ((initial_data.index <= max_variable) & (initial_data.index >= min_variable)) initial_data = initial_data[mask] data_to_add = data_to_add.interpolate(method=interp_kind) mask = ((data_to_add.index <= max_variable) & (data_to_add.index >= min_variable)) data_to_add = data_to_add[mask] result_structure = ComplexData() result_structure.data = initial_data + data_to_add return result_structure if method == 'extrapolate': print('Not there yet') return self if method == 'zero': max_variable = min(structure_to_add.data.index.max(), self.data.index.max()) min_variable = max(structure_to_add.data.index.min(), self.data.index.min()) mask = ((initial_data.index <= max_variable) & (initial_data.index >= min_variable)) initial_data[mask] = initial_data[mask].interpolate(method=interp_kind) mask = ((data_to_add.index <= max_variable) & (data_to_add.index >= min_variable)) data_to_add[mask] = data_to_add[mask].interpolate(method=interp_kind) initial_data.replace(to_replace=np.nan, value=0, inplace=True) data_to_add.replace(to_replace=np.nan, value=0, inplace=True) result_structure = ComplexData() result_structure.data = initial_data + data_to_add return result_structure def __radd__(self, structure_to_add): return self.add(structure_to_add, method='zero') def __add__(self, structure_to_add): return self.add(structure_to_add, method='zero') def multiply(self, factor): """ Multiply a data strucure with a float or an int. If the multiplication is done with something else, throw a warning. """ if isinstance(factor, (int, float)): result_structure = ComplexData() result_structure.data = self.data * factor return result_structure else: warnings.warn(('The multiplication factor is not a float ' 'or an int.'), UserWarning) return self def __mul__(self, factor): return self.multiply(factor) def __rmul__(self, factor): return self.multiply(factor) def __call__(self, values, interp_kind="cubic"): """ Interpolation of the data by calling the class to have a function-like behaviour. Parameters ---------- values : list or numpy array of complex, int or float values to be interpolated. interp_kind : str, optional interpolation method which is passed to scipy.interpolate.interp1d. Returns ------- numpy array Contains the interpolated data. """ real_func = interp1d(x = self.data.index, y = self.data["real"], kind=interp_kind) imag_func = interp1d(x = self.data.index, y = self.data["imag"], kind=interp_kind) return real_func(values) + 1j*imag_func(values) class WakeFunction(ComplexData): """ Define a WakeFunction object based on a ComplexData object. """ def __init__(self, variable=np.array([-1e15, 1e15]), function=np.array([0, 0]), wake_type='long'): super().__init__(variable, function) self._wake_type = wake_type self.data.index.name = "time [s]" def add(self, structure_to_add, method='zero'): """ Method to add two WakeFunction objects. The two structures are compared so that the addition is self-consistent. """ if isinstance(structure_to_add, (int, float, complex)): result = super().add(structure_to_add, method=method, index_name="time [s]") elif isinstance(structure_to_add, WakeFunction): if (self.wake_type == structure_to_add.wake_type): result = super().add(structure_to_add, method=method, index_name="time [s]") else: warnings.warn(('The two WakeFunction objects do not have the ' 'same coordinates or plane or type. ' 'Returning initial WakeFunction object.'), UserWarning) result = self wake_to_return = WakeFunction( result.data.index, result.data.real.values, self.wake_type) return wake_to_return def __radd__(self, structure_to_add): return self.add(structure_to_add) def __add__(self, structure_to_add): return self.add(structure_to_add) def multiply(self, factor): """ Multiply a WakeFunction object with a float or an int. If the multiplication is done with something else, throw a warning. """ result = super().multiply(factor) wake_to_return = WakeFunction( result.data.index, result.data.real.values, self.wake_type) return wake_to_return def __mul__(self, factor): return self.multiply(factor) def __rmul__(self, factor): return self.multiply(factor) @property def wake_type(self): return self._wake_type @wake_type.setter def wake_type(self, value): self._wake_type = value def deconvolve_wp(self, Wp_filename, bunchlength=1e-3, nout=None, stopfreq=200e9): """ Compute wake function from wake potential by deconvolution method. Parameters ---------- Wp_filename : str Text file that contains wake potential data. bunchlength : float, optional Spatial bunch length [m]. nout : int, optional Length of the transformed axis of the output. For n output points, n//2+1 input points are necessary. If the input is longer than this, it is cropped. If it is shorter than this, it is padded with zeros. If n is not given, it is taken to be 2*(m-1) where m is the length of the input along the axis specified by axis. stopfreq : float, optional The maximum frequency for calculating the impedance. Returns ------- Wake function object indexed with time. """ # FT of wake potential dist, wp0 = np.loadtxt(Wp_filename, unpack=True) tau_wp = dist*1e-3/c wp = wp0*1e12 sampling_wp = tau_wp[1] - tau_wp[0] freq_wp = sc.fft.rfftfreq(len(tau_wp), sampling_wp) dtau_wp = (tau_wp[-1]-tau_wp[0])/len(tau_wp) dft_wp = sc.fft.rfft(wp) * dtau_wp # FT of analytical bunch profile and analytical impedance sigma = bunchlength/c mu = tau_wp[0] bunch_charge = 1.44e-9 index_lim = freq_wp < stopfreq freq_wp_trun = freq_wp[index_lim] dft_wp_trun = dft_wp[index_lim] dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_wp_trun)**2 + \ 1j*mu*2*np.pi*freq_wp_trun)*bunch_charge imp = dft_wp_trun/dft_rho_eq_trun*-1*bunch_charge # Wake function from impedance if nout is None: nout = len(imp) fs = (freq_wp_trun[-1] - freq_wp_trun[0]) / len(freq_wp_trun) time_array = sc.fft.fftfreq(nout, fs) Wlong_raw = sc.fft.irfft(imp, n=nout) * nout * fs time = sc.fft.fftshift(time_array) Wlong = sc.fft.fftshift(Wlong_raw) super().__init__(variable=time, function=Wlong) self.data.index.name = "time [s]" def deconvolve_imp(self, imp_obj, divided_by=1, nout=None): """ Compute wake function from impedance by deconvolution method. Parameters ---------- imp_obj : impedance object divided_by : int, optional Total number of elements that generate the impedance. nout : int, optional Length of the transformed axis of the output. For n output points, n//2+1 input points are necessary. If the input is longer than this, it is cropped. If it is shorter than this, it is padded with zeros. If n is not given, it is taken to be 2*(m-1) where m is the length of the input along the axis specified by axis. Returns ------- Wake function object indexed with time. """ Z = (imp_obj.data['real'] + imp_obj.data['imag']*1j) / divided_by fs = (Z.index[-1]-Z.index[0]) / len(Z.index) sampling = Z.index[1] - Z.index[0] if nout is not None: Wlong_raw = sc.fft.irfft(np.array(Z), n=nout, axis=0) * nout * fs time_array = sc.fft.fftfreq(nout, sampling) else: Wlong_raw = sc.fft.irfft(np.array(Z), n=Z.size, axis=0) *Z.size*fs time_array = sc.fft.fftfreq(Z.size, sampling) Wlong = sc.fft.fftshift(Wlong_raw) time = sc.fft.fftshift(time_array) super().__init__(variable=time, function=Wlong) self.data.index.name = "time [s]" class Impedance(ComplexData): """ Define an Impedance object based on a ComplexData object. """ def __init__(self, variable=np.array([-1e15, 1e15]), function=np.array([0, 0]), impedance_type='long'): super().__init__(variable, function) self._impedance_type = impedance_type self.data.index.name = 'frequency [Hz]' self.initialize_impedance_coefficient() def initialize_impedance_coefficient(self): """ Define the impedance coefficients and the plane of the impedance that are presents in attributes of the class. """ table = self.impedance_name_and_coefficients_table() try: component_coefficients = table[self.impedance_type].to_dict() except KeyError: print('Impedance type {} does not exist'.format(self.impedance_type)) self.a = component_coefficients["a"] self.b = component_coefficients["b"] self.c = component_coefficients["c"] self.d = component_coefficients["d"] self.plane = component_coefficients["plane"] def impedance_name_and_coefficients_table(self): """ Return a table associating the human readbale names of an impedance component and its associated coefficients and plane. """ component_dict = { 'long': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'z'}, 'xcst': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'x'}, 'ycst': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'y'}, 'xdip': {'a': 1, 'b': 0, 'c': 0, 'd': 0, 'plane': 'x'}, 'ydip': {'a': 0, 'b': 1, 'c': 0, 'd': 0, 'plane': 'y'}, 'xydip': {'a': 0, 'b': 1, 'c': 0, 'd': 0, 'plane': 'x'}, 'yxdip': {'a': 1, 'b': 0, 'c': 0, 'd': 0, 'plane': 'y'}, 'xquad': {'a': 0, 'b': 0, 'c': 1, 'd': 0, 'plane': 'x'}, 'yquad': {'a': 0, 'b': 0, 'c': 0, 'd': 1, 'plane': 'y'}, 'xyquad': {'a': 0, 'b': 0, 'c': 0, 'd': 1, 'plane': 'x'}, 'yxquad': {'a': 0, 'b': 0, 'c': 1, 'd': 0, 'plane': 'y'}, } return pd.DataFrame(component_dict) def add(self, structure_to_add, beta_x=1, beta_y=1, method='zero'): """ Method to add two Impedance objects. The two structures are compared so that the addition is self-consistent. Beta functions can be precised as well. """ if isinstance(structure_to_add, (int, float, complex)): result = super().add(structure_to_add, method=method, index_name="frequency [Hz]") elif isinstance(structure_to_add, Impedance): if (self.impedance_type == structure_to_add.impedance_type): weight = (beta_x ** self.power_x) * (beta_y ** self.power_y) result = super().add(structure_to_add * weight, method=method, index_name="frequency [Hz]") else: warnings.warn(('The two Impedance objects do not have the ' 'same coordinates or plane or type. ' 'Returning initial Impedance object.'), UserWarning) result = self impedance_to_return = Impedance( result.data.index, result.data.real.values + 1j*result.data.imag.values, self.impedance_type) return impedance_to_return def __radd__(self, structure_to_add): return self.add(structure_to_add) def __add__(self, structure_to_add): return self.add(structure_to_add) def multiply(self, factor): """ Multiply a Impedance object with a float or an int. If the multiplication is done with something else, throw a warning. """ result = super().multiply(factor) impedance_to_return = Impedance( result.data.index, result.data.real.values + 1j*result.data.imag.values, self.impedance_type) return impedance_to_return def __mul__(self, factor): return self.multiply(factor) def __rmul__(self, factor): return self.multiply(factor) @property def impedance_type(self): return self._impedance_type @impedance_type.setter def impedance_type(self, value): self._impedance_type = value self.initialize_impedance_coefficient() @property def power_x(self): power_x = self.a/2 + self.c/2. if self.plane == 'x': power_x += 1./2. return power_x @property def power_y(self): power_y = self.b/2. + self.d/2. if self.plane == 'y': power_y += 1./2. return power_y def loss_factor(self, sigma): """ Compute the loss factor or the kick factor for a Gaussian bunch. Formulas from Eq. (2) p239 and Eq.(7) p240 of [1]. Parameters ---------- sigma : float RMS bunch length in [s] Returns ------- kloss: float Loss factor in [V/C] or kick factor in [V/C/m] depanding on the impedance type. References ---------- [1] : Handbook of accelerator physics and engineering, 3rd printing. """ positive_index = self.data.index > 0 frequency = self.data.index[positive_index] sd = spectral_density(frequency, sigma, m=0) if(self.impedance_type == "long"): kloss = trapz(x = frequency, y = 2*self.data["real"][positive_index]*sd) elif(self.impedance_type == "xdip" or self.impedance_type == "ydip"): kloss = trapz(x = frequency, y = 2*self.data["imag"][positive_index]*sd) elif(self.impedance_type == "xquad" or self.impedance_type == "yquad"): kloss = trapz(x = frequency, y = 2*self.data["imag"][positive_index]*sd) else: raise TypeError("Impedance type not recognized.") return kloss def get_impedance(self, Wp_filename, axis0='dist', axis0_scale=1e-3, axis1_scale=1e-12, bunch_length=1e-3, bunch_charge=1.44e-9, stopfreq=200e9): """ Compute impedance from wake potential data. Parameters ---------- Wp_filename : str Text file that contains wake potential data. axis0 : {'dist', 'time'}, optional Viariable of the data file's first column. Use 'dist' for spacial distance. Otherwise, 'time' for temporal distance. axis0_scale : float, optional Scale of the first column with respect to meter if axis0 = 'dist', or second if axis0 = 'time'. axis1_scale : float, optional Scale of the wake potential (in second column) with respect to V/C. bunch_length : float, optional Electron bunch lenth [m]. bunch_charge : float, optional Total bunch charge [C]. stopfreq : float, optional The maximum frequency for calculating the impedance. Returns ------- Impedance object indexed with frequency. """ s, wp0 = np.loadtxt(Wp_filename, unpack=True) if axis0 == 'dist': tau = s*axis0_scale/c elif axis0 == 'time': tau = s*axis0_scale else: raise TypeError('Type of axis 0 not recognized.') wp = wp0 * axis1_scale # FT of wake potential sampling = tau[1] - tau[0] freq = sc.fft.rfftfreq(len(tau), sampling) dtau = (tau[-1]-tau[0])/len(tau) dft = sc.fft.rfft(wp) * dtau # FT of analytical bunch profile and analytical impedance sigma = bunch_length/c mu = tau[0] index_lim = freq < stopfreq freq_trun = freq[index_lim] dft_trun = dft[index_lim] dft_rho_eq_trun = np.exp(-0.5*(sigma*2*np.pi*freq_trun)**2 + \ 1j*mu*2*np.pi*freq_trun)*bunch_charge imp = dft_trun/dft_rho_eq_trun*-1*bunch_charge super().__init__(variable=freq_trun, function=imp) self.data.index.name = "frequency [Hz]" class Wakefield: """ Defines a Wakefield which corresponds to a single physical element which produces different types of wakes, represented by their Impedance or WakeFunction objects. Parameters ---------- structure_list : list, optional list of Impedance/WakeFunction objects to add to the Wakefield. name : str, optional Attributes ---------- impedance_components : array of str Impedance component names present for this element. Methods ------- append_to_model(structure_to_add) Add Impedance/WakeFunction to Wakefield. list_to_attr(structure_list) Add list of Impedance/WakeFunction to Wakefield. add_wakefields(wake1, beta1, wake2, beta2) Add two Wakefields taking into account beta functions. add_several_wakefields(wakefields, beta) Add a list of Wakefields taking into account beta functions. """ def __init__(self, structure_list=None, name=None): self.list_to_attr(structure_list) self.name = name def append_to_model(self, structure_to_add): """ Add Impedance/WakeFunction component to Wakefield. Parameters ---------- structure_to_add : Impedance or WakeFunction object """ list_of_attributes = dir(self) if isinstance(structure_to_add, Impedance): attribute_name = "Z" + structure_to_add.impedance_type if attribute_name in list_of_attributes: raise ValueError("There is already a component of the type " "{} in this element.".format(attribute_name)) else: self.__setattr__(attribute_name, structure_to_add) elif isinstance(structure_to_add, WakeFunction): attribute_name = "W" + structure_to_add.wake_type if attribute_name in list_of_attributes: raise ValueError("There is already a component of the type " "{} in this element.".format(attribute_name)) else: self.__setattr__(attribute_name, structure_to_add) else: raise ValueError("{} is not an Impedance nor a WakeFunction.".format(structure_to_add)) def list_to_attr(self, structure_list): """ Add list of Impedance/WakeFunction components to Wakefield. Parameters ---------- structure_list : list of Impedance or WakeFunction objects. """ if structure_list is not None: for component in structure_list: self.append_to_model(component) @property def impedance_components(self): """ Return an array of the impedance component names for the element. """ return np.array([comp for comp in dir(self) if re.match(r'[Z]', comp)]) @staticmethod def add_wakefields(wake1, beta1, wake2, beta2): """ Add two Wakefields taking into account beta functions. Parameters ---------- wake1 : Wakefield Wakefield to add. beta1 : array of shape (2,) Beta functions at wake1 postion. wake2 : Wakefield Wakefield to add. beta2 : array of shape (2,) Beta functions at wake2 postion. Returns ------- wake_sum : Wakefield Wakefield with summed components. """ wake_sum = deepcopy(wake1) for component_name1 in wake1.impedance_components: impedance1 = getattr(wake_sum, component_name1) weight1 = ((beta1[0] ** impedance1.power_x) * (beta1[1] ** impedance1.power_y)) setattr(wake_sum, component_name1, weight1*impedance1) for component_name2 in wake2.impedance_components: impedance2 = getattr(wake2, component_name2) weight2 = ((beta2[0] ** impedance2.power_x) * (beta2[1] ** impedance2.power_y)) try: impedance1 = getattr(wake_sum, component_name2) setattr(wake_sum, component_name2, impedance1 + weight2*impedance2) except AttributeError: setattr(wake_sum, component_name2, weight2*impedance2) return wake_sum @staticmethod def add_several_wakefields(wakefields, beta): """ Add a list of Wakefields taking into account beta functions. Parameters ---------- wakefields : list of Wakefield Wakefields to add. beta : array of shape (len(wakefields), 2) Beta functions at the Wakefield postions. Returns ------- wake_sum : Wakefield Wakefield with summed components.. """ if len(wakefields) == 1: return wakefields[0] elif len(wakefields) > 1: wake_sum = Wakefield.add_wakefields(wakefields[0], beta[:,0], wakefields[1], beta[:,1]) for i in range(len(wakefields) - 2): wake_sum = Wakefield.add_wakefields(wake_sum, [1 ,1], wakefields[i+2], beta[:,i+2]) return wake_sum else: raise ValueError("Error in input.") class Resonator(Wakefield): """ Compute analytical wake function of a resonator and return a Resonator object. Parameter --------- ring : Synchrotron object. Q_factor : float, optional Resonator quality factor. The default value is 1. f_res : float, optional Resonator resonance frequency in [Hz]. The default value is 10e9 Hz. R_shunt : float, optional Resonator shunt impedance in [Ohm]. The default value is 100 Ohm. """ def __init__(self, ring, Q_factor=1, f_res=10e9, R_shunt=100): super().__init__() self.ring = ring self.Q_factor = Q_factor self.omega_res = 2*np.pi*f_res self.R_shunt = R_shunt if Q_factor >= 0.5: self.Q_factor_p = np.sqrt(self.Q_factor**2 - 0.25) self.omega_res_p = (self.omega_res*self.Q_factor_p)/self.Q_factor else: self.Q_factor_pp = np.sqrt(0.25 - self.Q_factor**2) self.omega_res_p = (self.omega_res*self.Q_factor_pp)/self.Q_factor def init_timestop(self): """ Calculate an approximate position where the wake field depletes. """ self.timestop = round(np.log(1000)/self.omega_res*2*self.Q_factor, 12) def time_array(self): self.tau = np.linspace(start=-30e-12, stop=self.timestop, num=1000) def long_wakefunction(self): tau_neg = self.tau[np.where(self.tau<0)[0]] tau_pos = self.tau[np.where(self.tau>=0)[0]] wlong_tau_neg = np.zeros((len(tau_neg),)) if self.Q_factor >= 0.5: wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\ np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\ (np.cos(self.omega_res_p*tau_pos)-np.sin(self.omega_res_p*tau_pos)/ \ (2*self.Q_factor_p)) elif self.Q_factor < 0.5: wlong_tau_pos = (self.omega_res*self.R_shunt/self.Q_factor)*\ np.exp(-self.omega_res*tau_pos/(2*self.Q_factor))*\ (np.cosh(self.omega_res_p*tau_pos)-np.sinh(self.omega_res_p*tau_pos)/ \ (2*self.Q_factor_pp)) self.wlong = np.concatenate((wlong_tau_neg, wlong_tau_pos)) def check_wake_tail(self): """ Checking whether the full wake function is obtained by the calculated initial timestop. If not, extend the timestop by 50 ps. """ ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1]) while any(ratio < 1000): self.timestop += 50e-12 self.time_array() self.long_wakefunction() ratio = np.abs(max(self.wlong)) / np.abs(self.wlong[-6:-1]) def get_wakefunction(self): self.init_timestop() self.time_array() self.long_wakefunction() self.check_wake_tail() wake_func = WakeFunction(variable=self.tau, function=self.wlong) super().append_to_model(wake_func) class ImpedanceModel(Element): """ Define the impedance model of the machine. Parameters ---------- ring : Synchrotron object wakefield_list : list of Wakefield objects Wakefields to add to the model. wakefiled_postions : list Longitudinal positions corresponding to the added Wakfields. Attributes ---------- wakefields : list of Wakefield objects Wakefields in the model. positions : array Wakefields positions. names : array Names of (unique) Wakefield objects. sum : Wakefield Sum of every Wakefield in the model. sum_"name" : Wakefield Sum of every Wakefield with the same "name". sum_names : array Names of attributes where the wakefields are summed by name. Methods ------- add(wakefield_list, wakefiled_postions) Add elements to the model. add_multiple_elements(wakefield, wakefiled_postions) Add the same element at different locations to the model. sum_elements() Sum all wakefields into self.sum. update_name_list() Update self.names with uniques names of self.wakefields. find_wakefield(name) Return indexes of wakefields with the same name in self.wakefields. sum_by_name(name) Sum the elements with the same name in the model into sum_name. sum_by_name_all(name) Sum all the elements with the same name in the model into sum_name. plot_area(Z_type="Zlong", component="real", sigma=None, attr_list=None) Plot the contributions of different kind of wakefields. """ def __init__(self, ring, wakefield_list=None, wakefiled_postions=None): self.ring = ring self.optics = self.ring.optics self.wakefields = [] self.positions = np.array([]) self.names = np.array([]) self.sum_names = np.array([]) self.add(wakefield_list, wakefiled_postions) def track(self, beam): """ Track a beam object through this Element. Parameters ---------- beam : Beam object """ raise NotImplementedError def sum_elements(self): """Sum all wakefields into self.sum""" beta = self.optics.beta(self.positions) self.sum = Wakefield.add_several_wakefields(self.wakefields, beta) if "sum" not in self.sum_names: self.sum_names = np.append(self.sum_names, "sum") def update_name_list(self): """Update self.names with uniques names of self.wakefields.""" for wakefield in self.wakefields: if wakefield.name is None: continue if wakefield.name not in self.names: self.names = np.append(self.names, wakefield.name) def count_elements(self): """Count number of each type of wakefield in the model.""" self.count = np.zeros(len(self.names)) for wakefield in self.wakefields: if wakefield.name is not None: self.count += (wakefield.name == self.names).astype(int) def find_wakefield(self, name): """ Return indexes of wakefields with the same name in self.wakefields. Parameters ---------- name : str Wakefield name. Returns ------- index : list Index of positions in self.wakefields. """ index = [] for i, wakefield in enumerate(self.wakefields): if wakefield.name == name: index.append(i) return index def sum_by_name(self, name): """ Sum the elements with the same name in the model into sum_name. Parameters ---------- name : str Name of the wakefield to sum. """ attribute_name = "sum_" + name index = self.find_wakefield(name) beta = self.optics.beta(self.positions[index]) wakes = [] for i in index: wakes.append(self.wakefields[i]) wake_sum = Wakefield.add_several_wakefields(wakes, beta) setattr(self, attribute_name, wake_sum) if attribute_name not in self.sum_names: self.sum_names = np.append(self.sum_names, attribute_name) def sum_by_name_all(self): """ Sum all the elements with the same name in the model into sum_name. """ for name in self.names: self.sum_by_name(name) def add(self, wakefield_list, wakefiled_postions): """ Add elements to the model. Parameters ---------- wakefield_list : list of Wakefield objects Wakefields to add to the model. wakefiled_postions : list Longitudinal positions corresponding to the added Wakfields. """ if (wakefield_list is not None) and (wakefiled_postions is not None): for wakefield in wakefield_list: self.wakefields.append(wakefield) for position in wakefiled_postions: self.positions = np.append(self.positions, position) self.update_name_list() def add_multiple_elements(self, wakefield, wakefiled_postions): """ Add the same element at different locations to the model. Parameters ---------- wakefield : Wakefield object Wakefield to add to the model. wakefiled_postions : list Longitudinal positions corresponding to the added Wakfield. """ for position in wakefiled_postions: self.positions = np.append(self.positions, position) self.wakefields.append(wakefield) self.update_name_list() def plot_area(self, Z_type="Zlong", component="real", sigma=None, attr_list=None, zoom=False): """ Plot the contributions of different kind of wakefields. Parameters ---------- Z_type : str, optional Type of impedance to plot. component : str, optional Component to plot, can be "real" or "imag". sigma : float, optional RMS bunch length in [s] to use for the spectral density. If equal to None, the spectral density is not plotted. attr_list : list or array of str, optional Attributes to plot. zoom : bool If True, add a zoomed plot on top right corner. """ if attr_list is None: attr_list = self.sum_names[self.sum_names != "sum"] # manage legend Ztype_dict = {"Zlong":0, "Zxdip":1, "Zydip":2, "Zxquad":3, "Zyquad":4} scale = [1e-3, 1e-6, 1e-6, 1e-6, 1e-6] label_list = [r"$Z_{long} \; [k\Omega]$", r"$\sum_{j} \beta_{x,j} Z_{x,j}^{Dip} \; [M\Omega]$", r"$\sum_{j} \beta_{y,j} Z_{y,j}^{Dip} \; [M\Omega]$", r"$\sum_{j} \beta_{x,j} Z_{x,j}^{Quad} \; [M\Omega]$", r"$\sum_{j} \beta_{y,j} Z_{y,j}^{Quad} \; [M\Omega]$"] leg = Ztype_dict[Z_type] # sort plot by decresing area size area = np.zeros((len(attr_list),)) for index, attr in enumerate(attr_list): try: sum_imp = getattr(getattr(self, attr), Z_type) area[index] = trapz(sum_imp.data[component], sum_imp.data.index) except AttributeError: pass sorted_index = area.argsort()[::-1] # Init fig fig = plt.figure() ax = fig.gca() zero_impedance = getattr(self.sum, Z_type)*0 total_imp = 0 legend = [] if sigma is not None: legend.append("Spectral density for sigma = " + str(sigma) + " s") # Main plot for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance ax.fill_between(sum_imp.data.index*1e-9, total_imp, total_imp + sum_imp.data[component]*scale[leg]) total_imp += sum_imp.data[component]*scale[leg] if attr[:4] == "sum_": legend.append(attr[4:]) else: legend.append(attr) except AttributeError: pass if sigma is not None: spect = spectral_density(zero_impedance.data.index, sigma) spect = spect/spect.max()*total_imp.max() ax.plot(sum_imp.data.index*1e-9, spect, 'r', linewidth=2.5) ax.legend(legend, loc="upper left") ax.set_xlabel("Frequency [GHz]") ax.set_ylabel(label_list[leg] + " - " + component + " part") ax.set_title(label_list[leg] + " - " + component + " part") if zoom is True: in_ax = inset_axes(ax, width="30%", # width = 30% of parent_bbox height=1.5, # height : 1 inch loc=1) total_imp = 0 for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance in_ax.fill_between(sum_imp.data.index*1e-3, total_imp, total_imp + sum_imp.data[component]*1e-9) total_imp += sum_imp.data[component]*1e-9 except AttributeError: pass in_ax.set_xlim([0, 200]) in_ax.set_xlabel("Frequency [kHz]") in_ax.set_ylabel(r"$[G\Omega]$") return fig def effective_impedance(self, m, mu, sigma, M, tuneS, xi=None, mode="Hermite"): """ Compute the longitudinal and transverse effective impedance. Parameters ---------- mu : int coupled bunch mode number, goes from 0 to (M-1) where M is the number of bunches m : int head-tail (or azimutal/synchrotron) mode number sigma : float RMS bunch length in [s] M : int Number of bunches. tuneS : float Synchrotron tune. xi : float, optional (non-normalized) chromaticity mode: str, optional type of the mode taken into account for the computation: -"Hermite" modes for Gaussian bunches Returns ------- summary : DataFrame Longitudinal and transverse effective impedance. """ attr_list = self.sum_names eff_array = np.zeros((len(attr_list),3), dtype=complex) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") eff_array[i,0] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zxdip") eff_array[i,1] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zydip") eff_array[i,2] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass eff_array[:,0] = eff_array[:,0]*self.ring.omega0*1e3 eff_array[:,1] = eff_array[:,1]*1e-3 eff_array[:,2] = eff_array[:,2]*1e-3 summary = pd.DataFrame(eff_array, index=attr_list, columns=["Z/n [mOhm]", "sum betax x Zxeff [kOhm]", "sum betay x Zyeff [kOhm]"]) return summary def energy_loss(self, sigma, M, bunch_spacing, I, n_points=10e6): """ Compute the beam and bunch loss factor and energy losses for each type of element in the model assuming Gaussian bunches and constant spacing between bunches. Parameters ---------- sigma : float RMS bunch length in [s]. M : int Number of bunches in the beam. bunch_spacing : float Time between two bunches in [s]. I : float Total beam current in [A]. n_points : float, optional Number of points used in the frequency spectrums. Returns ------- summary : Dataframe Contains the beam and bunch loss factor and energy loss for the full model and for each type of different component. """ fmax = self.sum.Zlong.data.index.max() fmin = self.sum.Zlong.data.index.min() Q = I*self.ring.T0/M if fmin >= 0: fmin = -1*fmax f = np.linspace(fmin, fmax, int(n_points)) beam_spect = beam_spectrum(f, M, bunch_spacing, sigma= sigma) bunch_spect = gaussian_bunch_spectrum(f, sigma) attr_list = self.sum_names loss_array = np.zeros((len(attr_list),2)) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") loss_array[i,0] = beam_loss_factor(impedance, f, beam_spect, self.ring) loss_array[i,1] = beam_loss_factor(impedance, f, bunch_spect, self.ring) except AttributeError: pass loss_array = loss_array*1e-12 summary = pd.DataFrame(loss_array, index=attr_list, columns=["loss factor (beam) [V/pC]", "loss factor (bunch) [V/pC]"]) summary["P (beam) [W]"] = summary["loss factor (beam) [V/pC]"]*1e12*Q**2/(self.ring.T0) summary["P (bunch) [W]"] = summary["loss factor (bunch) [V/pC]"]*1e12*Q**2/(self.ring.T0)*M return summary def read_CST(file, impedance_type='long', divide_by=None): """ Read CST file format into an Impedance object. Parameters ---------- file : str Path to the file to read. impedance_type : str, optional Type of the Impedance object. divide_by : float, optional Divide the impedance by a value. Mainly used to normalize transverse impedance by displacement. Returns ------- result : Impedance object Data from file. """ df = pd.read_csv(file, comment="#", header = None, sep = "\t", names = ["Frequency","Real","Imaginary"]) df["Frequency"] = df["Frequency"]*1e9 if divide_by is not None: df["Real"] = df["Real"]/divide_by df["Imaginary"] = df["Imaginary"]/divide_by if impedance_type == "long": df["Real"] = np.abs(df["Real"]) df.set_index("Frequency", inplace = True) result = Impedance(variable = df.index, function = df["Real"] + 1j*df["Imaginary"], impedance_type=impedance_type) return result def read_IW2D(file, impedance_type='long'): """ Read IW2D file format into an Impedance object. Parameters ---------- file : str Path to the file to read. impedance_type : str, optional Type of the Impedance object. Returns ------- result : Impedance object Data from file. """ df = pd.read_csv(file, sep = ' ', header = None, names = ["Frequency","Real","Imaginary"], skiprows=1) df.set_index("Frequency", inplace = True) result = Impedance(variable = df.index, function = df["Real"] + 1j*df["Imaginary"], impedance_type=impedance_type) return result def spectral_density(frequency, sigma, m = 1, mode="Hermite"): """ Compute the spectral density of different modes for various values of the head-tail mode number, based on Table 1 p238 of [1]. Parameters ---------- frequency : list or numpy array sample points of the spectral density in [Hz] sigma : float RMS bunch length in [s] m : int, optional head-tail (or azimutal/synchrotron) mode number mode: str, optional type of the mode taken into account for the computation: -"Hermite" modes for Gaussian bunches Returns ------- numpy array References ---------- [1] : Handbook of accelerator physics and engineering, 3rd printing. """ if mode == "Hermite": return (2*np.pi*frequency*sigma)**(2*m)*np.exp( -1*(2*np.pi*frequency*sigma)**2) else: raise NotImplementedError("Not implemanted yet.") def gaussian_bunch_spectrum(frequency, sigma): """ Compute a Gaussian bunch spectrum [1]. Parameters ---------- frequency : array sample points of the beam spectrum in [Hz]. sigma : float RMS bunch length in [s]. Returns ------- bunch_spectrum : array Bunch spectrum sampled at points given in frequency. References ---------- [1] : Gamelin, A. (2018). Collective effects in a transient microbunching regime and ion cloud mitigation in ThomX. p86, Eq. 4.19 """ return np.exp(-1/2*(2*np.pi*frequency)**2*sigma**2) def beam_spectrum(frequency, M, bunch_spacing, sigma=None, bunch_spectrum=None): """ Compute the beam spectrum assuming constant spacing between bunches [1]. Parameters ---------- frequency : list or numpy array sample points of the beam spectrum in [Hz]. M : int Number of bunches. bunch_spacing : float Time between two bunches in [s]. sigma : float, optional If bunch_spectrum is None then a Gaussian bunch with sigma RMS bunch length in [s] is assumed. bunch_spectrum : array, optional Bunch spectrum sampled at points given in frequency. Returns ------- beam_spectrum : array References ---------- [1] Rumolo, G - Beam Instabilities - CAS - CERN Accelerator School: Advanced Accelerator Physics Course - 2014, Eq. 9 """ if bunch_spectrum is None: bunch_spectrum = gaussian_bunch_spectrum(frequency, sigma) beam_spectrum = (bunch_spectrum * np.exp(1j*np.pi*frequency * bunch_spacing*(M-1)) * np.sin(M*np.pi*frequency*bunch_spacing) / np.sin(np.pi*frequency*bunch_spacing)) return beam_spectrum def effective_impedance(ring, imp, m, mu, sigma, M, tuneS, xi=None, mode="Hermite"): """ Compute the effective (longitudinal or transverse) impedance. Formulas from Eq. (1) and (2) p238 of [1]. Parameters ---------- ring : Synchrotron object imp : Impedance object mu : int coupled bunch mode number, goes from 0 to (M-1) where M is the number of bunches m : int head-tail (or azimutal/synchrotron) mode number sigma : float RMS bunch length in [s] M : int Number of bunches. tuneS : float Synchrotron tune. xi : float, optional (non-normalized) chromaticity mode: str, optional type of the mode taken into account for the computation: -"Hermite" modes for Gaussian bunches Returns ------- Zeff : float effective impedance in [ohm] or in [ohm/m] depanding on the impedance type. References ---------- [1] : Handbook of accelerator physics and engineering, 3rd printing. """ if not isinstance(imp, Impedance): raise TypeError("{} should be an Impedance object.".format(imp)) fmin = imp.data.index.min() fmax = imp.data.index.max() if fmin > 0: double_sided_impedance(imp) if mode == "Hermite": def h(f): return spectral_density(frequency=f, sigma=sigma, m=m, mode="Hermite") else: raise NotImplementedError("Not implemanted yet.") pmax = fmax/(ring.f0 * M) - 1 pmin = fmin/(ring.f0 * M) + 1 p = np.arange(pmin,pmax+1) if imp.impedance_type == "long": fp = ring.f0*(p*M + mu + m*tuneS) fp = fp[np.nonzero(fp)] # Avoid division by 0 num = np.sum( imp(fp) * h(fp) / (fp*2*np.pi) ) den = np.sum( h(fp) ) Zeff = num/den elif imp.impedance_type == "xdip" or imp.impedance_type == "ydip": if imp.impedance_type == "xdip": tuneXY = ring.tune[0] if xi is None : xi = ring.chro[0] elif imp.impedance_type == "ydip": tuneXY = ring.tune[1] if xi is None: xi = ring.chro[1] fp = ring.f0*(p*M + mu + tuneXY + m*tuneS) f_xi = xi/ring.eta*ring.f0 num = np.sum( imp(fp) * h(fp - f_xi) ) den = np.sum( h(fp - f_xi) ) Zeff = num/den else: raise TypeError("Effective impedance is only defined for long, xdip" " and ydip impedance type.") return Zeff def yokoya_elliptic(x_radius , y_radius): """ Compute Yokoya factors for an elliptic beam pipe. Function adapted from N. Mounet IW2D. Parameters ---------- x_radius : float Horizontal semi-axis of the ellipse in [m]. y_radius : float Vertical semi-axis of the ellipse in [m]. Returns ------- yoklong : float Yokoya factor for the longitudinal impedance. yokxdip : float Yokoya factor for the dipolar horizontal impedance. yokydip : float Yokoya factor for the dipolar vertical impedance. yokxquad : float Yokoya factor for the quadrupolar horizontal impedance. yokyquad : float Yokoya factor for the quadrupolar vertical impedance. """ if y_radius < x_radius: small_semiaxis = y_radius large_semiaxis = x_radius else: small_semiaxis = x_radius large_semiaxis = y_radius # read Yokoya factors interpolation file # BEWARE: columns are ratio, dipy, dipx, quady, quadx yokoya_file = pd.read_csv("collective_effects/data/" + "Yokoya_elliptic_from_Elias_USPAS.csv") ratio_col = yokoya_file["x"] # compute semi-axes ratio (first column of this file) ratio = (large_semiaxis - small_semiaxis)/(large_semiaxis + small_semiaxis) # interpolate Yokoya file at the correct ratio yoklong = 1 if y_radius < x_radius: yokydip = np.interp(ratio, ratio_col, yokoya_file["dipy"]) yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipx"]) yokyquad = np.interp(ratio, ratio_col, yokoya_file["quady"]) yokxquad = np.interp(ratio, ratio_col, yokoya_file["quadx"]) else: yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipy"]) yokydip = np.interp(ratio, ratio_col, yokoya_file["dipx"]) yokxquad = np.interp(ratio, ratio_col, yokoya_file["quady"]) yokyquad = np.interp(ratio, ratio_col, yokoya_file["quadx"]) return (yoklong, yokxdip, yokydip, yokxquad, yokyquad) def beam_loss_factor(impedance, frequency, spectrum, ring): """ Compute "beam" loss factor using the beam spectrum, uses a sum instead of integral compared to loss_factor [1]. Parameters ---------- impedance : Impedance of type "long" frequency : array Sample points of spectrum. spectrum : array Beam spectrum to consider. ring : Synchrotron object Returns ------- kloss_beam : float Beam loss factor in [V/C]. References ---------- [1] : Handbook of accelerator physics and engineering, 3rd printing. Eq (3) p239. """ pmax = np.floor(impedance.data.index.max()/ring.f0) pmin = np.floor(impedance.data.index.min()/ring.f0) if pmin >= 0: double_sided_impedance(impedance) pmin = -1*pmax p = np.arange(pmin+1,pmax) pf0 = p*ring.f0 ReZ = np.real(impedance(pf0)) spectral_density = np.abs(spectrum)**2 # interpolation of the spectrum is needed to avoid problems liked to # division by 0 # computing the spectrum directly to the frequency points gives # wrong results spect = interp1d(frequency, spectral_density) kloss_beam = ring.f0 * np.sum(ReZ*spect(pf0)) return kloss_beam def double_sided_impedance(impedance): """ Add negative frequency points to single sided impedance spectrum following symetries depending on impedance type. Parameters ---------- impedance : Impedance object Single sided impedance. """ fmin = impedance.data.index.min() if fmin >= 0: negative_index = impedance.data.index*-1 negative_data = impedance.data.set_index(negative_index) imp_type = impedance.impedance_type if imp_type == "long": negative_data["imag"] = -1*negative_data["imag"] elif (imp_type == "xdip") or (imp_type == "ydip"): negative_data["real"] = -1*negative_data["real"] elif (imp_type == "xquad") or (imp_type == "yquad"): negative_data["real"] = -1*negative_data["real"] else: raise ValueError("Wrong impedance type") try: negative_data = negative_data.drop(0) except KeyError: pass all_data = impedance.data.append(negative_data) all_data = all_data.sort_index() impedance.data = all_data