Skip to content
Snippets Groups Projects
Commit 8adb9c18 authored by Alexis GAMELIN's avatar Alexis GAMELIN
Browse files

Add BeamSpectrumMonitor

Add a monitor for the coherent beam spectrum (i.e. coupled bunch modes).
Add utility functions to BunchSpectrumMonitor to know the fft and signal resolution.
Save the frequency samples for BunchSpectrumMonitor in the hdf5 file.
parent 9355219a
No related branches found
No related tags found
No related merge requests found
......@@ -12,7 +12,8 @@ from mbtrack2.tracking.monitors.monitors import (Monitor, BunchMonitor,
WakePotentialMonitor,
TuneMonitor,
CavityMonitor,
BunchSpectrumMonitor)
BunchSpectrumMonitor,
BeamSpectrumMonitor)
from mbtrack2.tracking.monitors.plotting import (plot_bunchdata,
plot_phasespacedata,
plot_profiledata,
......
......@@ -1089,6 +1089,15 @@ class BunchSpectrumMonitor(Monitor):
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
Attributes
----------
fft_resolution : float
Return the fft resolution in [Hz].
signal_resolution : float
Return the signal resolution in [Hz].
frequency_samples : array of float
Return the fft frequency samples in [Hz].
Methods
-------
track(bunch):
......@@ -1146,9 +1155,37 @@ class BunchSpectrumMonitor(Monitor):
self.incoherent = np.zeros((3, self.n_fft//2+1, buffer_size))
self.coherent = np.zeros((3, self.n_fft//2+1, buffer_size))
self.file[self.group_name].create_dataset(
"freq", data=self.frequency_samples)
@property
def fft_resolution(self):
"""
Return the fft resolution in [Hz].
It is defined as the sampling frequency over the number of samples.
"""
return self.ring.f0/self.n_fft
@property
def signal_resolution(self):
"""
Return the signal resolution in [Hz].
It is defined as the inverse of the signal length.
"""
return 1/(self.ring.T0*self.save_every)
@property
def frequency_samples(self):
"""
Return the fft frequency samples in [Hz].
"""
return rfftfreq(self.n_fft, self.ring.T0)
def track(self, object_to_save):
"""
Save positions and mean data.
Save spectrum data.
Parameters
----------
......@@ -1227,7 +1264,7 @@ class BunchSpectrumMonitor(Monitor):
Returns
-------
incoherent : array
The average of the transformed input in each plane.
Bunch incoherent spectrum.
"""
fourier = rfft(positions, n=self.n_fft)
......@@ -1243,7 +1280,196 @@ class BunchSpectrumMonitor(Monitor):
Returns
-------
coherent : array
The average of the transformed input in each plane.
Bunch coherent spectrum.
"""
coherent = np.abs(rfft(mean, n=self.n_fft))
return coherent
class BeamSpectrumMonitor(Monitor):
"""
Monitor coherent beam spectrum.
Parameters
----------
ring : Synchrotron object
n_fft : int or float, optional
The number of points used for FFT computation, if n_fft is bigger than
save_every zero-padding is applied.
file_name : string, optional
Name of the HDF5 where the data will be stored. Must be specified
the first time a subclass of Monitor is instancied and must be None
the following times.
save_every : int or float, optional
Set the frequency of the save. The spectrum is computed saved every
save_every call of the montior.
buffer_size : int or float, optional
Size of the save buffer.
total_size : int or float, optional
Total size of the save. The following relationships between the
parameters must exist:
total_size % buffer_size == 0
number of call to track / save_every == total_size - 1
mpi_mode : bool, optional
If True, open the HDF5 file in parallel mode, which is needed to
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
Attributes
----------
fft_resolution : float
Return the fft resolution in [Hz].
signal_resolution : float
Return the signal resolution in [Hz].
frequency_samples : array of float
Return the fft frequency samples in [Hz].
Methods
-------
track(bunch):
Save tune and/or FFT data.
"""
def __init__(self, ring, n_fft=10000,
file_name=None, save_every=10,
buffer_size=5, total_size=10, mpi_mode=True,
dim="all"):
self.n_fft = int(n_fft)
self.store_dict = {"x":0,"y":1,"tau":2}
if dim == "all":
self.track_dict = {"x":0,"y":1,"tau":2}
self.mean_index = [True, False, True, False, True, False]
elif dim == "tau":
self.track_dict = {"tau":0}
self.mean_index = [False, False, False, False, True, False]
elif dim == "x":
self.track_dict = {"x":0}
self.mean_index = [True, False, False, False, False, False]
elif dim == "y":
self.track_dict = {"y":0}
self.mean_index = [False, False, True, False, False, False]
self.size_list = len(self.track_dict)
self.ring = ring
group_name = "BeamSpectrum"
dict_buffer = {"coherent":(3, n_fft//2+1, buffer_size)}
dict_file = {"coherent":(3, n_fft//2+1, total_size)}
self.monitor_init(group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name, mpi_mode)
self.dict_buffer = dict_buffer
self.dict_file = dict_file
self.save_count = 0
self.mean = np.zeros((self.size_list, ring.h, save_every))
self.coherent = np.zeros((3, self.n_fft//2+1, buffer_size))
self.file[self.group_name].create_dataset(
"freq", data=self.frequency_samples)
@property
def fft_resolution(self):
"""
Return the fft resolution in [Hz].
It is defined as the sampling frequency over the number of samples.
"""
return self.ring.f1/self.n_fft
@property
def signal_resolution(self):
"""
Return the signal resolution in [Hz].
It is defined as the inverse of the signal length.
"""
return 1/(self.ring.T0*self.save_every)
@property
def frequency_samples(self):
"""
Return the fft frequency samples in [Hz].
"""
return rfftfreq(self.n_fft, self.ring.T1)
def track(self, beam):
"""
Save mean data.
Parameters
----------
beam : Beam object
"""
if (beam.mpi_switch == True):
bunch_num = beam.mpi.bunch_num
bunch = beam[bunch_num]
self.mean[:, bunch_num, self.buffer_count] = bunch.mean[self.mean_index]
else:
self.mean[:, :, self.buffer_count] = beam.bunch_mean[self.mean_index,:]
self.save_count += 1
if self.track_count > 0 and self.track_count % self.save_every == 0:
self.to_buffer(beam)
self.save_count = 0
self.track_count += 1
def to_buffer(self, beam):
"""
A method to hold saved data before writing it to the output file.
"""
self.time[self.buffer_count] = self.track_count
for key, value in self.track_dict.items():
if (beam.mpi_switch == True):
data_core = self.mean[value, beam.mpi.bunch_num, :]
full_data = beam.mpi.comm.allgather(data_core)
data = np.reshape(full_data, (-1), 'F')
else:
data = np.reshape(self.mean[value, :, :], (-1), 'F')
self.coherent[self.store_dict[key],:,self.buffer_count] = self.get_beam_spectrum(data)
self.buffer_count += 1
if self.buffer_count == self.buffer_size:
self.write()
self.buffer_count = 0
def write(self):
"""
Write data from buffer to output file.
"""
self.file[self.group_name]["time"][self.write_count*self.buffer_size:(
self.write_count+1)*self.buffer_size] = self.time
self.file[self.group_name]["coherent"][:,:,
self.write_count * self.buffer_size:(self.write_count+1) *
self.buffer_size] = self.coherent
self.file.flush()
self.write_count += 1
def get_beam_spectrum(self, mean):
"""
Compute the beam coherent spectrum i.e. the absolute value of the FT
of the mean position of every bunch.
Returns
-------
coherent : array
The beam coherent spectrum.
"""
coherent = np.abs(rfft(mean, n=self.n_fft))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment