From b0da65b08064f2520315d66eaf7e3866c583aac2 Mon Sep 17 00:00:00 2001 From: Gamelin Alexis <gamelin@synchrotron-soleil.fr> Date: Tue, 24 Mar 2020 19:39:19 +0100 Subject: [PATCH] Monitor with a shared open HDF5 file between cores The HDF5 is now open only once on all the cores to be able to write in parallel in the same file. The file must be closed at the end of the tracking. BeamMonitor is used to save global statistics of all bunches Add docstrings TODO : Adding a flush() call at some point might be needed ! --- tracking/monitors.py | 378 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 336 insertions(+), 42 deletions(-) diff --git a/tracking/monitors.py b/tracking/monitors.py index 5d6f686..078c58a 100644 --- a/tracking/monitors.py +++ b/tracking/monitors.py @@ -1,38 +1,126 @@ # -*- coding: utf-8 -*- """ This module defines the different monitor class which are used to save data -from tracking. +during tracking. @author: Alexis Gamelin -@date: 17/03/2020 +@date: 24/03/2020 + """ import numpy as np import h5py as hp from tracking.element import Element from tracking.particles import Bunch, Beam -from abc import ABCMeta, abstractmethod +from abc import ABCMeta +from mpi4py import MPI class Monitor(Element, metaclass=ABCMeta): """ + Abstract Monitor class used for subclass inheritance to define all the + different kind of monitors objects. + + The Monitor class is based on h5py module to be able to write data on + structured binary files. The class provides a common file where the + different Monitor subclass can write. + + Attributes + ---------- + file : HDF5 file + Common file where all monitors, Monitor subclass elements, write the + saved data. Based on class attribute _file_storage. + file_name : string + Name of the HDF5 file where the data is stored. Based on class + attribute _file_name_storage. 
+ + Methods + ------- + monitor_init(group_name, save_every, buffer_size, total_size, + dict_buffer, dict_file, file_name=None, mpi_mode=True) + Method called to initialize Monitor subclass. + write() + Write data from buffer to the HDF5 file. + to_buffer(object_to_save) + Save data to buffer. + close() + Close the HDF5 file shared by all Monitor subclasses, must be called + by at least one instance of a Monitor subclass at the end of the + tracking. """ - file_name_storage = [] + _file_name_storage = [] + _file_storage = [] @property def file_name(self): + """Name of the HDF5 file where all monitors, Monitor subclass elements, + write the saved data.""" + try: + return self._file_name_storage[0] + except IndexError: + print("The HDF5 file name for monitors is not set.") + raise ValueError + + @property + def file(self): + """Common HDF5 file where all monitors write the saved data.""" try: - return self.file_name_storage[0] + return self._file_storage[0] except IndexError: - print("File name for monitors not set.") + print("The HDF5 file to store data is not set.") raise ValueError - def monitor_init(self, group_name, save_every, total_size, - buffer_size, dict_buffer, dict_file, file_name=None): + def monitor_init(self, group_name, save_every, buffer_size, total_size, + dict_buffer, dict_file, file_name=None, mpi_mode=True): + """ + Method called to initialize Monitor subclass. + Parameters + ---------- + group_name : string + Name of the HDF5 group in which the data for the current monitor + will be saved. + save_every : int or float + Set the frequency of the save. The data is saved every save_every + call of the monitor. + buffer_size : int or float + Size of the save buffer. + total_size : int or float + Total size of the save.
The following relationships between the + parameters must exist: + total_size % buffer_size == 0 + number of calls to track / save_every == total_size + dict_buffer : dict + Dictionary with keys as the attribute name to save and values as + the shape of the buffer to create to hold the attribute, like + (key.shape, buffer_size) + dict_file : dict + Dictionary with keys as the attribute name to save and values as + the shape of the dataset to create to hold the attribute, like + (key.shape, total_size) + file_name : string, optional + Name of the HDF5 file where the data will be stored. Must be + specified the first time a subclass of Monitor is instantiated and + must be None the following times. + mpi_mode : bool, optional + If True, open the HDF5 file in parallel mode, which is needed to + allow several cores to write in the same file at the same time. + If False, open the HDF5 file in standard mode. + """ + + # setup and open common file for all monitors if file_name is not None: - if len(self.file_name_storage) == 0: - self.file_name_storage.append(file_name + ".hdf5") + if len(self._file_name_storage) == 0: + self._file_name_storage.append(file_name + ".hdf5") + if len(self._file_storage) == 0: + if mpi_mode == True: + f = hp.File(self.file_name, "w", libver='latest', + driver='mpio', comm=MPI.COMM_WORLD) + else: + f = hp.File(self.file_name, "w", libver='latest') + self._file_storage.append(f) + else: + raise ValueError("File is already open.") else: raise ValueError("File name for monitors is already attributed.") @@ -46,17 +134,19 @@ class Monitor(Element, metaclass=ABCMeta): self.write_count = 0 self.track_count = 0 + # setup attribute buffers from values given in dict_buffer for key, value in dict_buffer.items(): self.__setattr__(key,np.zeros(value)) self.time = np.zeros((self.buffer_size,), dtype=int) + + # create HDF5 groups and datasets to save data from group_name and + # dict_file + self.g = self.file.require_group(self.group_name)
+ self.g.require_dataset("time", (self.total_size,), dtype=int) + for key, value in dict_file.items(): + self.g.require_dataset(key, value, dtype=float) - with hp.File(self.file_name, "a", libver='latest') as f: - g = f.require_group(self.group_name) - g.require_dataset("time", (self.total_size,), dtype=int) - for key, value in dict_file.items(): - g.require_dataset(key, value, dtype=float) - - + # create a dictionary of the leading indices used by write() slice_dict = {} for key, value in dict_file.items(): slice_dict[key] = [] @@ -65,23 +155,27 @@ class Monitor(Element, metaclass=ABCMeta): self.slice_dict = slice_dict def write(self): - """ - """ + """Write data from buffer to the HDF5 file.""" - with hp.File(self.file_name, "a", libver='latest') as f: - f[self.group_name]["time"][self.write_count*self.buffer_size:( + self.file[self.group_name]["time"][self.write_count*self.buffer_size:( self.write_count+1)*self.buffer_size] = self.time - for key, value in self.dict_buffer.items(): - slice_list = list(self.slice_dict[key]) - slice_list.append(slice(self.write_count*self.buffer_size, - (self.write_count+1)*self.buffer_size)) - slice_tuple = tuple(slice_list) - - f[self.group_name][key][slice_tuple] = self.__getattribute__(key) + for key, value in self.dict_buffer.items(): + slice_list = list(self.slice_dict[key]) + slice_list.append(slice(self.write_count*self.buffer_size, + (self.write_count+1)*self.buffer_size)) + slice_tuple = tuple(slice_list) + self.file[self.group_name][key][slice_tuple] = self.__getattribute__(key) + self.write_count += 1 def to_buffer(self, object_to_save): """ + Save data to buffer. + + Parameters + ---------- + object_to_save : python object + Depends on the Monitor subclass, typically a Beam or Bunch object.
""" self.time[self.buffer_count] = self.track_count for key, value in self.dict_buffer.items(): @@ -95,28 +189,72 @@ class Monitor(Element, metaclass=ABCMeta): self.write() self.buffer_count = 0 + def close(self): + """ + Close the HDF5 file shared by all Monitor subclasses, must be called + by at least one instance of a Monitor subclass at the end of the + tracking. + """ + try: + self.file.close() + except ValueError: + pass + class BunchMonitor(Monitor): """ - Monitor a + Monitor a single bunch and save attributes (mean, std, emit and current). + + Parameters + ---------- + bunch_number : int + Bunch to monitor + file_name : string, optional + Name of the HDF5 file where the data will be stored. Must be specified + the first time a subclass of Monitor is instantiated and must be None + the following times. + save_every : int or float, optional + Set the frequency of the save. The data is saved every save_every + call of the monitor. + buffer_size : int or float, optional + Size of the save buffer. + total_size : int or float, optional + Total size of the save. The following relationships between the + parameters must exist: + total_size % buffer_size == 0 + number of calls to track / save_every == total_size + mpi_mode : bool, optional + If True, open the HDF5 file in parallel mode, which is needed to + allow several cores to write in the same file at the same time. + If False, open the HDF5 file in standard mode.
+ + Methods + ------- + track(object_to_save) + Save data """ - def __init__(self, bunch_number, file_name=None, - save_every=5, buffer_size = 500, total_size = 1e5): + def __init__(self, bunch_number, file_name=None, save_every=5, + buffer_size=500, total_size=2e4, mpi_mode=True): self.bunch_number = bunch_number group_name = "BunchData_" + str(self.bunch_number) - dict_buffer = {"mean":(6,buffer_size), "std":(6,buffer_size), - "emit":(3,buffer_size), "current":(buffer_size,)} - dict_file = {"mean":(6,total_size), "std":(6,total_size), - "emit":(3,total_size), "current":(total_size,)} - self.monitor_init(group_name, save_every, total_size, - buffer_size, dict_buffer, dict_file, file_name) + dict_buffer = {"mean":(6, buffer_size), "std":(6, buffer_size), + "emit":(3, buffer_size), "current":(buffer_size,)} + dict_file = {"mean":(6, total_size), "std":(6, total_size), + "emit":(3, total_size), "current":(total_size,)} + self.monitor_init(group_name, save_every, buffer_size, total_size, + dict_buffer, dict_file, file_name, mpi_mode) self.dict_buffer = dict_buffer self.dict_file = dict_file def track(self, object_to_save): """ + Save data + + Parameters + ---------- + object_to_save : Bunch or Beam object """ if self.track_count % self.save_every == 0: if isinstance(object_to_save, Beam): @@ -134,25 +272,60 @@ class BunchMonitor(Monitor): class PhaseSpaceMonitor(Monitor): """ - Monitor a + Monitor a single bunch and save the full phase space. + + Parameters + ---------- + bunch_number : int + Bunch to monitor + mp_number : int or float + Number of macroparticles in the phase space to save. + file_name : string, optional + Name of the HDF5 file where the data will be stored. Must be specified + the first time a subclass of Monitor is instantiated and must be None + the following times. + save_every : int or float, optional + Set the frequency of the save. The data is saved every save_every + call of the monitor. + buffer_size : int or float, optional + Size of the save buffer.
+ total_size : int or float, optional + Total size of the save. The following relationships between the + parameters must exist: + total_size % buffer_size == 0 + number of calls to track / save_every == total_size + mpi_mode : bool, optional + If True, open the HDF5 file in parallel mode, which is needed to + allow several cores to write in the same file at the same time. + If False, open the HDF5 file in standard mode. + + Methods + ------- + track(object_to_save) + Save data """ - def __init__(self, bunch_number, mp_number, file_name=None, - save_every=1e4, buffer_size = 1, total_size = 100): + def __init__(self, bunch_number, mp_number, file_name=None, save_every=1e3, + buffer_size=10, total_size=100, mpi_mode=True): self.bunch_number = bunch_number self.mp_number = int(mp_number) group_name = "PhaseSpaceData_" + str(self.bunch_number) dict_buffer = {"particles":(self.mp_number, 6, buffer_size)} dict_file = {"particles":(self.mp_number, 6, total_size)} - self.monitor_init(group_name, save_every, total_size, - buffer_size, dict_buffer, dict_file, file_name) + self.monitor_init(group_name, save_every, buffer_size, total_size, + dict_buffer, dict_file, file_name, mpi_mode) self.dict_buffer = dict_buffer self.dict_file = dict_file def track(self, object_to_save): """ + Save data + + Parameters + ---------- + object_to_save : Bunch or Beam object """ if self.track_count % self.save_every == 0: @@ -167,3 +340,124 @@ class PhaseSpaceMonitor(Monitor): else: raise TypeError("object_to_save should be a Beam or Bunch object.") self.track_count += 1 + + +class BeamMonitor(Monitor): + """ + Monitor the full beam and save each bunch's attributes (mean, std, emit and + current). + + Parameters + ---------- + file_name : string, optional + Name of the HDF5 file where the data will be stored. Must be specified + the first time a subclass of Monitor is instantiated and must be None + the following times. + save_every : int or float, optional + Set the frequency of the save.
The data is saved every save_every + call of the monitor. + buffer_size : int or float, optional + Size of the save buffer. + total_size : int or float, optional + Total size of the save. The following relationships between the + parameters must exist: + total_size % buffer_size == 0 + number of calls to track / save_every == total_size + mpi_mode : bool, optional + If True, open the HDF5 file in parallel mode, which is needed to + allow several cores to write in the same file at the same time. + If False, open the HDF5 file in standard mode. + + Methods + ------- + track(beam) + Save data + """ + + def __init__(self, file_name=None, save_every=5, buffer_size=500, + total_size=2e4, mpi_mode=True): + + group_name = "Beam" + dict_buffer = {} + dict_file = {} + self.monitor_init(group_name, save_every, buffer_size, total_size, + dict_buffer, dict_file, file_name, mpi_mode) + + self.mean = np.zeros((6, self.buffer_size), dtype=float) + self.std = np.zeros((6, self.buffer_size), dtype=float) + self.emit = np.zeros((3, self.buffer_size), dtype=float) + self.current = np.zeros((self.buffer_size,), dtype=float) + + self.g.require_dataset("mean", (6, 416, self.total_size,), dtype=float) + self.g.require_dataset("std", (6, 416, self.total_size,), dtype=float) + self.g.require_dataset("emit", (3, 416, self.total_size,), dtype=float) + self.g.require_dataset("current", (416, self.total_size,), dtype=float) + + def track(self, beam): + """ + Save data + + Parameters + ---------- + beam : Beam object + """ + if self.track_count % self.save_every == 0: + if (beam.mpi_switch == True): + self.to_buffer(beam[beam.mpi.bunch_num], beam.mpi.bunch_num) + else: + raise NotImplementedError + + self.track_count += 1 + + def to_buffer(self, bunch, bunch_num): + """ + Save data to buffer.
+ + Parameters + ---------- + bunch : Bunch object + bunch_num : int + """ + + self.time[self.buffer_count] = self.track_count + self.mean[:, self.buffer_count] = bunch.mean + self.std[:, self.buffer_count] = bunch.std + self.emit[:, self.buffer_count] = bunch.emit + self.current[self.buffer_count] = bunch.current + + self.buffer_count += 1 + + if self.buffer_count == self.buffer_size: + self.write(bunch_num) + self.buffer_count = 0 + + def write(self, bunch_num): + """ + Write data from buffer to the HDF5 file. + + Parameters + ---------- + bunch_num : int + """ + self.file[self.group_name]["time"][self.write_count*self.buffer_size:( + self.write_count+1)*self.buffer_size] = self.time + + self.file[self.group_name]["mean"][:, bunch_num, + self.write_count*self.buffer_size:(self.write_count+1) * + self.buffer_size] = self.mean + + self.file[self.group_name]["std"][:, bunch_num, + self.write_count*self.buffer_size:(self.write_count+1) * + self.buffer_size] = self.std + + self.file[self.group_name]["emit"][:, bunch_num, + self.write_count*self.buffer_size:(self.write_count+1) * + self.buffer_size] = self.emit + + self.file[self.group_name]["current"][bunch_num, + self.write_count*self.buffer_size:(self.write_count+1) * + self.buffer_size] = self.current + + self.write_count += 1 + + \ No newline at end of file -- GitLab