Skip to content
Snippets Groups Projects
  • Gamelin Alexis's avatar
    f980290c
    Bugfix · f980290c
    Gamelin Alexis authored
    Correct scale in plot_wakedata
    Avoid circular import by moving import in function
    f980290c
    History
    Bugfix
    Gamelin Alexis authored
    Correct scale in plot_wakedata
    Avoid circular import by moving import in function
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
wakefield.py 27.97 KiB
# -*- coding: utf-8 -*-
"""
This module defines general classes to describes wakefields, impedances and 
wake functions.

@author: David Amorin, Alexis Gamelin, Watanyu Foosang
@date: 24/09/2020
"""

import warnings
import re
import pandas as pd
import numpy as np
import scipy as sc
from copy import deepcopy
from scipy.interpolate import interp1d
from scipy.integrate import trapz
from scipy.constants import c

class ComplexData:
    """
    Define a general data structure for a complex function based on a pandas 
    DataFrame.

    Parameters
    ----------
    variable : list, numpy array
       contains the function variable values

    function : list or numpy array of complex numbers
        contains the values taken by the complex function
    """

    def __init__(self, variable=np.array([-1e15, 1e15]),
                 function=np.array([0, 0])):
        self.data = pd.DataFrame({'real': np.real(function),
                                  'imag': np.imag(function)},
                                 index=variable)
        self.data.index.name = 'variable'

    def add(self, structure_to_add, method='zero', interp_kind='cubic', 
            index_name="variable"):
        """
        Method to add two structures. If the data don't have the same length,
        different cases are handled.
        
        Parameters
        ----------
        structure_to_add : ComplexData object, int, float or complex.
            structure to be added.
        method : str, optional
            manage how the addition is done, possibilties are: 
            -common: the common range of the index (variable) from the two
                structures is used. The input data are cross-interpolated
                so that the result data contains the points at all initial
                points in the common domain.
            -extrapolate: extrapolate the value of both ComplexData.
            -zero: outside of the common range, the missing values are zero. In 
                the common range, behave as "common" method.
        interp_kind : str, optional
            interpolation method which is passed to pandas and to
            scipy.interpolate.interp1d.
        index_name : str, optional
            name of the dataframe index passed to the method
            
        Returns
        -------
        ComplexData 
            Contains the sum of the two inputs.
        """

        # Create first two new DataFrames merging the variable
        # from the two input data

        if isinstance(structure_to_add, (int, float, complex)):
            structure_to_add = ComplexData(variable=self.data.index,
                                           function=(structure_to_add * 
                                                     np.ones(len(self.data.index))))
                                
        data_to_concat = structure_to_add.data.index.to_frame().set_index(index_name)
        
        initial_data = pd.concat([self.data, data_to_concat], sort=True)
        initial_data = initial_data[~initial_data.index.duplicated(keep='first')]
        initial_data = initial_data.sort_index()

        data_to_add = pd.concat(
                        [structure_to_add.data,
                         self.data.index.to_frame().set_index(index_name)],
                        sort=True)
        data_to_add = data_to_add[~data_to_add.index.duplicated(keep='first')]
        data_to_add = data_to_add.sort_index()

        if method == 'common':
            max_variable = min(structure_to_add.data.index.max(),
                               self.data.index.max())

            min_variable = max(structure_to_add.data.index.min(),
                               self.data.index.min())

            initial_data = initial_data.interpolate(method=interp_kind)
            mask = ((initial_data.index <= max_variable)
                    & (initial_data.index >= min_variable))
            initial_data = initial_data[mask]

            data_to_add = data_to_add.interpolate(method=interp_kind)
            mask = ((data_to_add.index <= max_variable)
                    & (data_to_add.index >= min_variable))
            data_to_add = data_to_add[mask]

            result_structure = ComplexData()
            result_structure.data = initial_data + data_to_add
            return result_structure

        if method == 'extrapolate':
            print('Not there yet')
            return self
        
        if method == 'zero':
            max_variable = min(structure_to_add.data.index.max(),
                               self.data.index.max())

            min_variable = max(structure_to_add.data.index.min(),
                               self.data.index.min())

            mask = ((initial_data.index <= max_variable)
                    & (initial_data.index >= min_variable))
            initial_data[mask] = initial_data[mask].interpolate(method=interp_kind)

            mask = ((data_to_add.index <= max_variable)
                    & (data_to_add.index >= min_variable))
            data_to_add[mask] = data_to_add[mask].interpolate(method=interp_kind)
            
            initial_data.replace(to_replace=np.nan, value=0, inplace=True)
            data_to_add.replace(to_replace=np.nan, value=0, inplace=True)
            
            result_structure = ComplexData()
            result_structure.data = initial_data + data_to_add
            return result_structure

    def __radd__(self, structure_to_add):
        return self.add(structure_to_add, method='zero')

    def __add__(self, structure_to_add):
        return self.add(structure_to_add, method='zero')

    def multiply(self, factor):
        """
        Multiply a data strucure with a float or an int.
        If the multiplication is done with something else, throw a warning.
        """
        if isinstance(factor, (int, float)):
            result_structure = ComplexData()
            result_structure.data = self.data * factor
            return result_structure
        else:
            warnings.warn(('The multiplication factor is not a float '
                           'or an int.'), UserWarning)
            return self

    def __mul__(self, factor):
        return self.multiply(factor)

    def __rmul__(self, factor):
        return self.multiply(factor)
    
    def __call__(self, values, interp_kind="cubic"):
        """
        Interpolation of the data by calling the class to have a function-like
        behaviour.
        
        Parameters
        ----------
        values : list or numpy array of complex, int or float
            values to be interpolated.
        interp_kind : str, optional
            interpolation method which is passed to scipy.interpolate.interp1d.
            
        Returns
        -------
        numpy array 
            Contains the interpolated data.
        """
        real_func = interp1d(x = self.data.index, 
                             y = self.data["real"], kind=interp_kind)
        imag_func = interp1d(x = self.data.index, 
                             y = self.data["imag"], kind=interp_kind)
        return real_func(values) + 1j*imag_func(values)
    
