Skip to content
Snippets Groups Projects
Commit eedde804 authored by Alexis GAMELIN's avatar Alexis GAMELIN
Browse files

Import function for ABCI files

Add the read_ABCI function to import ABCI computed wake and impedance.
Improve the read_CST function to support wake potential.
Add a plot method for both Impedance and WakeFunction class.
parent bb0b5ed3
No related branches found
No related tags found
No related merge requests found
......@@ -257,7 +257,7 @@ class WakeFunction(ComplexData):
variable : array-like
Time domain structure of the wake function in [s].
function : array-like
Wake function values in [V/C].
Wake function values in [V/C] for "long" and in [V/C/m] for others.
component_type : str, optinal
Type of the wake function: "long", "xdip", "xquad", ...
......@@ -386,6 +386,25 @@ class WakeFunction(ComplexData):
self.__init__(variable=wf.data.index, function=wf.data["real"],
component_type=component_type)
def plot(self):
"""
Plot the wake function data.
Returns
-------
ax : class:`matplotlib.axes.Axes`
Wake function data.
"""
ax = self.data["real"].plot()
if self.component_type == "long":
unit = " [V/C]"
else:
unit = " [V/C/m]"
label = r"$W_{" + self.component_type + r"}$" + unit
ax.set_ylabel(label)
return ax
class Impedance(ComplexData):
"""
Define an Impedance object based on a ComplexData object.
......@@ -637,6 +656,25 @@ class Impedance(ComplexData):
component_type=self.component_type)
return wf
def plot(self):
"""
Plot the impedance data.
Returns
-------
ax : class:`matplotlib.axes.Axes`
Impedance data.
"""
ax = self.data.plot()
if self.component_type == "long":
unit = " [ohm]"
else:
unit = " [ohm/m]"
label = r"$Z_{" + self.component_type + r"}$" + unit
ax.set_ylabel(label)
return ax
class WakeField:
"""
Defines a WakeField which corresponds to a single physical element which
......
# -*- coding: utf-8 -*-
from mbtrack2.utilities.read_impedance import (read_CST,
read_IW2D,
read_IW2D_folder)
read_IW2D_folder,
read_ABCI)
from mbtrack2.utilities.misc import (effective_impedance,
yokoya_elliptic,
beam_loss_factor,
......
......@@ -4,43 +4,63 @@ Module where function used to import impedance and wakes from other codes are
defined.
"""
import os
import pandas as pd
import numpy as np
from pathlib import Path
from tempfile import NamedTemporaryFile
from scipy.constants import c
from mbtrack2.impedance.wakefield import Impedance, WakeFunction, WakeField
def read_CST(file, component_type='long', divide_by=None):
def read_CST(file, component_type='long', divide_by=None, imp=True):
"""
Read CST file format into an Impedance object.
Read CST text file format into an Impedance or WakeFunction object.
Parameters
----------
file : str
Path to the file to read.
Path to the text file to read.
component_type : str, optional
Type of the Impedance object.
Type of the Impedance or WakeFunction object to load.
Default is 'long'.
divide_by : float, optional
Divide the impedance by a value. Mainly used to normalize transverse
impedance by displacement.
Default is None.
imp : bool, optional.
If True a Impedance object is loaded, if False a WakeFunction object is
loaded.
Default is True.
Returns
-------
result : Impedance object
result : Impedance or WakeFunction object.
Data from file.
"""
df = pd.read_csv(file, comment="#", header = None, sep = "\t",
names = ["Frequency","Real","Imaginary"])
df["Frequency"] = df["Frequency"]*1e9
if divide_by is not None:
df["Real"] = df["Real"]/divide_by
df["Imaginary"] = df["Imaginary"]/divide_by
if component_type == "long":
df["Real"] = np.abs(df["Real"])
df.set_index("Frequency", inplace = True)
result = Impedance(variable = df.index,
function = df["Real"] + 1j*df["Imaginary"],
component_type=component_type)
if imp:
df = pd.read_csv(file, comment="#", header = None, sep = "\t",
names = ["Frequency","Real","Imaginary"])
df["Frequency"] = df["Frequency"]*1e9
if divide_by is not None:
df["Real"] = df["Real"]/divide_by
df["Imaginary"] = df["Imaginary"]/divide_by
if component_type == "long":
df["Real"] = np.abs(df["Real"])
df.set_index("Frequency", inplace = True)
result = Impedance(variable = df.index,
function = df["Real"] + 1j*df["Imaginary"],
component_type=component_type)
else:
df = pd.read_csv(file, comment="#", header = None, sep = "\t",
names = ["Distance","Wake"])
df["Time"] = df["Distance"]*1e-3/c
df["Wake"] = df["Wake"]*1e-12
if divide_by is not None:
df["Wake"] = df["Wake"]/divide_by
df.set_index("Time", inplace = True)
result = WakeFunction(variable = df.index,
function = df["Wake"],
component_type=component_type)
return result
def read_IW2D(file, file_type='Zlong'):
......@@ -78,8 +98,6 @@ def read_IW2D(file, file_type='Zlong'):
df = df.interpolate()
print("Nan values have been interpolated to:")
print(df[index])
# if file_type == "Wlong":
# df["Wake"] = df["Wake"]*-1
result = WakeFunction(variable = df.index,
function = df["Wake"],
component_type=file_type[1:])
......@@ -131,4 +149,133 @@ def read_IW2D_folder(folder, suffix, select="WZ"):
wake = WakeField(list_for_wakefield)
return wake
def read_ABCI(file, azimuthal=False):
"""
Read ABCI output files [1].
Parameters
----------
file : str
Path to ABCI .pot file.
azimuthal : bool, optional
If True, the transverse wake potential and impedance is loaded from the
"AZIMUTHAL" data. In that case, a -1 factor is applied on the wake to
agree with mbtrack2 sign convention.
If False, it is loaded from the "TRANSVERSE" data.
The default is False.
Returns
-------
wake : WakeField
Object where the ABCI computed impedance and wake are stored.
References
----------
[1] : ABCI - https://abci.kek.jp/abci.htm
"""
if azimuthal:
source="AZIMUTHAL"
else:
source="TRANSVERSE"
def _read_temp(file, file_type, file2=None):
if file_type[0] == "Z":
df = pd.read_csv(file, delim_whitespace=True,
names=["Frequency","Real"])
df["Real"] = df["Real"]*1e3
df["Frequency"] = df["Frequency"]*1e9
df2 = pd.read_csv(file2, delim_whitespace=True,
names=["Frequency","Imaginary"])
df2["Imaginary"] = df2["Imaginary"]*1e3
df2["Frequency"] = df2["Frequency"]*1e9
df.set_index("Frequency", inplace = True)
df2.set_index("Frequency", inplace = True)
result = Impedance(variable = df.index,
function = df["Real"] + 1j*df2["Imaginary"],
component_type=file_type[1:])
elif file_type[0] == "W":
df = pd.read_csv(file, delim_whitespace=True,
names=["Time","Wake"])
df["Time"] = df["Time"] / c
df["Wake"] = df["Wake"] * 1e-12
df.set_index("Time", inplace = True)
result = WakeFunction(variable = df.index,
function = df["Wake"],
component_type=file_type[1:])
return result
abci_dict = {' TITLE: LONGITUDINAL WAKE POTENTIAL \n':'Wlong',
' TITLE: REAL PART OF LONGITUDINAL IMPEDANCE \n':'Zlong_re',
' TITLE: IMAGINARY PART OF LONGITUDINAL IMPEDANCE \n':'Zlong_im',
f' TITLE: {source} WAKE POTENTIAL \n':'Wxdip',
f' TITLE: REAL PART OF {source} IMPEDANCE \n':'Zxdip_re',
f' TITLE: IMAGINARY PART OF {source} IMPEDANCE \n':'Zxdip_im'}
wake_list = []
start = True
with open(file) as f:
while True:
if start is True:
# read the header
header = [next(f) for _ in range(5)]
start = False
elif line == '':
# check if file is over
break
else:
# read the header
header = [next(f) for _ in range(4)]
header.insert(0, line)
# read the body until next TITLE field or end of file
body = []
while True:
line = f.readline()
try:
if line[:8] == " TITLE:":
break
if line == '':
break
except:
pass
body.append(line)
# write body in temp file and then process it into wake/imp
try:
if abci_dict[header[0]][0] == "W":
tmp = NamedTemporaryFile(delete=False, mode = "w+")
tmp.writelines(body)
tmp.flush()
tmp.close()
comp = _read_temp(tmp.name, abci_dict[header[0]])
os.unlink(tmp.name)
wake_list.append(comp)
elif (abci_dict[header[0]][0] == "Z") and (abci_dict[header[0]][-2:] == "re"):
tmp1 = NamedTemporaryFile(delete=False, mode = "w+")
tmp1.writelines(body)
tmp1.flush()
tmp1.close()
elif (abci_dict[header[0]][0] == "Z") and (abci_dict[header[0]][-2:] == "im"):
tmp2 = NamedTemporaryFile(delete=False, mode = "w+")
tmp2.writelines(body)
tmp2.flush()
tmp2.close()
if abci_dict[header[0]][1:-3] == "long":
comp = _read_temp(tmp1.name, "Zlong", tmp2.name)
wake_list.append(comp)
else:
comp_x = _read_temp(tmp1.name, "Zxdip", tmp2.name)
comp_y = _read_temp(tmp1.name, "Zydip", tmp2.name)
wake_list.append(comp_x)
wake_list.append(comp_y)
os.unlink(tmp1.name)
os.unlink(tmp2.name)
except KeyError:
pass
wake = WakeField(wake_list)
return wake
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment