Skip to content
Snippets Groups Projects
Commit b0da65b0 authored by Gamelin Alexis's avatar Gamelin Alexis
Browse files

Monitor with a shared open HDF5 file between cores

The HDF5 is now open only once on all the cores to be able to write in parallel in the same file. The file must be closed at the end of the tracking.
BeamMonitor is used to save global statistics of all bunches
Add docstrings
TODO : Adding a flush() call at some point might be needed !
parent a5daf052
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
"""
This module defines the different monitor class which are used to save data
from tracking.
during tracking.
@author: Alexis Gamelin
@date: 17/03/2020
@date: 24/03/2020
"""
import numpy as np
import h5py as hp
from tracking.element import Element
from tracking.particles import Bunch, Beam
from abc import ABCMeta, abstractmethod
from abc import ABCMeta
from mpi4py import MPI
class Monitor(Element, metaclass=ABCMeta):
"""
Abstract Monitor class used for subclass inheritance to define all the
different kind of monitors objects.
The Monitor class is based on h5py module to be able to write data on
structured binary files. The class provides a common file where the
different Monitor subclass can write.
Attributes
----------
file : HDF5 file
Common file where all monitors, Monitor subclass elements, write the
saved data. Based on class attribute _file_storage.
file_name : string
Name of the HDF5 file where the data is stored. Based on class
attribute _file_name_storage.
Methods
-------
monitor_init(group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name=None, mpi_mode=True)
Method called to initialize Monitor subclass.
write()
Write data from buffer to the HDF5 file.
to_buffer(object_to_save)
Save data to buffer.
close()
Close the HDF5 file shared by all Monitor subclass, must be called
by at least an instance of a Montior subclass at the end of the
tracking.
"""
file_name_storage = []
_file_name_storage = []
_file_storage = []
@property
def file_name(self):
"""Common file where all monitors, Monitor subclass elements, write the
saved data."""
try:
return self._file_name_storage[0]
except IndexError:
print("The HDF5 file name for monitors is not set.")
raise ValueError
@property
def file(self):
"""Name of the HDF5 file where the data is stored."""
try:
return self.file_name_storage[0]
return self._file_storage[0]
except IndexError:
print("File name for monitors not set.")
print("The HDF5 file to store data is not set.")
raise ValueError
def monitor_init(self, group_name, save_every, total_size,
buffer_size, dict_buffer, dict_file, file_name=None):
def monitor_init(self, group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name=None, mpi_mode=True):
"""
Method called to initialize Monitor subclass.
Parameters
----------
group_name : string
Name of the HDF5 group in which the data for the current monitor
will be saved.
save_every : int or float
Set the frequency of the save. The data is saved every save_every
call of the montior.
buffer_size : int or float
Size of the save buffer.
total_size : int or float
Total size of the save. The following relationships between the
parameters must exist:
total_size % buffer_size == 0
number of call to track / save_every == total_size
dict_buffer : dict
Dictionary with keys as the attribute name to save and values as
the shape of the buffer to create to hold the attribute, like
(key.shape, buffer_size)
dict_file : dict
Dictionary with keys as the attribute name to save and values as
the shape of the dataset to create to hold the attribute, like
(key.shape, total_size)
file_name : string, optional
Name of the HDF5 where the data will be stored. Must be specified
the first time a subclass of Monitor is instancied and must be None
the following times.
mpi_mode : bool, optional
If True, open the HDF5 file in parallel mode, which is needed to
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
"""
# setup and open common file for all monitors
if file_name is not None:
if len(self.file_name_storage) == 0:
self.file_name_storage.append(file_name + ".hdf5")
if len(self._file_name_storage) == 0:
self._file_name_storage.append(file_name + ".hdf5")
if len(self._file_storage) == 0:
if mpi_mode == True:
f = hp.File(self.file_name, "w", libver='latest',
driver='mpio', comm=MPI.COMM_WORLD)
else:
f = hp.File(self.file_name, "w", libver='latest')
self._file_storage.append(f)
else:
raise ValueError("File is already open.")
else:
raise ValueError("File name for monitors is already attributed.")
......@@ -46,17 +134,19 @@ class Monitor(Element, metaclass=ABCMeta):
self.write_count = 0
self.track_count = 0
# setup attribute buffers from values given in dict_buffer
for key, value in dict_buffer.items():
self.__setattr__(key,np.zeros(value))
self.time = np.zeros((self.buffer_size,), dtype=int)
# create HDF5 groups and datasets to save data from group_name and
# dict_file
self.g = self.file.require_group(self.group_name)
self.g.require_dataset("time", (self.total_size,), dtype=int)
for key, value in dict_file.items():
self.g.require_dataset(key, value, dtype=float)
with hp.File(self.file_name, "a", libver='latest') as f:
g = f.require_group(self.group_name)
g.require_dataset("time", (self.total_size,), dtype=int)
for key, value in dict_file.items():
g.require_dataset(key, value, dtype=float)
# create a dictionary which
slice_dict = {}
for key, value in dict_file.items():
slice_dict[key] = []
......@@ -65,23 +155,27 @@ class Monitor(Element, metaclass=ABCMeta):
self.slice_dict = slice_dict
def write(self):
"""
"""
"""Write data from buffer to the HDF5 file."""
with hp.File(self.file_name, "a", libver='latest') as f:
f[self.group_name]["time"][self.write_count*self.buffer_size:(
self.file[self.group_name]["time"][self.write_count*self.buffer_size:(
self.write_count+1)*self.buffer_size] = self.time
for key, value in self.dict_buffer.items():
slice_list = list(self.slice_dict[key])
slice_list.append(slice(self.write_count*self.buffer_size,
(self.write_count+1)*self.buffer_size))
slice_tuple = tuple(slice_list)
f[self.group_name][key][slice_tuple] = self.__getattribute__(key)
for key, value in self.dict_buffer.items():
slice_list = list(self.slice_dict[key])
slice_list.append(slice(self.write_count*self.buffer_size,
(self.write_count+1)*self.buffer_size))
slice_tuple = tuple(slice_list)
self.file[self.group_name][key][slice_tuple] = self.__getattribute__(key)
self.write_count += 1
def to_buffer(self, object_to_save):
"""
Save data to buffer.
Parameters
----------
object_to_save : python object
Depends on the Monitor subclass, typically a Beam or Bunch object.
"""
self.time[self.buffer_count] = self.track_count
for key, value in self.dict_buffer.items():
......@@ -95,28 +189,72 @@ class Monitor(Element, metaclass=ABCMeta):
self.write()
self.buffer_count = 0
def close(self):
"""
Close the HDF5 file shared by all Monitor subclass, must be called
by at least an instance of a Montior subclass at the end of the
tracking.
"""
try:
self.file.close()
except ValueError:
pass
class BunchMonitor(Monitor):
"""
Monitor a
Monitor a single bunch and save attributes (mean, std, emit and current).
Parameters
----------
bunch_number : int
Bunch to monitor
file_name : string, optional
Name of the HDF5 where the data will be stored. Must be specified
the first time a subclass of Monitor is instancied and must be None
the following times.
save_every : int or float, optional
Set the frequency of the save. The data is saved every save_every
call of the montior.
buffer_size : int or float, optional
Size of the save buffer.
total_size : int or float, optional
Total size of the save. The following relationships between the
parameters must exist:
total_size % buffer_size == 0
number of call to track / save_every == total_size
mpi_mode : bool, optional
If True, open the HDF5 file in parallel mode, which is needed to
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
Methods
-------
track(object_to_save)
Save data
"""
def __init__(self, bunch_number, file_name=None,
save_every=5, buffer_size = 500, total_size = 1e5):
def __init__(self, bunch_number, file_name=None, save_every=5,
buffer_size=500, total_size=2e4, mpi_mode=True):
self.bunch_number = bunch_number
group_name = "BunchData_" + str(self.bunch_number)
dict_buffer = {"mean":(6,buffer_size), "std":(6,buffer_size),
"emit":(3,buffer_size), "current":(buffer_size,)}
dict_file = {"mean":(6,total_size), "std":(6,total_size),
"emit":(3,total_size), "current":(total_size,)}
self.monitor_init(group_name, save_every, total_size,
buffer_size, dict_buffer, dict_file, file_name)
dict_buffer = {"mean":(6, buffer_size), "std":(6, buffer_size),
"emit":(3, buffer_size), "current":(buffer_size,)}
dict_file = {"mean":(6, total_size), "std":(6, total_size),
"emit":(3, total_size), "current":(total_size,)}
self.monitor_init(group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name, mpi_mode)
self.dict_buffer = dict_buffer
self.dict_file = dict_file
def track(self, object_to_save):
"""
Save data
Parameters
----------
object_to_save : Bunch or Beam object
"""
if self.track_count % self.save_every == 0:
if isinstance(object_to_save, Beam):
......@@ -134,25 +272,60 @@ class BunchMonitor(Monitor):
class PhaseSpaceMonitor(Monitor):
"""
Monitor a
Monitor a single bunch and save the full phase space.
Parameters
----------
bunch_number : int
Bunch to monitor
mp_number : int or float
Number of macroparticle in the phase space to save.
file_name : string, optional
Name of the HDF5 where the data will be stored. Must be specified
the first time a subclass of Monitor is instancied and must be None
the following times.
save_every : int or float, optional
Set the frequency of the save. The data is saved every save_every
call of the montior.
buffer_size : int or float, optional
Size of the save buffer.
total_size : int or float, optional
Total size of the save. The following relationships between the
parameters must exist:
total_size % buffer_size == 0
number of call to track / save_every == total_size
mpi_mode : bool, optional
If True, open the HDF5 file in parallel mode, which is needed to
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
Methods
-------
track(object_to_save)
Save data
"""
def __init__(self, bunch_number, mp_number, file_name=None,
save_every=1e4, buffer_size = 1, total_size = 100):
def __init__(self, bunch_number, mp_number, file_name=None, save_every=1e3,
buffer_size=10, total_size=100, mpi_mode=True):
self.bunch_number = bunch_number
self.mp_number = int(mp_number)
group_name = "PhaseSpaceData_" + str(self.bunch_number)
dict_buffer = {"particles":(self.mp_number, 6, buffer_size)}
dict_file = {"particles":(self.mp_number, 6, total_size)}
self.monitor_init(group_name, save_every, total_size,
buffer_size, dict_buffer, dict_file, file_name)
self.monitor_init(group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name, mpi_mode)
self.dict_buffer = dict_buffer
self.dict_file = dict_file
def track(self, object_to_save):
"""
Save data
Parameters
----------
object_to_save : Bunch or Beam object
"""
if self.track_count % self.save_every == 0:
......@@ -167,3 +340,124 @@ class PhaseSpaceMonitor(Monitor):
else:
raise TypeError("object_to_save should be a Beam or Bunch object.")
self.track_count += 1
class BeamMonitor(Monitor):
"""
Monitor the full beam and save each bunch attributes (mean, std, emit and
current).
Parameters
----------
file_name : string, optional
Name of the HDF5 where the data will be stored. Must be specified
the first time a subclass of Monitor is instancied and must be None
the following times.
save_every : int or float, optional
Set the frequency of the save. The data is saved every save_every
call of the montior.
buffer_size : int or float, optional
Size of the save buffer.
total_size : int or float, optional
Total size of the save. The following relationships between the
parameters must exist:
total_size % buffer_size == 0
number of call to track / save_every == total_size
mpi_mode : bool, optional
If True, open the HDF5 file in parallel mode, which is needed to
allow several cores to write in the same file at the same time.
If False, open the HDF5 file in standard mode.
Methods
-------
track(beam)
Save data
"""
def __init__(self, file_name=None, save_every=5, buffer_size=500,
total_size=2e4, mpi_mode=True):
group_name = "Beam"
dict_buffer = {}
dict_file = {}
self.monitor_init(group_name, save_every, buffer_size, total_size,
dict_buffer, dict_file, file_name, mpi_mode)
self.mean = np.zeros((6, self.buffer_size), dtype=float)
self.std = np.zeros((6, self.buffer_size), dtype=float)
self.emit = np.zeros((3, self.buffer_size), dtype=float)
self.current = np.zeros((self.buffer_size,), dtype=float)
self.g.require_dataset("mean", (6, 416, self.total_size,), dtype=float)
self.g.require_dataset("std", (6, 416, self.total_size,), dtype=float)
self.g.require_dataset("emit", (3, 416, self.total_size,), dtype=float)
self.g.require_dataset("current", (416, self.total_size,), dtype=float)
def track(self, beam):
"""
Save data
Parameters
----------
beam : Beam object
"""
if self.track_count % self.save_every == 0:
if (beam.mpi_switch == True):
self.to_buffer(beam[beam.mpi.bunch_num], beam.mpi.bunch_num)
else:
raise NotImplementedError
self.track_count += 1
def to_buffer(self, bunch, bunch_num):
"""
Save data to buffer.
Parameters
----------
bunch : Bunch object
bunch_num : int
"""
self.time[self.buffer_count] = self.track_count
self.mean[:, self.buffer_count] = bunch.mean
self.std[:, self.buffer_count] = bunch.std
self.emit[:, self.buffer_count] = bunch.emit
self.current[self.buffer_count] = bunch.current
self.buffer_count += 1
if self.buffer_count == self.buffer_size:
self.write(bunch_num)
self.buffer_count = 0
def write(self, bunch_num):
"""
Write data from buffer to the HDF5 file.
Parameters
----------
bunch_num : int
"""
self.file[self.group_name]["time"][self.write_count*self.buffer_size:(
self.write_count+1)*self.buffer_size] = self.time
self.file[self.group_name]["mean"][:, bunch_num,
self.write_count*self.buffer_size:(self.write_count+1) *
self.buffer_size] = self.mean
self.file[self.group_name]["std"][:, bunch_num,
self.write_count*self.buffer_size:(self.write_count+1) *
self.buffer_size] = self.std
self.file[self.group_name]["emit"][:, bunch_num,
self.write_count*self.buffer_size:(self.write_count+1) *
self.buffer_size] = self.emit
self.file[self.group_name]["current"][bunch_num,
self.write_count*self.buffer_size:(self.write_count+1) *
self.buffer_size] = self.current
self.write_count += 1
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment