Source code for mbtrack2.impedance.impedance_model

# -*- coding: utf-8 -*-
"""
Module where the ImpedanceModel class is defined.
"""
import pickle
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib import colormaps
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from scipy.integrate import trapz
from scipy.interpolate import interp1d

from mbtrack2.impedance.wakefield import WakeField, WakeFunction
from mbtrack2.utilities.misc import (
    beam_loss_factor,
    double_sided_impedance,
    effective_impedance,
)
from mbtrack2.utilities.spectrum import (
    beam_spectrum,
    gaussian_bunch_spectrum,
    spectral_density,
)


[docs]class ImpedanceModel(): """ Define the impedance model of the machine. The model must be completed with successive add(...) and add_global(...) calls, then compute_sum() must be run. The transverse impedance and wake functions are beta weighted and divided by the beta at the tracking location (ring.optics.local_beta). Parameters ---------- ring : Synchrotron object Attributes ---------- wakefields : list of WakeField objects WakeFields in the model. positions : list of arrays Positions corresponding the different WakeFields. names : list of str Names of the WakeField objects. sum : WakeField Sum of every WakeField in the model weigthed by beta functions. sum_"name" : WakeField Sum of the "name" Wakefield weigthed by beta functions. sum_names : array Names of attributes where the WakeFields are summed by name. globals : list of WakeField objects Globals WakeFields in the model. globals_names : list of str Names of the global WakeFields objects. Methods ------- add(wakefield, positions, name) Add the same WakeField object at different locations to the model. add_global(wakefield, name) Add a "global" WakeField object which will added to the sum WakeField but not weighted by beta functions. sum_beta(wake, beta) Weight a WakeField object by an array of beta functions. compute_sum_names() Compute the weighted WakeField for each WakeField object type. compute_sum() Compute the sum of all weighted WakeField into self.sum. plot_area(Z_type="Zlong", component="real", sigma=None, attr_list=None) Plot the contributions of different kind of WakeFields. save(file) Save impedance model to file. load(file) Load impedance model from file. """ def __init__(self, ring): self.ring = ring self.optics = self.ring.optics self.wakefields = [] self.positions = [] self.names = [] self.globals = [] self.globals_names = [] self.sum_names = []
[docs] def add(self, wakefield, positions, name=None): """ Add the same WakeField object at different locations to the model. Parameters ---------- wakefield : WakeField WakeField object to add to the model. positions : array, float or int Array of longitudinal positions where the elements are loacted. name : str, optional Name of the element type. If None, the name of the WakeField object is used. The default is None. Returns ------- None. """ if name is None: name = wakefield.name if name is None: raise ValueError("Please give a valid name.") if name not in self.names: self.names.append(name) else: raise ValueError("This name is already taken.") self.wakefields.append(wakefield) self.positions.append(positions)
[docs] def add_global(self, wakefield, name=None): """ Add a "global" WakeField object which will added to the sum WakeField but not weighted by beta functions. To use with "distributed" elements, for example a resistive wall wakefield computed from an effective radius (so which has already been weighted by beta functions). Parameters ---------- wakefield : WakeField WakeField object to add to the model. name : str, optional Name of the element type. If None, the name of the WakeField object is used. The default is None. Returns ------- None. """ self.globals.append(wakefield) if name is None: name = wakefield.name if name is None: raise ValueError("Please give a valid name.") if name not in self.globals_names: self.globals_names.append(name) else: raise ValueError("This name is already taken.")
# setattr(self, name, wakefield)
[docs] def sum_beta(self, wake, beta): """ Weight a WakeField object by an array of beta functions. Parameters ---------- wake : WakeField WakeField element object. beta : array of shape (2, N) Beta function at the locations of the elements. Returns ------- wake_sum : WakeField WakeField object weighted by beta functions. """ wake_sum = deepcopy(wake) local_beta = self.ring.optics.local_beta for component_name in wake_sum.components: comp = getattr(wake_sum, component_name) weight = ((beta[0, :]**comp.power_x) * (beta[1, :]**comp.power_y)) if comp.plane == "x": weight = weight.sum() / local_beta[0] if comp.plane == "y": weight = weight.sum() / local_beta[1] else: weight = weight.sum() setattr(wake_sum, component_name, weight * comp) return wake_sum
[docs] def compute_sum_names(self): """ Compute the weighted WakeField for each WakeField object type. The new summed WakeField object are set to into self.sum_name. """ for idx, wake in enumerate(self.wakefields): attribute_name = "sum_" + self.names[idx] beta = self.optics.beta(self.positions[idx]) wake_sum = self.sum_beta(wake, beta) wake_sum.name = attribute_name setattr(self, attribute_name, wake_sum) self.sum_names.append(attribute_name)
[docs] def compute_sum(self): """ Compute the sum of all weighted WakeField into self.sum. """ self.compute_sum_names() for i, name in enumerate(self.sum_names): if i == 0: self.sum = deepcopy(getattr(self, name)) self.sum.name = "sum" else: wake2 = getattr(self, name) for component_name2 in wake2.components: comp2 = getattr(wake2, component_name2) try: comp1 = getattr(self.sum, component_name2) setattr(self.sum, component_name2, comp1 + comp2) except AttributeError: setattr(self.sum, component_name2, comp2) for i, wake2 in enumerate(self.globals): name = self.globals_names[i] setattr(self, name, wake2) self.sum_names.append(name) if not hasattr(self, "sum"): self.sum = deepcopy(wake2) self.sum.name = "sum" else: for component_name2 in wake2.components: comp2 = getattr(wake2, component_name2) try: comp1 = getattr(self.sum, component_name2) setattr(self.sum, component_name2, comp1 + comp2) except AttributeError: setattr(self.sum, component_name2, comp2)
[docs] def group_attributes(self, string_in_name, names_to_group=None, property_list=['Zlong']): """Groups attributes in the ImpedanceModel based on a given string pattern. Args: string_in_name (str): The string pattern used to match attribute names for grouping. If names_to_group is given, this is a name of a new attribute instead. names_to_group (list, optional): List of attribute names to be explicitly grouped. If not provided, attributes matching the string pattern will be automatically grouped. Defaults to None. property_list (list, optional): List of property names to be grouped for each attribute. Defaults to ['Zlong']. Returns: int: Returns 0 indicating the successful grouping of attributes. Notes: - This method groups attributes in the ImpedanceModel class based on a given string pattern. - If names_to_group is not provided, it automatically selects attributes from the existing sum_names that match the given string pattern. - A new WakeField instance is created and assigned to the attribute named 'string_in_name'. - The specified properties from the first attribute in names_to_group are appended to the new WakeField instance. - The values of the specified properties from the remaining attributes in names_to_group are added to the corresponding properties in the new WakeField instance. - The names_to_group attributes are removed from the ImpedanceModel class. - The new grouped attribute is added to sum_names, and the first attribute in names_to_group is removed from sum_names.""" attrs = self.sum_names if names_to_group == None: names_to_group = [] for attr in attrs: if string_in_name in attr: names_to_group.append(attr) setattr(self, string_in_name, WakeField()) for prop in property_list: getattr(self, string_in_name).append_to_model( getattr(getattr(self, names_to_group[0]), prop)) for attr in names_to_group[1:]: for prop in property_list: old_values = getattr(getattr(self, string_in_name), prop) new_values = getattr(getattr(self, attr), prop) setattr(getattr(self, string_in_name), prop, old_values + new_values) self.sum_names.remove(attr) delattr(self, attr) self.sum_names.append(string_in_name) self.sum_names.remove(names_to_group[0]) delattr(self, names_to_group[0]) return 0
[docs] def rename_attribute(self, old_name, new_name): """Renames an attribute in the ImpedanceModel. Args: old_name (str): The current name of the attribute to be renamed. new_name (str): The new name for the attribute. Raises: KeyError: If the old_name doesn't exist as an attribute in the ImpedanceModel. Notes: - This method renames an attribute in the ImpedanceModel class. - The attribute with the old_name is removed from the class's dictionary (__dict__) using the pop() method. - The attribute is then added back to the class's dictionary with the new_name using the __dict__ attribute. - If the old_name doesn't exist as an attribute in the ImpedanceModel, a KeyError is raised. """ self.__dict__[new_name] = self.__dict__.pop(old_name)
[docs] def plot_area(self, Z_type="Zlong", component="real", sigma=None, attr_list=None, zoom=False): """ Plot the contributions of different kind of WakeFields. Parameters ---------- Z_type : str, optional Type of impedance to plot. component : str, optional Component to plot, can be "real" or "imag". sigma : float, optional RMS bunch length in [s] to use for the spectral density. If equal to None, the spectral density is not plotted. attr_list : list or array of str, optional Attributes to plot. zoom : bool If True, add a zoomed plot on top right corner. """ if attr_list is None: attr_list = self.sum_names # manage legend Ztype_dict = { "Zlong": 0, "Zxdip": 1, "Zydip": 2, "Zxquad": 3, "Zyquad": 4 } scale = [1e-3, 1e-6, 1e-6, 1e-6, 1e-6] label_list = [ r"$Z_\mathrm{long} \; (\mathrm{k}\Omega)$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{x,j} Z_{x,j}^\mathrm{Dip} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{y,j} Z_{y,j}^\mathrm{Dip} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{x,j} Z_{x,j}^\mathrm{Quad} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{y,j} Z_{y,j}^\mathrm{Quad} \; (\mathrm{M}\Omega/\mathrm{m})$" ] leg = Ztype_dict[Z_type] # sort plot by decresing area size area = np.zeros((len(attr_list), )) for index, attr in enumerate(attr_list): try: sum_imp = getattr(getattr(self, attr), Z_type) area[index] = trapz(sum_imp.data[component], sum_imp.data.index) except AttributeError: pass sorted_index = area.argsort()[::-1] # Init fig fig = plt.figure() ax = fig.gca() zero_impedance = getattr(self.sum, Z_type) * 0 total_imp = 0 legend = [] if sigma is not None: legend.append("Spectral density for sigma = " + str(sigma) + " s") # Main plot colorblind = colormaps['tab10'].colors for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance ax.fill_between(sum_imp.data.index * 1e-9, total_imp, total_imp + sum_imp.data[component] * scale[leg], edgecolor=colorblind[index % 10], color=colorblind[index % 10]) total_imp += sum_imp.data[component] * scale[leg] if attr[:4] == "sum_": legend.append(attr[4:]) else: legend.append(attr) except AttributeError: pass if sigma is not None: spect = spectral_density(zero_impedance.data.index, sigma) spect = spect / spect.max() * total_imp.max() ax.plot(sum_imp.data.index * 1e-9, spect, 'r', linewidth=2.5) ax.legend(legend, loc="upper left", ncol=2) ax.set_xlabel("Frequency (GHz)") ax.set_ylabel(label_list[leg] + " - " + component + " part") ax.set_title(label_list[leg] + " - " + component + " part") if zoom is True: in_ax = inset_axes( ax, width="30%", # width = 30% of parent_bbox height=1.5, # height : 1 inch loc=1) total_imp = 0 for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance in_ax.fill_between(sum_imp.data.index * 1e-3, total_imp, total_imp + sum_imp.data[component] * 1e-9, edgecolor=colorblind[index % 10], color=colorblind[index % 10]) total_imp += sum_imp.data[component] * 1e-9 except AttributeError: pass in_ax.set_xlim([0, 200]) in_ax.set_xlabel("Frequency (kHz)") in_ax.set_ylabel(r"$[\mathrm{G}\Omega]$") return fig
[docs] def effective_impedance(self, m, mu, sigma, M, tuneS, xi=None, mode="Hermite"): """ Compute the longitudinal and transverse effective impedance. Parameters ---------- mu : int coupled bunch mode number, goes from 0 to (M-1) where M is the number of bunches m : int head-tail (or azimutal/synchrotron) mode number sigma : float RMS bunch length in [s] M : int Number of bunches. tuneS : float Synchrotron tune. xi : float, optional (non-normalized) chromaticity mode: str, optional type of the mode taken into account for the computation: -"Hermite" modes for Gaussian bunches Returns ------- summary : DataFrame Longitudinal and transverse effective impedance. """ attr_list = self.sum_names eff_array = np.zeros((len(attr_list), 3), dtype=complex) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") eff_array[i, 0] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zxdip") eff_array[i, 1] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zydip") eff_array[i, 2] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass eff_array[:, 0] = eff_array[:, 0] * self.ring.omega0 * 1e3 eff_array[:, 1] = eff_array[:, 1] * 1e-3 eff_array[:, 2] = eff_array[:, 2] * 1e-3 summary = pd.DataFrame(eff_array, index=attr_list, columns=[ "Z/n [mOhm]", "sum betax x Zxeff [kOhm]", "sum betay x Zyeff [kOhm]" ]) return summary
[docs] def energy_loss(self, sigma, M, bunch_spacing, I, n_points=10e6): """ Compute the beam and bunch loss factor and energy losses for each type of element in the model assuming Gaussian bunches and constant spacing between bunches. Parameters ---------- sigma : float RMS bunch length in [s]. M : int Number of bunches in the beam. bunch_spacing : float Time between two bunches in [s]. I : float Total beam current in [A]. n_points : float, optional Number of points used in the frequency spectrums. Returns ------- summary : Dataframe Contains the beam and bunch loss factor and energy loss for the full model and for each type of different component. """ fmax = self.sum.Zlong.data.index.max() fmin = self.sum.Zlong.data.index.min() Q = I * self.ring.T0 / M if fmin >= 0: fmin = -1 * fmax f = np.linspace(fmin, fmax, int(n_points)) beam_spect = beam_spectrum(f, M, bunch_spacing, sigma=sigma) bunch_spect = gaussian_bunch_spectrum(f, sigma) attr_list = self.sum_names loss_array = np.zeros((len(attr_list), 2)) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") loss_array[i, 0] = beam_loss_factor(impedance, f, beam_spect, self.ring) loss_array[i, 1] = beam_loss_factor(impedance, f, bunch_spect, self.ring) except AttributeError: pass loss_array = loss_array * 1e-12 summary = pd.DataFrame(loss_array, index=attr_list, columns=[ "loss factor (beam) [V/pC]", "loss factor (bunch) [V/pC]" ]) summary["P (beam) [W]"] = summary["loss factor (beam) [V/pC]"] * \ 1e12*Q**2/(self.ring.T0) summary["P (bunch) [W]"] = summary["loss factor (bunch) [V/pC]"] * \ 1e12*Q**2/(self.ring.T0)*M return summary
[docs] def power_loss_spectrum(self, sigma, M, bunch_spacing, I, n_points=10e6, max_overlap=False, plot=False): """ Compute the power loss spectrum of the summed longitudinal impedance as in Eq. (4) of [1]. Assume Gaussian bunches and constant spacing between bunches. Parameters ---------- sigma : float RMS bunch length in [s]. M : int Number of bunches in the beam. bunch_spacing : float Time between two bunches in [s]. I : float Total beam current in [A]. n_points : float, optional Number of points used in the frequency spectrum. max_overlap : bool, optional If True, the bunch spectrum (scaled to the number of bunches) is used instead of the beam spectrum to compute the maximum value of the power loss spectrum at each frequency. Should only be used to maximise the power loss at a given frequency (e.g. for HOMs) and not for the full spectrum. plot : bool, optional If True, plots: - the overlap between the real part of the longitudinal impedance and the beam spectrum. - the power loss spectrum. Returns ------- pf0 : array Frequency points. power_loss : array Power loss spectrum in [W]. References ---------- [1] : L. Teofili, et al. "A Multi-Physics Approach to Simulate the RF Heating 3D Power Map Induced by the Proton Beam in a Beam Intercepting Device", in IPAC'18, 2018, doi:10.18429/JACoW-IPAC2018-THPAK093 """ impedance = self.sum.Zlong fmax = impedance.data.index.max() fmin = impedance.data.index.min() Q = I * self.ring.T0 / M if fmin >= 0: fmin = -1 * fmax double_sided_impedance(impedance) frequency = np.linspace(fmin, fmax, int(n_points)) if max_overlap is False: spectrum = beam_spectrum(frequency, M, bunch_spacing, sigma) else: spectrum = gaussian_bunch_spectrum(frequency, sigma) * M pmax = np.floor(fmax / self.ring.f0) pmin = np.floor(fmin / self.ring.f0) p = np.arange(pmin + 1, pmax) pf0 = p * self.ring.f0 ReZ = np.real(impedance(pf0)) spectral_density = np.abs(spectrum)**2 # interpolation of the spectrum is needed to avoid problems liked to # division by 0 # computing the spectrum directly to the frequency points gives # wrong results spect = interp1d(frequency, spectral_density) power_loss = (self.ring.f0 * Q)**2 * ReZ * spect(pf0) if plot is True: fig, ax = plt.subplots() twin = ax.twinx() p1, = ax.plot(pf0, ReZ, color="r", label="Re[Z] [Ohm]") p2, = twin.plot(pf0, spect(pf0) * (self.ring.f0 * Q)**2, color="b", label="Beam spectrum [a.u.]") ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Re[Z] [Ohm]") twin.set_ylabel("Beam spectrum [a.u.]") plots = [p1, p2] ax.legend(handles=plots, loc="best") fig, ax = plt.subplots() ax.plot(pf0, power_loss) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Power loss [W]") return pf0, power_loss
[docs] def save(self, file): """ Save impedance model to file. The same pandas version is needed on both saving and loading computer for the pickle to work. Parameters ---------- file : str File where the impedance model is saved. Returns ------- None. """ to_save = { "wakefields": self.wakefields, "positions": self.positions, "names": self.names, "globals": self.globals, "globals_names": self.globals_names } with open(file, "wb") as f: pickle.dump(to_save, f)
[docs] def load(self, file): """ Load impedance model from file. The same pandas version is needed on both saving and loading computer for the pickle to work. Parameters ---------- file : str File where the impedance model is saved. Returns ------- None. """ with open(file, 'rb') as f: to_load = pickle.load(f) self.wakefields = to_load["wakefields"] self.positions = to_load["positions"] self.names = to_load["names"] self.globals = to_load["globals"] self.globals_names = to_load["globals_names"] self.compute_sum()