class WakeFunction(ComplexData):
    """
    Define a WakeFunction object based on a ComplexData object.
    
    Parameters
    ----------
    variable : array-like
        Time domain structure of the wake function in [s].
    function : array-like
        Wake function values in [V/C].
    wake_type : str, optinal
        Type of the wake function: "long", "xdip", "xquad", ...
    
    Attributes
    ----------
    data : DataFrame
    wake_type : str
    
    Methods
    -------
    from_wakepotential(file_name, bunch_length, bunch_charge, freq_lim)
        Compute a wake function from a wake potential file and load it to the 
        WakeFunction object.
    """

    def __init__(self,
                 variable=np.array([-1e15, 1e15]),
                 function=np.array([0, 0]), wake_type='long'):
        super().__init__(variable, function)
        self._wake_type = wake_type
        self.data.index.name = "time [s]"
        
    def add(self, structure_to_add, method='zero'):
        """
        Method to add two WakeFunction objects. The two structures are
        compared so that the addition is self-consistent.
        """
 
        if isinstance(structure_to_add, (int, float, complex)):
            result = super().add(structure_to_add, method=method,
                                 index_name="time [s]")
        elif isinstance(structure_to_add, WakeFunction):
            if (self.wake_type == structure_to_add.wake_type):
                result = super().add(structure_to_add, method=method,
                                     index_name="time [s]")
            else:
                warnings.warn(('The two WakeFunction objects do not have the '
                               'same coordinates or plane or type. '
                               'Returning initial WakeFunction object.'),
                              UserWarning)
                result = self
               
        wake_to_return = WakeFunction(
                                result.data.index,
                                result.data.real.values,
                                self.wake_type)   
        return wake_to_return
 
    def __radd__(self, structure_to_add):
        return self.add(structure_to_add)
 
    def __add__(self, structure_to_add):
        return self.add(structure_to_add)
 
    def multiply(self, factor):
        """
        Multiply a WakeFunction object with a float or an int.
        If the multiplication is done with something else, throw a warning.
        """
        result = super().multiply(factor)
        wake_to_return = WakeFunction(
                                result.data.index,
                                result.data.real.values,
                                self.wake_type)   
        return wake_to_return
 
    def __mul__(self, factor):
        return self.multiply(factor)
 
    def __rmul__(self, factor):
        return self.multiply(factor)
       
    @property
    def wake_type(self):
        return self._wake_type
   
    @wake_type.setter
    def wake_type(self, value):
        self._wake_type = value
        
    def from_wakepotential(self, file_name, bunch_length, bunch_charge, 
                           freq_lim, nout=None, trim=False):
        """
        Compute a wake function from a wake potential file and load it to the 
        WakeFunction object.
    
        Parameters
        ----------
        file_name : str
            Text file that contains wake potential data.
        bunch_length : float
            Spatial bunch length in [m].
        bunch_charge : float
            Bunch charge in [C].
        freq_lim : float
            The maximum frequency for calculating the impedance [Hz].
        nout : int, optional
            Length of the transformed axis of the output. If None, it is taken
            to be 2*(n-1) where n is the length of the input. If nout > n, the
            input is padded with zeros. If nout < n, the inpu it cropped.
            Note that, for any nout value, nout//2+1 input points are required.
        trim : bool or float, optional
            If True, the pseudo wake function is trimmed at time=0 and is forced 
            to zero where time<0. 
            If False, the original result is preserved.
            If a float is given, the pseudo wake function is trimmed from 
            time <= trim to 0. 
    
        """
        
        imp = Impedance()
        imp.from_wakepotential(file_name=file_name, bunch_length=bunch_length,
                               bunch_charge=bunch_charge, freq_lim=freq_lim)
        wf = imp.to_wakefunction(nout=nout, trim=trim)
        self.__init__(variable=wf.data.index, function=wf.data["real"])
        
