Skip to content
Snippets Groups Projects
Select Git revision
  • f3296289fb89342db4aad904dc27e4e31a42e3b4
  • develop default protected
  • feature-ibs-suggestions-from-salah
  • syntax-typehints
  • feature-ExponentialDumper-bugfix
  • Resisitve_wall_eff_radius_yokoya
  • feature-feedback-IQ-damper0
  • 18-synchrotron-object-string-representation
  • stable protected
  • feature-particle-in-cell
  • feature-quad_wakes_LongRangeResistiveWall
  • feature-iqdamper0
  • feature-read_wakis
  • use-one-bin
  • RF-FBv0.6
  • RF-FBv0.5
  • faster_pytorch
  • RF-FB
  • util
  • RFBucket
  • Long-range_wakepotential
  • 0.9.0
  • 0.8.0
  • 0.7.0
  • 0.6.0
  • 0.5.0
  • 0.4
  • 0.3
  • 0.2
  • 0.1
30 results

utilities.py

  • user avatar
    Gamelin Alexis authored
    Allow to read both wake function and impedance with read_IW2D_folder
    Fix yokoya_elliptic function
    878e5959
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    utilities.py 14.70 KiB
    # -*- coding: utf-8 -*-
    """
    This module defines utilities functions, helping to deals with the 
    collective_effects module.
    
    @author: Alexis Gamelin
    @date: 28/09/2020
    """
    
    import pandas as pd
    import numpy as np
    from pathlib import Path
    from scipy.interpolate import interp1d
    from scipy.constants import c
    from mbtrack2.collective_effects.wakefield import Impedance, WakeFunction, WakeField
    from pathlib import Path
    
    
    def read_CST(file, impedance_type='long', divide_by=None):
        """
        Read CST file format into an Impedance object.
        
        Parameters
        ----------
        file : str
            Path to the file to read.
        impedance_type : str, optional
            Type of the Impedance object.
        divide_by : float, optional
            Divide the impedance by a value. Mainly used to normalize transverse 
            impedance by displacement.
            
        Returns
        -------
        result : Impedance object
            Data from file.
        """
        df = pd.read_csv(file, comment="#", header = None, sep = "\t", 
                        names = ["Frequency","Real","Imaginary"])
        df["Frequency"] = df["Frequency"]*1e9 
        if divide_by is not None:
            df["Real"] = df["Real"]/divide_by
            df["Imaginary"] = df["Imaginary"]/divide_by
        if impedance_type == "long":
            df["Real"] = np.abs(df["Real"])
        df.set_index("Frequency", inplace = True)
        result = Impedance(variable = df.index,
                           function = df["Real"] + 1j*df["Imaginary"],
                           impedance_type=impedance_type)
        return result
    
    def read_IW2D(file, file_type='Zlong'):
        """
        Read IW2D file format into an Impedance object or a WakeField object.
        
        Parameters
        ----------
        file : str
            Path to the file to read.
        file_type : str, optional
            Type of the Impedance or WakeField object.
            
        Returns
        -------
        result : Impedance or WakeField object
            Data from file.
        """
        if file_type[0] == "Z":
            df = pd.read_csv(file, sep = ' ', header = None, 
                             names = ["Frequency","Real","Imaginary"], skiprows=1)
            df.set_index("Frequency", inplace = True)
            df = df[df["Real"].notna()]
            df = df[df["Imaginary"].notna()]
            result = Impedance(variable = df.index,
                               function = df["Real"] + 1j*df["Imaginary"],
                               impedance_type=file_type[1:])
        elif file_type[0] == "W":
            df = pd.read_csv(file, sep = ' ', header = None, 
                             names = ["Distance","Wake"], skiprows=1)
            df["Time"] = df["Distance"] / c
            df.set_index("Time", inplace = True)
            if np.any(df.isna()):
                index = df.isna().values
                df = df.interpolate()
                print("Nan values have been interpolated to:")
                print(df[index])
            # if file_type == "Wlong":
            #     df["Wake"] = df["Wake"]*-1
            result = WakeFunction(variable = df.index,
                               function = df["Wake"],
                               wake_type=file_type[1:])
        else:
            raise ValueError("file_type should begin by Z or W.")
        return result
    
    def read_IW2D_folder(folder, suffix, select="WZ"):
        """
        Read IW2D results into a WakeField object.
        
        Parameters
        ----------
        file : str
            Path to the file to read.
        suffix : str
            End of the name of each files. For example, in "Zlong_test.dat" the
            suffix should be "_test.dat".
        select : str, optional
            Select which object to load. "W" for WakeFunction, "Z" for Impedance 
            and "WZ" or "ZW" for both.
            
        Returns
        -------
        result : WakeField object
            WakeField object with Impedance and WakeFunction objects from the 
            different files.
        """
        if (select == "WZ") or (select == "ZW"):
            types = {"W" : WakeFunction,
                     "Z" : Impedance}
        elif (select == "W"):
            types = {"W" : WakeFunction}
        elif (select == "Z"):
            types = {"Z" : Impedance}
        else:
            raise ValueError("select should be W, Z or WZ.")
            
        components = ["long", "xdip", "ydip", "xquad", "yquad"]
        
        data_folder = Path(folder)
        
        list_for_wakefield = []
        for key, item in types.items():
            for component in components:
                name = data_folder / (key + component + suffix)
                res = read_IW2D(file=name, file_type=key + component)
                list_for_wakefield.append(res)
                
        wake = WakeField(list_for_wakefield)
        
        return wake
    
    def spectral_density(frequency, sigma, m = 1, mode="Hermite"):
        """
        Compute the spectral density of different modes for various values of the
        head-tail mode number, based on Table 1 p238 of [1].
        
        Parameters
        ----------
        frequency : list or numpy array
            sample points of the spectral density in [Hz]
        sigma : float
            RMS bunch length in [s]
        m : int, optional
            head-tail (or azimutal/synchrotron) mode number
        mode: str, optional
            type of the mode taken into account for the computation:
            -"Hermite" modes for Gaussian bunches
    
        Returns
        -------
        numpy array
        
        References
        ----------
        [1] : Handbook of accelerator physics and engineering, 3rd printing.
        """
        
        if mode == "Hermite":
            return (2*np.pi*frequency*sigma)**(2*m)*np.exp(
                    -1*(2*np.pi*frequency*sigma)**2)
        else:
            raise NotImplementedError("Not implemanted yet.")
            
    def gaussian_bunch_spectrum(frequency, sigma): 
        """
        Compute a Gaussian bunch spectrum [1].
    
        Parameters
        ----------
        frequency : array
            sample points of the beam spectrum in [Hz].
        sigma : float
            RMS bunch length in [s].
    
        Returns
        -------
        bunch_spectrum : array
            Bunch spectrum sampled at points given in frequency.
            
        References
        ----------
        [1] : Gamelin, A. (2018). Collective effects in a transient microbunching 
        regime and ion cloud mitigation in ThomX. p86, Eq. 4.19
        """
        return np.exp(-1/2*(2*np.pi*frequency)**2*sigma**2)
    
    def gaussian_bunch(time, sigma): 
        """
        Compute a Gaussian bunch profile.
    
        Parameters
        ----------
        time : array
            sample points of the bunch profile in [s].
        sigma : float
            RMS bunch length in [s].
    
        Returns
        -------
        bunch_profile : array
            Bunch profile in [s**-1] sampled at points given in time.
        """
        return np.exp(-1/2*(time**2/sigma**2))/(sigma*np.sqrt(2*np.pi))
            
            
    def beam_spectrum(frequency, M, bunch_spacing, sigma=None, 
                      bunch_spectrum=None): 
        """
        Compute the beam spectrum assuming constant spacing between bunches [1].
    
        Parameters
        ----------
        frequency : list or numpy array
            sample points of the beam spectrum in [Hz].
        M : int
            Number of bunches.
        bunch_spacing : float
            Time between two bunches in [s].
        sigma : float, optional
            If bunch_spectrum is None then a Gaussian bunch with sigma RMS bunch 
            length in [s] is assumed.
        bunch_spectrum : array, optional
            Bunch spectrum sampled at points given in frequency.
    
        Returns
        -------
        beam_spectrum : array
    
        References
        ----------
        [1] Rumolo, G - Beam Instabilities - CAS - CERN Accelerator School: 
            Advanced Accelerator Physics Course - 2014, Eq. 9
        """
        
        if bunch_spectrum is None:
            bunch_spectrum = gaussian_bunch_spectrum(frequency, sigma)
            
        beam_spectrum = (bunch_spectrum * np.exp(1j*np.pi*frequency * 
                                                 bunch_spacing*(M-1)) * 
                         np.sin(M*np.pi*frequency*bunch_spacing) / 
                         np.sin(np.pi*frequency*bunch_spacing))
        
        return beam_spectrum
        
        
    def effective_impedance(ring, imp, m, mu, sigma, M, tuneS, xi=None, 
                            mode="Hermite"):
        """
        Compute the effective (longitudinal or transverse) impedance. 
        Formulas from Eq. (1) and (2) p238 of [1].
        
        Parameters
        ----------
        ring : Synchrotron object
        imp : Impedance object
        mu : int
            coupled bunch mode number, goes from 0 to (M-1) where M is the
            number of bunches
        m : int
            head-tail (or azimutal/synchrotron) mode number
        sigma : float
            RMS bunch length in [s]
        M : int
            Number of bunches.
        tuneS : float
            Synchrotron tune.
        xi : float, optional
            (non-normalized) chromaticity
        mode: str, optional
            type of the mode taken into account for the computation:
            -"Hermite" modes for Gaussian bunches
    
        Returns
        -------
        Zeff : float 
            effective impedance in [ohm] or in [ohm/m] depanding on the impedance
            type.
            
        References
        ----------
        [1] : Handbook of accelerator physics and engineering, 3rd printing.
        """
        
        if not isinstance(imp, Impedance):
            raise TypeError("{} should be an Impedance object.".format(imp))
            
        fmin = imp.data.index.min()
        fmax = imp.data.index.max()
        if fmin > 0:
            double_sided_impedance(imp)
            
        if mode == "Hermite":
            def h(f):
                return spectral_density(frequency=f, sigma=sigma, m=m,
                                        mode="Hermite")
        else:
            raise NotImplementedError("Not implemanted yet.")
        
        pmax = fmax/(ring.f0 * M) - 1
        pmin = fmin/(ring.f0 * M) + 1
        
        p = np.arange(pmin,pmax+1)
        
        if imp.impedance_type == "long":
            fp = ring.f0*(p*M + mu + m*tuneS)
            fp = fp[np.nonzero(fp)] # Avoid division by 0
            num = np.sum( imp(fp) * h(fp) / (fp*2*np.pi) )
            den = np.sum( h(fp) )
            Zeff = num/den
            
        elif imp.impedance_type == "xdip" or imp.impedance_type == "ydip":
            if imp.impedance_type == "xdip":
                tuneXY = ring.tune[0]
                if xi is None :
                    xi = ring.chro[0]
            elif imp.impedance_type == "ydip":
                tuneXY = ring.tune[1]
                if xi is None:
                    xi = ring.chro[1]
            fp = ring.f0*(p*M + mu + tuneXY + m*tuneS)
            f_xi = xi/ring.eta*ring.f0
            num = np.sum( imp(fp) * h(fp - f_xi) )
            den = np.sum( h(fp - f_xi) )
            Zeff = num/den
        else:
            raise TypeError("Effective impedance is only defined for long, xdip"
                            " and ydip impedance type.")
            
        return Zeff
    
    
    def yokoya_elliptic(x_radius , y_radius):
        """
        Compute Yokoya factors for an elliptic beam pipe.
        Function adapted from N. Mounet IW2D.
    
        Parameters
        ----------
        x_radius : float
            Horizontal semi-axis of the ellipse in [m].
        y_radius : float
            Vertical semi-axis of the ellipse in [m].
    
        Returns
        -------
        yoklong : float
            Yokoya factor for the longitudinal impedance.
        yokxdip : float
            Yokoya factor for the dipolar horizontal impedance.
        yokydip : float
            Yokoya factor for the dipolar vertical impedance.
        yokxquad : float
            Yokoya factor for the quadrupolar horizontal impedance.
        yokyquad : float
            Yokoya factor for the quadrupolar vertical impedance.
        """
        if y_radius < x_radius:
            small_semiaxis = y_radius
            large_semiaxis = x_radius
        else:
            small_semiaxis = x_radius
            large_semiaxis = y_radius
            
        path_to_file = Path(__file__).parent
        file = path_to_file / "data" / "Yokoya_elliptic_from_Elias_USPAS.csv"
    
        # read Yokoya factors interpolation file
        # BEWARE: columns are ratio, dipy, dipx, quady, quadx
        yokoya_file = pd.read_csv(file)
        ratio_col = yokoya_file["x"]
        # compute semi-axes ratio (first column of this file)
        ratio = (large_semiaxis - small_semiaxis)/(large_semiaxis + small_semiaxis)
    
        # interpolate Yokoya file at the correct ratio
        yoklong = 1
        
        if y_radius < x_radius:
            yokydip = np.interp(ratio, ratio_col, yokoya_file["dipy"])
            yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipx"])
            yokyquad = np.interp(ratio, ratio_col, yokoya_file["quady"])
            yokxquad = np.interp(ratio, ratio_col, yokoya_file["quadx"])
        else:
            yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipy"])
            yokydip = np.interp(ratio, ratio_col, yokoya_file["dipx"])
            yokxquad = np.interp(ratio, ratio_col, yokoya_file["quady"])
            yokyquad = np.interp(ratio, ratio_col, yokoya_file["quadx"])        
    
        return (yoklong, yokxdip, yokydip, yokxquad, yokyquad)
    
    def beam_loss_factor(impedance, frequency, spectrum, ring):
        """
        Compute "beam" loss factor using the beam spectrum, uses a sum instead of 
        integral compared to loss_factor [1].
    
        Parameters
        ----------
        impedance : Impedance of type "long"
        frequency : array
            Sample points of spectrum.
        spectrum : array
            Beam spectrum to consider.
        ring : Synchrotron object
    
        Returns
        -------
        kloss_beam : float
            Beam loss factor in [V/C].
            
        References
        ----------
        [1] : Handbook of accelerator physics and engineering, 3rd printing. 
            Eq (3) p239.
        """
        pmax = np.floor(impedance.data.index.max()/ring.f0)
        pmin = np.floor(impedance.data.index.min()/ring.f0)
        
        if pmin >= 0:
            double_sided_impedance(impedance)
            pmin = -1*pmax
        
        p = np.arange(pmin+1,pmax)    
        pf0 = p*ring.f0
        ReZ = np.real(impedance(pf0))
        spectral_density = np.abs(spectrum)**2
        # interpolation of the spectrum is needed to avoid problems liked to 
        # division by 0
        # computing the spectrum directly to the frequency points gives
        # wrong results
        spect = interp1d(frequency, spectral_density)
        kloss_beam = ring.f0 * np.sum(ReZ*spect(pf0))
        
        return kloss_beam
    
    def double_sided_impedance(impedance):
        """
        Add negative frequency points to single sided impedance spectrum following
        symetries depending on impedance type.
    
        Parameters
        ----------
        impedance : Impedance object
            Single sided impedance.
        """
        fmin = impedance.data.index.min()
        
        if fmin >= 0:
            negative_index = impedance.data.index*-1
            negative_data = impedance.data.set_index(negative_index)
            
            imp_type = impedance.impedance_type
            
            if imp_type == "long":
                negative_data["imag"] = -1*negative_data["imag"]
                
            elif (imp_type == "xdip") or (imp_type == "ydip"):
                negative_data["real"] = -1*negative_data["real"]
            
            elif (imp_type == "xquad") or (imp_type == "yquad"):
                negative_data["real"] = -1*negative_data["real"]
                
            else:
                raise ValueError("Wrong impedance type")
                
            try:
                negative_data = negative_data.drop(0)
            except KeyError:
                pass
                
            all_data = impedance.data.append(negative_data)
            all_data = all_data.sort_index()
            impedance.data = all_data