Something went wrong on our end
-
Alexis GAMELIN authoredAlexis GAMELIN authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
parallel.py 5.70 KiB
# -*- coding: utf-8 -*-
"""
Module to handle parallel computation
@author: Alexis Gamelin
@date: 06/03/2020
"""
import numpy as np
class Mpi:
"""
Class which handle parallel computation via the mpi4py module [1].
Parameters
----------
filling_pattern : bool array of shape (h,)
Filling pattern of the beam, like Beam.filling_pattern
Attributes
----------
comm : MPI.Intracomm object
MPI intra-comminicator of the processor group, used to manage
communication between processors.
rank : int
Rank of the processor which run the program
size : int
Number of processor within the processor group (in fact in the
intra-comminicator group)
table : int array of shape (size, 2)
Table of correspondance between the rank of the processor and its
associated bunch number
bunch_num : int
Return the bunch number corresponding to the current processor
Methods
-------
write_table(filling_pattern)
Write a table with the rank and the corresponding bunch number for each
bunch of the filling pattern
rank_to_bunch(rank)
Return the bunch number corresponding to rank
bunch_to_rank(bunch_num)
Return the rank corresponding to the bunch number bunch_num
References
----------
[1] L. Dalcin, P. Kler, R. Paz, and A. Cosimo, Parallel Distributed
Computing using Python, Advances in Water Resources, 34(9):1124-1139, 2011.
"""
def __init__(self, filling_pattern):
from mpi4py import MPI
self.MPI = MPI
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()
self.write_table(filling_pattern)
def write_table(self, filling_pattern):
"""
Write a table with the rank and the corresponding bunch number for each
bunch of the filling pattern
Parameters
----------
filling_pattern : bool array of shape (h,)
Filling pattern of the beam, like Beam.filling_pattern
"""
if(filling_pattern.sum() != self.size):
raise ValueError("The number of processors must be equal to the"
"number of (non-empty) bunches.")
table = np.zeros((self.size, 2), dtype = int)
table[:,0] = np.arange(0, self.size)
table[:,1] = np.where(filling_pattern)[0]
self.table = table
def rank_to_bunch(self, rank):
"""
Return the bunch number corresponding to rank
Parameters
----------
rank : int
Rank of a processor
Returns
-------
bunch_num : int
Bunch number corresponding to the input rank
"""
return self.table[rank,1]
def bunch_to_rank(self, bunch_num):
"""
Return the rank corresponding to the bunch number bunch_num
Parameters
----------
bunch_num : int
Bunch number
Returns
-------
rank : int
Rank of the processor which tracks the input bunch number
"""
try:
rank = np.where(self.table[:,1] == bunch_num)[0][0]
except IndexError:
print("The bunch " + str(bunch_num) + " is not tracked on any processor.")
rank = None
return rank
@property
def bunch_num(self):
"""Return the bunch number corresponding to the current processor"""
return self.rank_to_bunch(self.rank)
@property
def next_bunch(self):
"""Return the rank of the next tracked bunch"""
if self.rank + 1 in self.table[:,0]:
return self.rank + 1
else:
return 0
@property
def previous_bunch(self):
"""Return the rank of the previous tracked bunch"""
if self.rank - 1 in self.table[:,0]:
return self.rank - 1
else:
return max(self.table[:,0])
def share_distributions(self, beam, dimensions="tau", n_bin=75):
"""
Compute the bunch profiles and share it between the different bunches.
Parameters
----------
beam : Beam object
dimension : str or list of str, optional
Dimensions in which the binning is done. The default is "tau".
n_bin : int or list of int, optional
Number of bins. The default is 75.
"""
if(beam.mpi_switch == False):
print("Error, mpi is not initialised.")
if isinstance(dimensions, str):
dimensions = [dimensions]
if isinstance(n_bin, int):
n_bin = np.ones((len(dimensions),), dtype=int)*n_bin
bunch = beam[self.bunch_num]
charge_per_mp_all = self.comm.allgather(bunch.charge_per_mp)
self.charge_per_mp_all = charge_per_mp_all
for i in range(len(dimensions)):
dim = dimensions[i]
n = n_bin[i]
bins, sorted_index, profile, center = bunch.binning(dimension=dim, n_bin=n)
self.__setattr__(dim + "_center", np.empty((len(beam), len(center)), dtype=np.float64))
self.comm.Allgather([center, self.MPI.DOUBLE], [self.__getattribute__(dim + "_center"), self.MPI.DOUBLE])
self.__setattr__(dim + "_profile", np.empty((len(beam), len(profile)), dtype=np.int64))
self.comm.Allgather([profile, self.MPI.INT64_T], [self.__getattribute__(dim + "_profile"), self.MPI.INT64_T])
self.__setattr__(dim + "_sorted_index", sorted_index)