class Impedance(ComplexData):
    """
    Define an Impedance object based on a ComplexData object.
    
    Parameters
    ----------
    variable : array-like
        Frequency domain structure of the impedance in [Hz].
    function : array-like
        Impedance values in [Ohm].
    impedance_type : str, optinal
        Type of the impedance_type: "long", "xdip", "xquad", ...
    
    Attributes
    ----------
    data : DataFrame
    impedance_type : str
    
    Methods
    -------
    from_wakepotential(file_name, bunch_length, bunch_charge, freq_lim)
        Compute impedance from wake potential data and load it to the Impedance
        object.
    loss_factor(sigma)
        Compute the loss factor or the kick factor for a Gaussian bunch.
    to_wakefunction()
        Return a WakeFunction object from the impedance data.
    """

    def __init__(self,
                 variable=np.array([-1e15, 1e15]),
                 function=np.array([0, 0]), impedance_type='long'):
        super().__init__(variable, function)
        self._impedance_type = impedance_type
        self.data.index.name = 'frequency [Hz]'
        self.initialize_impedance_coefficient()


    def initialize_impedance_coefficient(self):
        """
        Define the impedance coefficients and the plane of the impedance that
        are presents in attributes of the class.
        """
        table = self.impedance_name_and_coefficients_table()
        
        try:
            component_coefficients = table[self.impedance_type].to_dict()
        except KeyError:
            print('Impedance type {} does not exist'.format(self.impedance_type))
        
        self.a = component_coefficients["a"]
        self.b = component_coefficients["b"]
        self.c = component_coefficients["c"]
        self.d = component_coefficients["d"]
        self.plane = component_coefficients["plane"]
                    
    def impedance_name_and_coefficients_table(self):
        """
        Return a table associating the human readbale names of an impedance
        component and its associated coefficients and plane.
        """

        component_dict = {
            'long': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'z'},
            'xcst': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'x'},
            'ycst': {'a': 0, 'b': 0, 'c': 0, 'd': 0, 'plane': 'y'},
            'xdip': {'a': 1, 'b': 0, 'c': 0, 'd': 0, 'plane': 'x'},
            'ydip': {'a': 0, 'b': 1, 'c': 0, 'd': 0, 'plane': 'y'},
            'xydip': {'a': 0, 'b': 1, 'c': 0, 'd': 0, 'plane': 'x'},
            'yxdip': {'a': 1, 'b': 0, 'c': 0, 'd': 0, 'plane': 'y'},
            'xquad': {'a': 0, 'b': 0, 'c': 1, 'd': 0, 'plane': 'x'},
            'yquad': {'a': 0, 'b': 0, 'c': 0, 'd': 1, 'plane': 'y'},
            'xyquad': {'a': 0, 'b': 0, 'c': 0, 'd': 1, 'plane': 'x'},
            'yxquad': {'a': 0, 'b': 0, 'c': 1, 'd': 0, 'plane': 'y'},
            }

        return pd.DataFrame(component_dict)

    def add(self, structure_to_add, beta_x=1, beta_y=1, method='zero'):
        """
        Method to add two Impedance objects. The two structures are
        compared so that the addition is self-consistent.
        Beta functions can be precised as well.
        """

        if isinstance(structure_to_add, (int, float, complex)):
            result = super().add(structure_to_add, method=method, 
                                 index_name="frequency [Hz]")
        elif isinstance(structure_to_add, Impedance):
            if (self.impedance_type == structure_to_add.impedance_type):
                weight = (beta_x ** self.power_x) * (beta_y ** self.power_y)
                result = super().add(structure_to_add * weight, method=method, 
                                     index_name="frequency [Hz]")
            else:
                warnings.warn(('The two Impedance objects do not have the '
                               'same coordinates or plane or type. '
                               'Returning initial Impedance object.'),
                              UserWarning)
                result = self
                
        impedance_to_return = Impedance(
                                result.data.index,
                                result.data.real.values + 1j*result.data.imag.values,
                                self.impedance_type)    
        return impedance_to_return

    def __radd__(self, structure_to_add):
        return self.add(structure_to_add)

    def __add__(self, structure_to_add):
        return self.add(structure_to_add)

    def multiply(self, factor):
        """
        Multiply a Impedance object with a float or an int.
        If the multiplication is done with something else, throw a warning.
        """
        result = super().multiply(factor)
        impedance_to_return = Impedance(
                                result.data.index,
                                result.data.real.values + 1j*result.data.imag.values,
                                self.impedance_type)    
        return impedance_to_return

    def __mul__(self, factor):
        return self.multiply(factor)

    def __rmul__(self, factor):
        return self.multiply(factor)
        
    @property
    def impedance_type(self):
        return self._impedance_type
    
    @impedance_type.setter
    def impedance_type(self, value):
        self._impedance_type = value
        self.initialize_impedance_coefficient()

    @property
    def power_x(self):
        power_x = self.a/2 + self.c/2.
        if self.plane == 'x':
            power_x += 1./2.
        return power_x

    @property
    def power_y(self):
        power_y = self.b/2. + self.d/2.
        if self.plane == 'y':
            power_y += 1./2.
        return power_y
    
    def loss_factor(self, sigma):
        """
        Compute the loss factor or the kick factor for a Gaussian bunch. 
        Formulas from Eq. (2) p239 and Eq.(7) p240 of [1].
        
        Parameters
        ----------
        sigma : float
            RMS bunch length in [s]
        
        Returns
        -------
        kloss: float
            Loss factor in [V/C] or kick factor in [V/C/m] depanding on the 
            impedance type.
            
        References
        ----------
        [1] : Handbook of accelerator physics and engineering, 3rd printing.
        """
        
        positive_index = self.data.index > 0
        frequency = self.data.index[positive_index]
        
        # Import here to avoid circular import
        from mbtrack2.collective_effects.utilities import spectral_density
        sd = spectral_density(frequency, sigma, m=0)
        
        if(self.impedance_type == "long"):
            kloss = trapz(x = frequency, 
                          y = 2*self.data["real"][positive_index]*sd)
        elif(self.impedance_type == "xdip" or self.impedance_type == "ydip"):
            kloss = trapz(x = frequency, 
                          y = 2*self.data["imag"][positive_index]*sd)
        elif(self.impedance_type == "xquad" or self.impedance_type == "yquad"):
            kloss = trapz(x = frequency, 
                          y = 2*self.data["imag"][positive_index]*sd)
        else:
            raise TypeError("Impedance type not recognized.")

        return kloss
    
    def from_wakepotential(self, file_name, bunch_length, bunch_charge, 
                           freq_lim, axis0='dist', axis0_scale=1e-3, 
                           axis1_scale=1e-12):
        """
        Compute impedance from wake potential data and load it to the Impedance
        object.

        Parameters
        ----------
        file_name : str
            Text file that contains wake potential data.
        freq_lim : float
            The maximum frequency for calculating the impedance [Hz].
        bunch_length : float
            Electron bunch lenth [m]. 
        bunch_charge : float
            Total bunch charge [C].
        axis0 : {'dist', 'time'}, optional
            Viariable of the data file's first column. Use 'dist' for spacial 
            distance. Otherwise, 'time' for temporal distance. 
        axis0_scale : float, optional
            Scale of the first column with respect to meter if axis0 = 'dist',
            or to second if axis0 = 'time'.
        axis1_scale : float, optional
            Scale of the wake potential (in second column) with respect to V/C.

        """
        
        s, wp0 = np.loadtxt(file_name, unpack=True)
        if axis0 == 'dist':
            tau = s*axis0_scale/c
        elif axis0 == 'time':
            tau = s*axis0_scale
        else:
            raise TypeError('Type of axis 0 not recognized.')
            
        wp = wp0 / axis1_scale
        
        # FT of wake potential
        sampling = tau[1] - tau[0]
        freq = sc.fft.rfftfreq(len(tau), sampling)
        dtau = (tau[-1]-tau[0])/len(tau)
        dft_wake = sc.fft.rfft(wp) * dtau
        
        # FT of analytical bunch profile and analytical impedance
        sigma = bunch_length/c
        mu = tau[0]
        
        i_limit = freq < freq_lim
        freq_trun = freq[i_limit]
        dft_wake_trun = dft_wake[i_limit]
        
        dft_rho_trun = np.exp(-0.5*(sigma*2*np.pi*freq_trun)**2 + \
                                  1j*mu*2*np.pi*freq_trun)*bunch_charge
        imp = dft_wake_trun/dft_rho_trun*-1*bunch_charge
        
        super().__init__(variable=freq_trun, function=imp)
        self.data.index.name = "frequency [Hz]"    
        
    def to_wakefunction(self, nout=None, trim=False):
        """
        Return a WakeFunction object from the impedance data.
    
        Parameters
        ----------
        nout : int, optional
            Length of the transformed axis of the output. If None, it is taken
            to be 2*(n-1) where n is the length of the input. If nout > n, the
            input is padded with zeros. If nout < n, the inpu it cropped.
            Note that, for any nout value, nout//2+1 input points are required.
        trim : bool or float, optional
            If True, the pseudo wake function is trimmed at time=0 and is forced 
            to zero where time<0. 
            If False, the original result is preserved.
            If a float is given, the pseudo wake function is trimmed from 
            time <= trim to 0. 
        """
        
        Z0 = (self.data['real'] + self.data['imag']*1j)
        Z = Z0[~np.isnan(Z0)]
        
        if self.impedance_type != "long":
            Z = Z / 1j
        
        freq = Z.index
        fs = ( freq[-1] - freq[0] ) / len(freq)
        sampling = freq[1] - freq[0]
        
        if nout is None:
            nout = len(Z)
            
        time_array = sc.fft.fftfreq(nout, sampling)
        Wlong_raw = sc.fft.irfft(np.array(Z), n=nout, axis=0) * nout * fs
        
        time = sc.fft.fftshift(time_array)
        Wlong = sc.fft.fftshift(Wlong_raw)
        
        if trim is not False:
            i_neg = np.where(time<trim)[0]
            Wlong[i_neg] = 0
                    
        wf = WakeFunction(variable=time, function=Wlong)
        return wf
    
