Select Git revision
utilities.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
utilities.py 14.70 KiB
# -*- coding: utf-8 -*-
"""
This module defines utilities functions, helping to deals with the
collective_effects module.
@author: Alexis Gamelin
@date: 28/09/2020
"""
import pandas as pd
import numpy as np
from pathlib import Path
from scipy.interpolate import interp1d
from scipy.constants import c
from mbtrack2.collective_effects.wakefield import Impedance, WakeFunction, WakeField
from pathlib import Path
def read_CST(file, impedance_type='long', divide_by=None):
"""
Read CST file format into an Impedance object.
Parameters
----------
file : str
Path to the file to read.
impedance_type : str, optional
Type of the Impedance object.
divide_by : float, optional
Divide the impedance by a value. Mainly used to normalize transverse
impedance by displacement.
Returns
-------
result : Impedance object
Data from file.
"""
df = pd.read_csv(file, comment="#", header = None, sep = "\t",
names = ["Frequency","Real","Imaginary"])
df["Frequency"] = df["Frequency"]*1e9
if divide_by is not None:
df["Real"] = df["Real"]/divide_by
df["Imaginary"] = df["Imaginary"]/divide_by
if impedance_type == "long":
df["Real"] = np.abs(df["Real"])
df.set_index("Frequency", inplace = True)
result = Impedance(variable = df.index,
function = df["Real"] + 1j*df["Imaginary"],
impedance_type=impedance_type)
return result
def read_IW2D(file, file_type='Zlong'):
"""
Read IW2D file format into an Impedance object or a WakeField object.
Parameters
----------
file : str
Path to the file to read.
file_type : str, optional
Type of the Impedance or WakeField object.
Returns
-------
result : Impedance or WakeField object
Data from file.
"""
if file_type[0] == "Z":
df = pd.read_csv(file, sep = ' ', header = None,
names = ["Frequency","Real","Imaginary"], skiprows=1)
df.set_index("Frequency", inplace = True)
df = df[df["Real"].notna()]
df = df[df["Imaginary"].notna()]
result = Impedance(variable = df.index,
function = df["Real"] + 1j*df["Imaginary"],
impedance_type=file_type[1:])
elif file_type[0] == "W":
df = pd.read_csv(file, sep = ' ', header = None,
names = ["Distance","Wake"], skiprows=1)
df["Time"] = df["Distance"] / c
df.set_index("Time", inplace = True)
if np.any(df.isna()):
index = df.isna().values
df = df.interpolate()
print("Nan values have been interpolated to:")
print(df[index])
# if file_type == "Wlong":
# df["Wake"] = df["Wake"]*-1
result = WakeFunction(variable = df.index,
function = df["Wake"],
wake_type=file_type[1:])
else:
raise ValueError("file_type should begin by Z or W.")
return result
def read_IW2D_folder(folder, suffix, select="WZ"):
"""
Read IW2D results into a WakeField object.
Parameters
----------
file : str
Path to the file to read.
suffix : str
End of the name of each files. For example, in "Zlong_test.dat" the
suffix should be "_test.dat".
select : str, optional
Select which object to load. "W" for WakeFunction, "Z" for Impedance
and "WZ" or "ZW" for both.
Returns
-------
result : WakeField object
WakeField object with Impedance and WakeFunction objects from the
different files.
"""
if (select == "WZ") or (select == "ZW"):
types = {"W" : WakeFunction,
"Z" : Impedance}
elif (select == "W"):
types = {"W" : WakeFunction}
elif (select == "Z"):
types = {"Z" : Impedance}
else:
raise ValueError("select should be W, Z or WZ.")
components = ["long", "xdip", "ydip", "xquad", "yquad"]
data_folder = Path(folder)
list_for_wakefield = []
for key, item in types.items():
for component in components:
name = data_folder / (key + component + suffix)
res = read_IW2D(file=name, file_type=key + component)
list_for_wakefield.append(res)
wake = WakeField(list_for_wakefield)
return wake
def spectral_density(frequency, sigma, m = 1, mode="Hermite"):
"""
Compute the spectral density of different modes for various values of the
head-tail mode number, based on Table 1 p238 of [1].
Parameters
----------
frequency : list or numpy array
sample points of the spectral density in [Hz]
sigma : float
RMS bunch length in [s]
m : int, optional
head-tail (or azimutal/synchrotron) mode number
mode: str, optional
type of the mode taken into account for the computation:
-"Hermite" modes for Gaussian bunches
Returns
-------
numpy array
References
----------
[1] : Handbook of accelerator physics and engineering, 3rd printing.
"""
if mode == "Hermite":
return (2*np.pi*frequency*sigma)**(2*m)*np.exp(
-1*(2*np.pi*frequency*sigma)**2)
else:
raise NotImplementedError("Not implemanted yet.")
def gaussian_bunch_spectrum(frequency, sigma):
"""
Compute a Gaussian bunch spectrum [1].
Parameters
----------
frequency : array
sample points of the beam spectrum in [Hz].
sigma : float
RMS bunch length in [s].
Returns
-------
bunch_spectrum : array
Bunch spectrum sampled at points given in frequency.
References
----------
[1] : Gamelin, A. (2018). Collective effects in a transient microbunching
regime and ion cloud mitigation in ThomX. p86, Eq. 4.19
"""
return np.exp(-1/2*(2*np.pi*frequency)**2*sigma**2)
def gaussian_bunch(time, sigma):
"""
Compute a Gaussian bunch profile.
Parameters
----------
time : array
sample points of the bunch profile in [s].
sigma : float
RMS bunch length in [s].
Returns
-------
bunch_profile : array
Bunch profile in [s**-1] sampled at points given in time.
"""
return np.exp(-1/2*(time**2/sigma**2))/(sigma*np.sqrt(2*np.pi))
def beam_spectrum(frequency, M, bunch_spacing, sigma=None,
bunch_spectrum=None):
"""
Compute the beam spectrum assuming constant spacing between bunches [1].
Parameters
----------
frequency : list or numpy array
sample points of the beam spectrum in [Hz].
M : int
Number of bunches.
bunch_spacing : float
Time between two bunches in [s].
sigma : float, optional
If bunch_spectrum is None then a Gaussian bunch with sigma RMS bunch
length in [s] is assumed.
bunch_spectrum : array, optional
Bunch spectrum sampled at points given in frequency.
Returns
-------
beam_spectrum : array
References
----------
[1] Rumolo, G - Beam Instabilities - CAS - CERN Accelerator School:
Advanced Accelerator Physics Course - 2014, Eq. 9
"""
if bunch_spectrum is None:
bunch_spectrum = gaussian_bunch_spectrum(frequency, sigma)
beam_spectrum = (bunch_spectrum * np.exp(1j*np.pi*frequency *
bunch_spacing*(M-1)) *
np.sin(M*np.pi*frequency*bunch_spacing) /
np.sin(np.pi*frequency*bunch_spacing))
return beam_spectrum
def effective_impedance(ring, imp, m, mu, sigma, M, tuneS, xi=None,
mode="Hermite"):
"""
Compute the effective (longitudinal or transverse) impedance.
Formulas from Eq. (1) and (2) p238 of [1].
Parameters
----------
ring : Synchrotron object
imp : Impedance object
mu : int
coupled bunch mode number, goes from 0 to (M-1) where M is the
number of bunches
m : int
head-tail (or azimutal/synchrotron) mode number
sigma : float
RMS bunch length in [s]
M : int
Number of bunches.
tuneS : float
Synchrotron tune.
xi : float, optional
(non-normalized) chromaticity
mode: str, optional
type of the mode taken into account for the computation:
-"Hermite" modes for Gaussian bunches
Returns
-------
Zeff : float
effective impedance in [ohm] or in [ohm/m] depanding on the impedance
type.
References
----------
[1] : Handbook of accelerator physics and engineering, 3rd printing.
"""
if not isinstance(imp, Impedance):
raise TypeError("{} should be an Impedance object.".format(imp))
fmin = imp.data.index.min()
fmax = imp.data.index.max()
if fmin > 0:
double_sided_impedance(imp)
if mode == "Hermite":
def h(f):
return spectral_density(frequency=f, sigma=sigma, m=m,
mode="Hermite")
else:
raise NotImplementedError("Not implemanted yet.")
pmax = fmax/(ring.f0 * M) - 1
pmin = fmin/(ring.f0 * M) + 1
p = np.arange(pmin,pmax+1)
if imp.impedance_type == "long":
fp = ring.f0*(p*M + mu + m*tuneS)
fp = fp[np.nonzero(fp)] # Avoid division by 0
num = np.sum( imp(fp) * h(fp) / (fp*2*np.pi) )
den = np.sum( h(fp) )
Zeff = num/den
elif imp.impedance_type == "xdip" or imp.impedance_type == "ydip":
if imp.impedance_type == "xdip":
tuneXY = ring.tune[0]
if xi is None :
xi = ring.chro[0]
elif imp.impedance_type == "ydip":
tuneXY = ring.tune[1]
if xi is None:
xi = ring.chro[1]
fp = ring.f0*(p*M + mu + tuneXY + m*tuneS)
f_xi = xi/ring.eta*ring.f0
num = np.sum( imp(fp) * h(fp - f_xi) )
den = np.sum( h(fp - f_xi) )
Zeff = num/den
else:
raise TypeError("Effective impedance is only defined for long, xdip"
" and ydip impedance type.")
return Zeff
def yokoya_elliptic(x_radius , y_radius):
"""
Compute Yokoya factors for an elliptic beam pipe.
Function adapted from N. Mounet IW2D.
Parameters
----------
x_radius : float
Horizontal semi-axis of the ellipse in [m].
y_radius : float
Vertical semi-axis of the ellipse in [m].
Returns
-------
yoklong : float
Yokoya factor for the longitudinal impedance.
yokxdip : float
Yokoya factor for the dipolar horizontal impedance.
yokydip : float
Yokoya factor for the dipolar vertical impedance.
yokxquad : float
Yokoya factor for the quadrupolar horizontal impedance.
yokyquad : float
Yokoya factor for the quadrupolar vertical impedance.
"""
if y_radius < x_radius:
small_semiaxis = y_radius
large_semiaxis = x_radius
else:
small_semiaxis = x_radius
large_semiaxis = y_radius
path_to_file = Path(__file__).parent
file = path_to_file / "data" / "Yokoya_elliptic_from_Elias_USPAS.csv"
# read Yokoya factors interpolation file
# BEWARE: columns are ratio, dipy, dipx, quady, quadx
yokoya_file = pd.read_csv(file)
ratio_col = yokoya_file["x"]
# compute semi-axes ratio (first column of this file)
ratio = (large_semiaxis - small_semiaxis)/(large_semiaxis + small_semiaxis)
# interpolate Yokoya file at the correct ratio
yoklong = 1
if y_radius < x_radius:
yokydip = np.interp(ratio, ratio_col, yokoya_file["dipy"])
yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipx"])
yokyquad = np.interp(ratio, ratio_col, yokoya_file["quady"])
yokxquad = np.interp(ratio, ratio_col, yokoya_file["quadx"])
else:
yokxdip = np.interp(ratio, ratio_col, yokoya_file["dipy"])
yokydip = np.interp(ratio, ratio_col, yokoya_file["dipx"])
yokxquad = np.interp(ratio, ratio_col, yokoya_file["quady"])
yokyquad = np.interp(ratio, ratio_col, yokoya_file["quadx"])
return (yoklong, yokxdip, yokydip, yokxquad, yokyquad)
def beam_loss_factor(impedance, frequency, spectrum, ring):
"""
Compute "beam" loss factor using the beam spectrum, uses a sum instead of
integral compared to loss_factor [1].
Parameters
----------
impedance : Impedance of type "long"
frequency : array
Sample points of spectrum.
spectrum : array
Beam spectrum to consider.
ring : Synchrotron object
Returns
-------
kloss_beam : float
Beam loss factor in [V/C].
References
----------
[1] : Handbook of accelerator physics and engineering, 3rd printing.
Eq (3) p239.
"""
pmax = np.floor(impedance.data.index.max()/ring.f0)
pmin = np.floor(impedance.data.index.min()/ring.f0)
if pmin >= 0:
double_sided_impedance(impedance)
pmin = -1*pmax
p = np.arange(pmin+1,pmax)
pf0 = p*ring.f0
ReZ = np.real(impedance(pf0))
spectral_density = np.abs(spectrum)**2
# interpolation of the spectrum is needed to avoid problems liked to
# division by 0
# computing the spectrum directly to the frequency points gives
# wrong results
spect = interp1d(frequency, spectral_density)
kloss_beam = ring.f0 * np.sum(ReZ*spect(pf0))
return kloss_beam
def double_sided_impedance(impedance):
"""
Add negative frequency points to single sided impedance spectrum following
symetries depending on impedance type.
Parameters
----------
impedance : Impedance object
Single sided impedance.
"""
fmin = impedance.data.index.min()
if fmin >= 0:
negative_index = impedance.data.index*-1
negative_data = impedance.data.set_index(negative_index)
imp_type = impedance.impedance_type
if imp_type == "long":
negative_data["imag"] = -1*negative_data["imag"]
elif (imp_type == "xdip") or (imp_type == "ydip"):
negative_data["real"] = -1*negative_data["real"]
elif (imp_type == "xquad") or (imp_type == "yquad"):
negative_data["real"] = -1*negative_data["real"]
else:
raise ValueError("Wrong impedance type")
try:
negative_data = negative_data.drop(0)
except KeyError:
pass
all_data = impedance.data.append(negative_data)
all_data = all_data.sort_index()
impedance.data = all_data