Skip to content
Snippets Groups Projects
Select Git revision
  • a32e6f1f6dbb630e2636442ae51b3f5bc27c8c52
  • develop default protected
  • feature-ibs-suggestions-from-salah
  • syntax-typehints
  • feature-ExponentialDumper-bugfix
  • Resisitve_wall_eff_radius_yokoya
  • feature-feedback-IQ-damper0
  • 18-synchrotron-object-string-representation
  • stable protected
  • feature-particle-in-cell
  • feature-quad_wakes_LongRangeResistiveWall
  • feature-iqdamper0
  • feature-read_wakis
  • use-one-bin
  • RF-FBv0.6
  • RF-FBv0.5
  • faster_pytorch
  • RF-FB
  • util
  • RFBucket
  • Long-range_wakepotential
  • 0.9.0
  • 0.8.0
  • 0.7.0
  • 0.6.0
  • 0.5.0
  • 0.4
  • 0.3
  • 0.2
  • 0.1
30 results

test_particle.py

Blame
  • Alexis Gamelin's avatar
    Alexis GAMELIN authored
    Beam.update_filling_pattern and Beam.update_distance_between_bunches are now called directly in the __init__ when a bunch_list is given.
    Beam.init_bunch_list_mpi is a convenience method to initialize a beam using MPI parallelisation with a Bunch per core.
    Add tests for Beam.init_bunch_list_mpi
    38aa2719
    History
    test_particle.py 14.57 KiB
    import numpy as np
    import matplotlib.pyplot as plt
    import pytest
    pytestmark = pytest.mark.unit
    from scipy.constants import e, m_p, c
    from mbtrack2 import Particle, Bunch, Beam
    
    class TestParticle:
        """Unit tests for ``Particle``."""

        def test_access_E_rest_property(self):
            """Check ``Particle.E_rest`` against ``m_p * c**2 / e``.

            The expected value is built from the same scipy constants that are
            passed to the constructor. The comparison uses ``pytest.approx`` so
            the test does not depend on the exact order of the floating-point
            operations performed inside ``Particle``.
            """
            particle = Particle(mass=m_p, charge=e)
            expected_E_rest = m_p * c**2 / e
            assert particle.E_rest == pytest.approx(expected_E_rest)
            
    @pytest.fixture
    def generate_bunch(demo_ring):
        """Provide a factory that builds ``Bunch`` objects for the tests.

        The returned callable forwards its keyword arguments to the ``Bunch``
        constructor (``ring`` defaults to the ``demo_ring`` fixture). Unless
        ``init_gaussian`` is falsy, ``Bunch.init_gaussian()`` is called on the
        new bunch before it is returned.
        """
        def generate(ring=demo_ring,
                     mp_number=1e3,
                     current=1e-3,
                     track_alive=True,
                     alive=True,
                     load_from_file=None,
                     load_suffix=None,
                     init_gaussian=True,
                     ):
            constructor_kwargs = dict(
                ring=ring,
                mp_number=mp_number,
                current=current,
                track_alive=track_alive,
                alive=alive,
                load_from_file=load_from_file,
                load_suffix=load_suffix,
            )
            new_bunch = Bunch(**constructor_kwargs)
            if not init_gaussian:
                return new_bunch
            new_bunch.init_gaussian()
            return new_bunch
        return generate
    
    @pytest.fixture
    def small_bunch(generate_bunch):
        """Bunch with 10 macro-particles built by the ``generate_bunch`` factory."""
        n_macro_particles = 10
        return generate_bunch(mp_number=n_macro_particles)
    
    @pytest.fixture
    def large_bunch(generate_bunch):
        """Bunch with ``1e5`` macro-particles built by the ``generate_bunch`` factory."""
        n_macro_particles = 1e5
        return generate_bunch(mp_number=n_macro_particles)
    
    class TestBunch:
    
        # Statistics of a single macro-particle bunch must have the expected lengths and contain no NaN
        def test_statistics_single_mp(self, generate_bunch):
            """Statistics of a one-macro-particle bunch are well-formed.

            For each statistic the test checks the length of the returned
            sequence, then checks that none of its entries is NaN.
            """
            bunch = generate_bunch(mp_number=1)

            # Statistic name -> expected number of entries.
            expected_lengths = {
                "mean": 6,
                "std": 6,
                "skew": 6,
                "kurtosis": 6,
                "emit": 3,
                "cs_invariant": 3,
            }

            for name, length in expected_lengths.items():
                assert len(getattr(bunch, name)) == length, name

            for name in expected_lengths:
                assert not np.any(np.isnan(getattr(bunch, name))), name
    
        # Initialize a Bunch with zero macro-particles and verify behavior
        def test_initialize_zero_macro_particles(self, generate_bunch):
            """A bunch built with ``mp_number=0`` has no macro-particles and is empty."""
            empty_bunch = generate_bunch(mp_number=0)
            assert empty_bunch.mp_number == 0
            assert empty_bunch.is_empty
            
        def test_drop_zero_macro_particles(self, generate_bunch):
            """Marking the only macro-particle as not alive empties the bunch.

            After the flag is cleared the bunch length is zero, ``is_empty`` is
            true and the bunch charge is approximately zero.
            """
            bunch = generate_bunch(mp_number=1)
            bunch.alive[0] = False
            assert len(bunch) == 0
            assert bunch.is_empty
            assert pytest.approx(bunch.charge) == 0
    
        # Verify the behavior of init_gaussian with custom covariance and mean
        def test_init_gaussian_custom_cov_mean(self, large_bunch):
            """``init_gaussian`` honours a user-supplied mean and covariance.

            The bunch mean is compared with the requested mean, and the sample
            covariance of ``x`` and ``xp`` with the upper-left 2x2 block of the
            requested covariance, both with an absolute tolerance of ``1e-1``.
            """
            requested_cov = np.eye(6) * 0.5
            requested_mean = np.ones(6) * 2.0

            large_bunch.init_gaussian(cov=requested_cov, mean=requested_mean)

            measured_mean = large_bunch.mean
            assert np.allclose(measured_mean, requested_mean, atol=1e-1)

            measured_cov = np.cov(large_bunch.particles['x'],
                                  large_bunch.particles['xp'])
            expected_cov = requested_cov[:2, :2]
            assert np.allclose(measured_cov, expected_cov, atol=1e-1)
    
        def test_bunch_values(self, small_bunch, demo_ring):
            """Basic bunch quantities are consistent with the ring and the current.

            Checks the bunch length, the ``alive`` flags, the total charge, the
            charge per macro-particle, the particle number and ``is_empty``.
            """
            mp_number = small_bunch.mp_number
            current = small_bunch.current
            expected_charge = current * demo_ring.T0

            assert len(small_bunch) == mp_number
            # ``alive`` holds boolean flags, so compare exactly rather than with a
            # floating-point tolerance.
            np.testing.assert_array_equal(small_bunch.alive, np.ones((mp_number,), dtype=bool))
            assert pytest.approx(small_bunch.charge) == expected_charge
            assert pytest.approx(small_bunch.charge_per_mp) == expected_charge / mp_number
            assert pytest.approx(small_bunch.particle_number) == expected_charge / e
            assert not small_bunch.is_empty
    
        def test_bunch_magic(self, generate_bunch):
            """Iteration, ``len`` and item get/set work on a bunch.

            The bunch is built without ``init_gaussian``. Every label yielded by
            iterating over it must first map to zeros, and to ones after ones
            have been assigned through item assignment.
            """
            blank_bunch = generate_bunch(init_gaussian=False)
            for coordinate in blank_bunch:
                initial_expected = np.zeros(len(blank_bunch))
                np.testing.assert_allclose(blank_bunch[coordinate], initial_expected)

                blank_bunch[coordinate] = np.ones(len(blank_bunch))

                updated_expected = np.ones(len(blank_bunch))
                np.testing.assert_allclose(blank_bunch[coordinate], updated_expected)
    
        def test_bunch_losses(self, small_bunch):
            """Losing one macro-particle reduces length and charge proportionally."""
            initial_charge = small_bunch.charge

            # Flag the first macro-particle as lost.
            small_bunch.alive[0] = False

            assert len(small_bunch) == small_bunch.mp_number - 1
            remaining_charge = initial_charge * len(small_bunch) / small_bunch.mp_number
            assert pytest.approx(small_bunch.charge) == remaining_charge
    
        def test_bunch_init_gauss(self, large_bunch):
            """``init_gaussian(mean=ones)`` yields a bunch mean of ones within 1 %."""
            requested_mean = np.ones((6,))
            large_bunch.init_gaussian(mean=requested_mean)

            expected_mean = np.ones((6,))
            np.testing.assert_allclose(large_bunch.mean, expected_mean, rtol=1e-2)
    
        def test_bunch_save_load(self, small_bunch, generate_bunch, tmp_path):
            """A saved bunch can be loaded into another bunch without loss.

            ``small_bunch`` is shifted in ``x``, saved under ``tmp_path``, and the
            resulting ``.hdf5`` file is loaded into a bunch that was created with
            a different macro-particle number and current. Macro-particle number,
            charge and every coordinate must then match.
            """
            small_bunch["x"] += 1
            base_path = tmp_path / "test"
            small_bunch.save(str(base_path))

            reloaded = generate_bunch(mp_number=1, current=1e-5)
            reloaded.load(str(base_path.with_suffix(".hdf5")))

            assert small_bunch.mp_number == reloaded.mp_number
            assert pytest.approx(small_bunch.charge) == reloaded.charge
            for coordinate in small_bunch:
                np.testing.assert_allclose(small_bunch[coordinate], reloaded[coordinate])
    
        def test_bunch_stats(self, demo_ring, large_bunch):
            """Statistics of a Gaussian bunch agree with the ring parameters.

            The mean is compared with zero, the standard deviations with the ring
            sigmas, and the first two emittances and invariants with the ring
            emittance (the invariant against twice the emittance).
            """
            zero_mean = np.zeros((6,))
            np.testing.assert_array_almost_equal(large_bunch.mean, zero_mean, decimal=5)

            longitudinal_sigmas = [demo_ring.sigma_0, demo_ring.sigma_delta]
            expected_std = np.concatenate((demo_ring.sigma(), longitudinal_sigmas))
            np.testing.assert_allclose(large_bunch.std, expected_std, rtol=1e-2)

            measured_emit = large_bunch.emit[:2]
            np.testing.assert_allclose(measured_emit, demo_ring.emit, rtol=1e-2)

            measured_invariant = large_bunch.cs_invariant[:2]
            np.testing.assert_allclose(measured_invariant, demo_ring.emit*2, rtol=1e-2)
    
        @pytest.mark.parametrize('n_bin', [75, 1, 2])
        def test_bunch_binning(self, small_bunch, n_bin):
            """``Bunch.binning`` assigns each particle to a bin containing its ``tau``.

            For every entry of the returned index array the particle's ``tau``
            must lie between the edges of the indicated bin. The per-bin counts
            rebuilt from those indices must equal the returned profile.
            """
            bins, sorted_index, profile, _center = small_bunch.binning(n_bin=n_bin)

            rebuilt_profile = np.zeros((len(bins)-1,))
            for particle, bin_id in enumerate(sorted_index):
                lower_edge = bins[bin_id]
                upper_edge = bins[bin_id+1]
                assert lower_edge <= small_bunch["tau"][particle] <= upper_edge
                rebuilt_profile[bin_id] += 1

            np.testing.assert_allclose(rebuilt_profile, profile)
    
        def test_bunch_plots(self, small_bunch):
            """Smoke test: the bunch plotting methods run without raising.

            Figures are closed afterwards, whether or not a plotting call fails,
            so that open matplotlib figures do not accumulate across the test
            session.
            """
            try:
                small_bunch.plot_phasespace()
                small_bunch.plot_profile()
            finally:
                plt.close("all")
    
        def test_bunch_emittance(self, generate_bunch, demo_ring):
            """Bunch emittances match the ring values and half the invariants.

            A bunch of one million macro-particles is generated with
            ``track_alive=False``. For the first two planes, ``emit`` is compared
            with the ring emittance, then with ``cs_invariant / 2``, each with a
            relative tolerance of 1 %.
            """
            n_macro_particles = 1_000_000
            bunch = generate_bunch(mp_number=n_macro_particles, track_alive=False)

            # Emittance from the coordinates versus the value the ring was built with.
            for plane in (0, 1):
                np.testing.assert_allclose(
                    bunch.emit[plane], demo_ring.emit[plane], rtol=1e-2, atol=0,
                    err_msg=f'Emittances do not match. {demo_ring.emit[plane]} initialised, {bunch.emit[plane]:} calculated')

            # Emittance from the coordinates versus half the invariant.
            for plane in (0, 1):
                np.testing.assert_allclose(
                    bunch.emit[plane], bunch.cs_invariant[plane]/2, rtol=1e-2, atol=0,
                    err_msg=f'Emittances do not match. {bunch.cs_invariant[plane]/2} calculated with optics functions, {bunch.emit[plane]:} calculated with coordinates only')
        
        def test_bunch_emittance_with_dispersion(self, generate_bunch, demo_ring):
            """Bunch emittances match the ring values and half the invariants.

            NOTE(review): this test performs the same checks as
            ``test_bunch_emittance`` and does not configure any dispersion itself.
            Presumably a non-zero dispersion was meant to be set on the ring
            before generating the bunch - confirm and add the missing setup.
            """
            n_macro_particles = 1_000_000
            bunch = generate_bunch(mp_number=n_macro_particles, track_alive=False)

            planes = (0, 1)

            # Emittance from the coordinates versus the value the ring was built with.
            for plane in planes:
                np.testing.assert_allclose(
                    bunch.emit[plane], demo_ring.emit[plane], rtol=1e-2, atol=0,
                    err_msg=f'Emittances do not match. {demo_ring.emit[plane]} initialised, {bunch.emit[plane]:} calculated')

            # Emittance from the coordinates versus half the invariant.
            for plane in planes:
                np.testing.assert_allclose(
                    bunch.emit[plane], bunch.cs_invariant[plane]/2, rtol=1e-2, atol=0,
                    err_msg=f'Emittances do not match. {bunch.cs_invariant[plane]/2} calculated with optics functions, {bunch.emit[plane]:} calculated with coordinates only')
    
    
    @pytest.fixture
    def generate_beam(demo_ring, generate_bunch):
        """Provide a factory that builds and initialises ``Beam`` objects.

        The returned callable creates a ``Beam`` on ``ring`` (the ``demo_ring``
        fixture by default) and calls ``Beam.init_beam`` with the remaining
        arguments. When ``filling_pattern`` is ``None`` an all-``True`` boolean
        pattern of length ``ring.h`` is used.
        """
        def generate(ring=demo_ring,
                     filling_pattern=None,
                     current_per_bunch=1e-3,
                     mp_per_bunch=10,
                     track_alive=True,
                     mpi=False):

            new_beam = Beam(ring=ring)

            # Default to a completely filled ring.
            if filling_pattern is None:
                pattern = np.ones((ring.h,), dtype=bool)
            else:
                pattern = filling_pattern

            init_kwargs = dict(
                filling_pattern=pattern,
                current_per_bunch=current_per_bunch,
                mp_per_bunch=mp_per_bunch,
                track_alive=track_alive,
                mpi=mpi,
            )
            new_beam.init_beam(**init_kwargs)

            return new_beam
        return generate
    
    @pytest.fixture
    def beam_uniform(generate_beam):
        """Beam built by ``generate_beam`` with all of its default arguments."""
        uniform_beam = generate_beam()
        return uniform_beam
    
    @pytest.fixture
    def beam_non_uniform(generate_beam, demo_ring):
        """Beam whose filling pattern has bucket 4 and the last three buckets off."""
        pattern = np.ones((demo_ring.h,), dtype=bool)
        # Both assignments clear entries, so their order does not matter.
        pattern[-3:] = False
        pattern[4] = False
        return generate_beam(filling_pattern=pattern)
    
    @pytest.fixture
    def beam_1bunch_mpi(generate_beam, demo_ring):
        """Beam initialised with ``mpi=True`` and only the first bucket filled."""
        single_bucket = np.zeros((demo_ring.h,), dtype=bool)
        single_bucket[0] = True
        beam = generate_beam(filling_pattern=single_bucket, mpi=True)
        return beam
    
    class TestBeam:
    
        @pytest.mark.parametrize("n_bunch", [1, 5, 10, 20])
        def test_initialize_beam(self, generate_beam, demo_ring, n_bunch):
            """A beam with the first ``n_bunch`` buckets filled has length ``n_bunch``."""
            pattern = np.zeros((demo_ring.h,), dtype=bool)
            pattern[:n_bunch] = True

            beam = generate_beam(filling_pattern=pattern)

            assert len(beam) == n_bunch
    
        @pytest.mark.parametrize("n_bunch", [1, 5, 10, 20])
        def test_calculate_total_beam_properties(self, generate_beam, demo_ring, n_bunch):
            """Beam totals equal the sums over its bunches.

            The beam current is compared with ``0.002 * n_bunch``. The beam
            charge and particle number are compared with the sums of the
            per-bunch values.
            """
            pattern = np.zeros((demo_ring.h,), dtype=bool)
            pattern[:n_bunch] = True
            beam = generate_beam(filling_pattern=pattern, current_per_bunch=0.002)

            assert beam.current == pytest.approx(0.002 * n_bunch)

            summed_charge = np.sum([bunch.charge for bunch in beam])
            assert beam.charge == pytest.approx(summed_charge)

            summed_particles = np.sum([bunch.particle_number for bunch in beam])
            assert beam.particle_number == pytest.approx(summed_particles)
    
        def test_save_and_load_beam_data(self, tmp_path, beam_uniform, demo_ring):
            """A beam saved to disk and loaded into a new ``Beam`` keeps its data.

            The per-bunch current, mean and standard deviation of the loaded beam
            must be exactly equal to those of the saved beam.
            """
            base_name = str(tmp_path / "beam_data")
            beam_uniform.save(base_name)

            reloaded = Beam(demo_ring)
            reloaded.load(base_name + ".hdf5", mpi=False)

            for attribute in ("bunch_current", "bunch_mean", "bunch_std"):
                assert np.array_equal(getattr(beam_uniform, attribute),
                                      getattr(reloaded, attribute))
    
        @pytest.mark.parametrize("var,option",
                                 [("bunch_current", None),
                                  ("bunch_charge", None),
                                  ("bunch_particle", None),
                                  ("bunch_mean","x"),
                                  ("bunch_std","x"),
                                  ("bunch_emit","x")])
        def test_plot_variables_with_respect_to_bunch_number(self, beam_uniform, var, option):
            """``Beam.plot`` returns a figure for every supported variable.

            Figures are closed in a ``finally`` clause so that they are released
            even when the plotting call or the assertion fails.
            """
            try:
                fig = beam_uniform.plot(var, option)
                assert fig is not None
            finally:
                plt.close("all")
    
        def test_initialize_beam_mismatched_bunch_list_length(self, demo_ring, generate_bunch):
            """``Beam`` raises ``ValueError`` for a bunch list one shorter than ``ring.h``."""
            n_bunches = demo_ring.h - 1
            too_short_list = [generate_bunch() for _ in range(n_bunches)]

            with pytest.raises(ValueError):
                Beam(demo_ring, too_short_list)
    
        def test_filling_pattern_and_distance_between_bunches(self, generate_beam, demo_ring):
            """Filling pattern and bunch spacing reflect the buckets left empty.

            Buckets 5, 8 and 9 are left empty. The expected distance array is 1
            everywhere except for the entries overridden below.
            """
            pattern = np.ones((demo_ring.h,), dtype=bool)
            for empty_buckets in (5, slice(8, 10)):
                pattern[empty_buckets] = False

            beam = generate_beam(filling_pattern=pattern)
            np.testing.assert_array_equal(beam.filling_pattern, pattern)

            expected_distances = np.ones((demo_ring.h,))
            overrides = ((4, 2), (5, 0), (7, 3), (slice(8, 10), 0))
            for position, distance in overrides:
                expected_distances[position] = distance
            np.testing.assert_array_equal(beam.distance_between_bunches, expected_distances)
    
        def test_update_filling_pattern_and_distance_between_bunches(self, beam_uniform, demo_ring):
            """Updating a beam after zeroing bunch charges refreshes pattern and spacing.

            The charge of bunches 5, 8 and 9 is set to zero, then
            ``update_filling_pattern`` and ``update_distance_between_bunches`` are
            called. The filling pattern must be ``False`` at those indices, and
            the distance array must be 1 except for the entries overridden below.
            """
            # Each bunch only needs to be zeroed once.
            for i in [5, 8, 9]:
                beam_uniform[i].charge = 0
            beam_uniform.update_filling_pattern()
            beam_uniform.update_distance_between_bunches()

            expected_filling_pattern = np.ones((demo_ring.h,), dtype=bool)
            expected_filling_pattern[5] = False
            expected_filling_pattern[8:10] = False
            np.testing.assert_array_equal(beam_uniform.filling_pattern, expected_filling_pattern)

            expected_distances = np.ones((demo_ring.h,))
            expected_distances[4] = 2
            expected_distances[5] = 0
            expected_distances[7] = 3
            expected_distances[8:10] = 0
            np.testing.assert_array_equal(beam_uniform.distance_between_bunches, expected_distances)
    
        def test_mpi_gather_and_close_consistency(self, mocker, demo_ring, generate_bunch):
            """``mpi_gather`` uses ``allgather`` and ``mpi_close`` resets the MPI state.

            ``mbtrack2.tracking.parallel.Mpi`` is patched with a mock whose
            ``comm.allgather`` returns one generated bunch per ring bucket.
            """
            patched_mpi_class = mocker.patch('mbtrack2.tracking.parallel.Mpi')
            fake_mpi = patched_mpi_class.return_value
            gathered_bunches = [generate_bunch() for _ in range(demo_ring.h)]
            fake_mpi.comm.allgather.return_value = gathered_bunches

            beam = Beam(ring=demo_ring)
            beam.mpi_init()
            beam.mpi_gather()
            assert fake_mpi.comm.allgather.called

            beam.mpi_close()
            assert not beam.mpi_switch
            assert beam.mpi is None
            
        def test_init_bunch_list(self, demo_ring):
            """A ``Beam`` built from a bunch list derives its own filling pattern.

            One single-macro-particle bunch is created per bucket, with its
            ``alive`` argument taken from the filling pattern (buckets 5, 8 and 9
            are off). The beam length, filling pattern and distance array are
            then checked.
            """
            pattern = np.ones((demo_ring.h,), dtype=bool)
            pattern[8:10] = False
            pattern[5] = False

            bunches = [Bunch(demo_ring, mp_number=1, alive=is_filled)
                       for is_filled in pattern]
            beam = Beam(demo_ring, bunches)

            assert len(beam) == pattern.sum()
            np.testing.assert_array_equal(beam.filling_pattern, pattern)
            assert beam.distance_between_bunches is not None
            
        def test_init_bunch_list_mpi(self, demo_ring, generate_bunch):
            """``Beam.init_bunch_list_mpi`` builds a one-bunch beam from a pattern.

            Only the first bucket is filled, so the beam must hold one bunch,
            expose the same filling pattern, and report a distance of ``ring.h``
            for that bunch.
            """
            single_bucket = np.zeros((demo_ring.h,), dtype=bool)
            single_bucket[0] = True

            beam = Beam(demo_ring)
            local_bunch = generate_bunch()
            beam.init_bunch_list_mpi(local_bunch, single_bucket)

            assert len(beam) == 1
            np.testing.assert_array_equal(beam.filling_pattern, single_bucket)
            assert beam.distance_between_bunches[0] == demo_ring.h