class WakeField:
    """
    Defines a WakeField which corresponds to a single physical element which 
    produces different types of wakes, represented by their Impedance or 
    WakeFunction objects.
    
    Parameters
    ----------
    structure_list : list, optional
        list of Impedance/WakeFunction objects to add to the WakeField.
    name : str, optional
    
    Attributes
    ----------
    impedance_components : array of str
        Impedance component names present for this element.
        
    Methods
    -------
    append_to_model(structure_to_add)
        Add Impedance/WakeFunction to WakeField.
    list_to_attr(structure_list)
        Add list of Impedance/WakeFunction to WakeField.
    add_wakefields(wake1, beta1, wake2, beta2)
        Add two WakeFields taking into account beta functions.
    add_several_wakefields(wakefields, beta)
        Add a list of WakeFields taking into account beta functions.
    """

    def __init__(self, structure_list=None, name=None):
        self.list_to_attr(structure_list)
        self.name = name

    def append_to_model(self, structure_to_add):
        """
        Add Impedance/WakeFunction component to WakeField.

        Parameters
        ----------
        structure_to_add : Impedance or WakeFunction object
        """
        list_of_attributes = dir(self)
        if isinstance(structure_to_add, Impedance):
            attribute_name = "Z" + structure_to_add.impedance_type
            if attribute_name in list_of_attributes:
                raise ValueError("There is already a component of the type "
                                 "{} in this element.".format(attribute_name))
            else:
                self.__setattr__(attribute_name, structure_to_add)
        elif isinstance(structure_to_add, WakeFunction):
            attribute_name = "W" + structure_to_add.wake_type
            if attribute_name in list_of_attributes:
                raise ValueError("There is already a component of the type "
                                 "{} in this element.".format(attribute_name))
            else:
                self.__setattr__(attribute_name, structure_to_add)
        else:
            raise ValueError("{} is not an Impedance nor a WakeFunction.".format(structure_to_add))
    
    def list_to_attr(self, structure_list):
        """
         Add list of Impedance/WakeFunction components to WakeField.

        Parameters
        ----------
        structure_list : list of Impedance or WakeFunction objects.
        """
        if structure_list is not None:
            for component in structure_list:
                self.append_to_model(component)
    
    @property
    def impedance_components(self):
        """
        Return an array of the impedance component names for the element.
        """
        return np.array([comp for comp in dir(self) if re.match(r'[Z]', comp)])
    
    @property
    def wake_components(self):
        """
        Return an array of the wake function component names for the element.
        """
        return np.array([comp for comp in dir(self) if re.match(r'[W]', comp)])
    
    @staticmethod
    def add_wakefields(wake1, beta1, wake2, beta2):
        """
        Add two WakeFields taking into account beta functions.

        Parameters
        ----------
        wake1 : WakeField
            WakeField to add.
        beta1 : array of shape (2,)
            Beta functions at wake1 postion.
        wake2 : WakeField
            WakeField to add.
        beta2 : array of shape (2,)
            Beta functions at wake2 postion.

        Returns
        -------
        wake_sum : WakeField
            WakeField with summed components.

        """
        wake_sum = deepcopy(wake1)
        for component_name1 in wake1.impedance_components:
            impedance1 = getattr(wake_sum, component_name1)
            weight1 = ((beta1[0] ** impedance1.power_x) * 
                      (beta1[1] ** impedance1.power_y))
            setattr(wake_sum, component_name1, weight1*impedance1)
            
        for component_name2 in wake2.impedance_components: 
            impedance2 = getattr(wake2, component_name2)
            weight2 = ((beta2[0] ** impedance2.power_x) * 
                      (beta2[1] ** impedance2.power_y))
            try:
                impedance1 = getattr(wake_sum, component_name2)
                setattr(wake_sum, component_name2, impedance1 +
                        weight2*impedance2)
            except AttributeError:
                setattr(wake_sum, component_name2, weight2*impedance2)

        return wake_sum
    
    @staticmethod
    def add_several_wakefields(wakefields, beta):
        """
        Add a list of WakeFields taking into account beta functions.
        
        Parameters
        ----------
        wakefields : list of WakeField
            WakeFields to add.
        beta : array of shape (len(wakefields), 2)
            Beta functions at the WakeField postions.

        Returns
        -------
        wake_sum : WakeField
            WakeField with summed components..

        """
        if len(wakefields) == 1:
            return wakefields[0]
        elif len(wakefields) > 1:
            wake_sum = WakeField.add_wakefields(wakefields[0], beta[:,0],
                                     wakefields[1], beta[:,1])
            for i in range(len(wakefields) - 2):
                wake_sum = WakeField.add_wakefields(wake_sum, [1 ,1], 
                                         wakefields[i+2], beta[:,i+2])
            return wake_sum
        else:
            raise ValueError("Error in input.")