Something went wrong on our end
-
Alexis GAMELIN authored
Check that the bunch is not empty before binning in Mpi.share_distributions. If it is empty, the beam filling pattern is updated and empty arrays are sent. In CavityResonator.track, the tracking without MPI is modified to take into account the case of an empty bunch. The MPI case is fine, as the beam filling pattern is updated in the call to Mpi.share_distributions. In WakePotential.track, if the bunch is empty, tracking does nothing.
Alexis GAMELIN authored: Check that the bunch is not empty before binning in Mpi.share_distributions. If it is empty, the beam filling pattern is updated and empty arrays are sent. In CavityResonator.track, the tracking without MPI is modified to take into account the case of an empty bunch. The MPI case is fine, as the beam filling pattern is updated in the call to Mpi.share_distributions. In WakePotential.track, if the bunch is empty, tracking does nothing.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
parallel.py 6.08 KiB
# -*- coding: utf-8 -*-
"""
Module to handle parallel computation
@author: Alexis Gamelin
@date: 06/03/2020
"""
import numpy as np
class Mpi:
    """
    Class which handles parallel computation via the mpi4py module [1].

    One processor is associated with each (non-empty) bunch of the filling
    pattern.

    Parameters
    ----------
    filling_pattern : bool array of shape (h,)
        Filling pattern of the beam, like Beam.filling_pattern

    Attributes
    ----------
    comm : MPI.Intracomm object
        MPI intra-communicator of the processor group, used to manage
        communication between processors.
    rank : int
        Rank of the processor which runs the program
    size : int
        Number of processors within the processor group (in fact in the
        intra-communicator group)
    table : int array of shape (size, 2)
        Table of correspondence between the rank of the processor and its
        associated bunch number
    bunch_num : int
        Return the bunch number corresponding to the current processor
    next_bunch : int
        Return the rank of the next tracked bunch
    previous_bunch : int
        Return the rank of the previous tracked bunch

    Methods
    -------
    write_table(filling_pattern)
        Write a table with the rank and the corresponding bunch number for
        each bunch of the filling pattern
    rank_to_bunch(rank)
        Return the bunch number corresponding to rank
    bunch_to_rank(bunch_num)
        Return the rank corresponding to the bunch number bunch_num
    share_distributions(beam, dimensions, n_bin)
        Compute the bunch profiles and share them between the processors

    References
    ----------
    [1] L. Dalcin, P. Kler, R. Paz, and A. Cosimo, Parallel Distributed
    Computing using Python, Advances in Water Resources, 34(9):1124-1139, 2011.
    """

    def __init__(self, filling_pattern):
        # Imported here so that the module can be imported (and the class
        # defined) on systems where mpi4py is not installed.
        from mpi4py import MPI
        self.MPI = MPI
        self.comm = MPI.COMM_WORLD
        self.rank = self.comm.Get_rank()
        self.size = self.comm.Get_size()
        self.write_table(filling_pattern)

    def write_table(self, filling_pattern):
        """
        Write a table with the rank and the corresponding bunch number for
        each bunch of the filling pattern

        Parameters
        ----------
        filling_pattern : bool array of shape (h,)
            Filling pattern of the beam, like Beam.filling_pattern

        Raises
        ------
        ValueError
            If the number of filled bunches differs from the number of
            processors.
        """
        if filling_pattern.sum() != self.size:
            raise ValueError("The number of processors must be equal to the "
                             "number of (non-empty) bunches.")
        table = np.zeros((self.size, 2), dtype=int)
        table[:, 0] = np.arange(0, self.size)
        table[:, 1] = np.where(filling_pattern)[0]
        self.table = table

    def rank_to_bunch(self, rank):
        """
        Return the bunch number corresponding to rank

        Parameters
        ----------
        rank : int
            Rank of a processor

        Returns
        -------
        bunch_num : int
            Bunch number corresponding to the input rank
        """
        return self.table[rank, 1]

    def bunch_to_rank(self, bunch_num):
        """
        Return the rank corresponding to the bunch number bunch_num

        Parameters
        ----------
        bunch_num : int
            Bunch number

        Returns
        -------
        rank : int or None
            Rank of the processor which tracks the input bunch number, or
            None (after printing a message) if the bunch is not in the table.
        """
        try:
            rank = np.where(self.table[:, 1] == bunch_num)[0][0]
        except IndexError:
            # No row of the table matches: the bunch is not tracked.
            print(f"The bunch {bunch_num} is not tracked on any processor.")
            rank = None
        return rank

    @property
    def bunch_num(self):
        """Return the bunch number corresponding to the current processor"""
        return self.rank_to_bunch(self.rank)

    @property
    def next_bunch(self):
        """Return the rank of the next tracked bunch"""
        # Wrap around to the first rank after the last one.
        if self.rank + 1 in self.table[:, 0]:
            return self.rank + 1
        return 0

    @property
    def previous_bunch(self):
        """Return the rank of the previous tracked bunch"""
        # Wrap around to the last rank before the first one.
        if self.rank - 1 in self.table[:, 0]:
            return self.rank - 1
        return max(self.table[:, 0])

    def share_distributions(self, beam, dimensions="tau", n_bin=75):
        """
        Compute the bunch profiles and share them between the different
        bunches.

        For each dimension ``dim``, the gathered bin centers and profiles are
        stored in the attributes ``<dim>_center`` and ``<dim>_profile``
        (arrays of shape (size, n_bin - 1)), and the sorted index returned by
        the local binning in ``<dim>_sorted_index`` (None if the local bunch
        is empty). The gathered charges per macro-particle are stored in
        ``charge_per_mp_all``.

        Parameters
        ----------
        beam : Beam object
        dimensions : str or list of str, optional
            Dimensions in which the binning is done. The default is "tau".
        n_bin : int or list of int, optional
            Number of bins. The default is 75.
        """
        if not beam.mpi_switch:
            print("Error, mpi is not initialised.")

        if isinstance(dimensions, str):
            dimensions = [dimensions]

        # A single bin count (Python or NumPy integer) applies to every
        # dimension.
        if isinstance(n_bin, (int, np.integer)):
            n_bin = np.ones((len(dimensions),), dtype=int) * n_bin

        bunch = beam[self.bunch_num]

        self.charge_per_mp_all = self.comm.allgather(bunch.charge_per_mp)

        for i, dim in enumerate(dimensions):
            n = n_bin[i]

            if len(bunch) != 0:
                _, sorted_index, profile, center = bunch.binning(
                    dimension=dim, n_bin=n)
            else:
                # Empty bunch: send zero-filled arrays so that the collective
                # Allgather calls below still match on every processor.
                sorted_index = None
                profile = np.zeros((n - 1,), dtype=np.int64)
                center = np.zeros((n - 1,), dtype=np.float64)
                # Truthiness test: elements of a NumPy bool array are
                # numpy.bool_ objects, so an identity test against the Python
                # ``True`` singleton would never succeed.
                if beam.filling_pattern[self.bunch_num]:
                    beam.update_filling_pattern()
                    beam.update_distance_between_bunches()

            center_all = np.empty((self.size, n - 1), dtype=np.float64)
            setattr(self, dim + "_center", center_all)
            self.comm.Allgather([center, self.MPI.DOUBLE],
                                [center_all, self.MPI.DOUBLE])

            profile_all = np.empty((self.size, n - 1), dtype=np.int64)
            setattr(self, dim + "_profile", profile_all)
            self.comm.Allgather([profile, self.MPI.INT64_T],
                                [profile_all, self.MPI.INT64_T])

            setattr(self, dim + "_sorted_index", sorted_index)