Skip to content
Snippets Groups Projects
Select Git revision
  • e792f6483641a4dbc50a6d976c25bddb72ba0b31
  • develop default protected
  • feature-ibs-suggestions-from-salah
  • syntax-typehints
  • feature-ExponentialDumper-bugfix
  • Resisitve_wall_eff_radius_yokoya
  • feature-feedback-IQ-damper0
  • 18-synchrotron-object-string-representation
  • stable protected
  • feature-particle-in-cell
  • feature-quad_wakes_LongRangeResistiveWall
  • feature-iqdamper0
  • feature-read_wakis
  • use-one-bin
  • RF-FBv0.6
  • RF-FBv0.5
  • faster_pytorch
  • RF-FB
  • util
  • RFBucket
  • Long-range_wakepotential
  • 0.9.0
  • 0.8.0
  • 0.7.0
  • 0.6.0
  • 0.5.0
  • 0.4
  • 0.3
  • 0.2
  • 0.1
30 results

test_rf.py

Blame
  • test_rf.py 19.26 KiB
    import pytest
    pytestmark = pytest.mark.unit
    import numpy as np
    from utility_test_functions import assert_attr_changed
    from mbtrack2 import (RFCavity, 
                          CavityResonator, 
                          ProportionalLoop, 
                          ProportionalIntegralLoop,
                          TunerLoop,
                          DirectFeedback)
    
    class TestRFCavity:

        # Build an RFCavity from valid parameters, track a bunch through it and
        # let assert_attr_changed check that "delta" is reported as changed.
        def test_rf_cavity(self, demo_ring, small_bunch):
            rf_kwargs = dict(ring=demo_ring, m=1, Vc=1e6, theta=np.pi/4)
            rf_element = RFCavity(**rf_kwargs)
            assert_attr_changed(rf_element, small_bunch, attrs_changed=["delta"])
    
    # CavityResonator built on the demo ring; Vc, theta and the generator are
    # not set explicitly here (see the constructor for their initial values).
    @pytest.fixture
    def cav_res(demo_ring):
        resonator_kwargs = dict(m=1, Rs=1e6, Q=1e4, QL=1e3, detune=-20e3, Ncav=4)
        return CavityResonator(demo_ring, **resonator_kwargs)
    
    # The cav_res resonator with Vc = 1e6, theta = arccos(U0 / Vc) and
    # set_generator(0.5) applied, in that order, before it is handed to a test.
    @pytest.fixture
    def cav_res_tracking(demo_ring, cav_res):
        cav_res.Vc = 1e6
        # Vc is read back from the resonator rather than reusing the literal
        phase = np.arccos(demo_ring.U0 / cav_res.Vc)
        cav_res.theta = phase
        cav_res.set_generator(0.5)
        return cav_res
    
    class TestCavityResonator:

        # Shared body of the tracking tests: snapshot the beam phasor and its
        # record, track the beam (assert_attr_changed is asked to check
        # "delta"), then require that both phasor quantities differ from the
        # snapshot.
        @staticmethod
        def _assert_tracking_updates_phasor(cavity, beam):
            initial_phasor = cavity.beam_phasor
            initial_beam_phasor_record = cavity.beam_phasor_record.copy()

            assert_attr_changed(cavity, beam, attrs_changed=["delta"])

            assert not np.array_equal(cavity.beam_phasor, initial_phasor)
            assert not np.array_equal(cavity.beam_phasor_record, initial_beam_phasor_record)

        # Shared body of the phasor initialisation tests: init_phasor_track and
        # init_phasor must both move the beam phasor away from its starting
        # value and agree with each other within rtol=1e-2.
        @staticmethod
        def _assert_phasor_init_methods_agree(cavity, beam):
            init_phasor = cavity.beam_phasor
            cavity.init_phasor_track(beam)
            phasor_init_phasor_track = cavity.beam_phasor

            # Restore the starting value so the second method starts from the
            # same state as the first one did.
            cavity.beam_phasor = init_phasor
            cavity.init_phasor(beam)
            phasor_init_phasor = cavity.beam_phasor

            assert phasor_init_phasor != init_phasor
            assert phasor_init_phasor_track != init_phasor
            assert np.allclose(phasor_init_phasor, phasor_init_phasor_track, rtol=1e-2)

        # Track an MPI beam after sharing its distributions with the same n_bin
        @pytest.mark.parametrize("n_bin", [75, 1])
        def test_track_mpi(self, cav_res_tracking, beam_1bunch_mpi, n_bin):
            cav_res_tracking.n_bin = n_bin
            # share_distributions only receives the beam, so it can run before
            # the cavity state is snapshotted by the helper.
            beam_1bunch_mpi.mpi.share_distributions(beam_1bunch_mpi, n_bin=n_bin)
            self._assert_tracking_updates_phasor(cav_res_tracking, beam_1bunch_mpi)

        # Track a uniformly filled beam without MPI
        @pytest.mark.parametrize("n_bin", [75, 1])
        def test_track_no_mpi(self, cav_res_tracking, beam_uniform, n_bin):
            cav_res_tracking.n_bin = n_bin
            self._assert_tracking_updates_phasor(cav_res_tracking, beam_uniform)

        # Track a non-uniformly filled beam without MPI
        @pytest.mark.parametrize("n_bin", [75, 1])
        def test_track_no_mpi_non_uniform(self, cav_res_tracking, beam_non_uniform, n_bin):
            cav_res_tracking.n_bin = n_bin
            self._assert_tracking_updates_phasor(cav_res_tracking, beam_non_uniform)

        # Track a beam with zero charge per macro-particle
        def test_track_zero_charge_per_mp(self, cav_res_tracking, generate_beam):
            beam = generate_beam(current_per_bunch=0)
            assert_attr_changed(cav_res_tracking, beam, change=False)

        # Apply RF feedback loops correctly during tracking
        def test_apply_rf_feedback(self, cav_res_tracking, beam_uniform, mocker):
            # Keep a handle on the mock instead of looking it up by index, so the
            # check targets the object that was actually appended.
            fake_feedback = mocker.Mock()
            cav_res_tracking.feedback.append(fake_feedback)
            cav_res_tracking.track(beam_uniform)
            # RF feedback should be applied: its track() is called exactly once.
            fake_feedback.track.assert_called_once()

        # phasor_decay must modify the beam phasor in both reference frames
        @pytest.mark.parametrize("ref_frame", ["beam", "rf"])
        def test_phasor_decay(self, cav_res, ref_frame):
            init_phasor = 1 + 1j
            cav_res.beam_phasor = init_phasor
            cav_res.phasor_decay(10e-6, ref_frame)
            assert cav_res.beam_phasor != init_phasor

        # phasor_evol must modify the beam phasor in both reference frames
        @pytest.mark.parametrize("ref_frame", ["beam", "rf"])
        def test_phasor_evol(self, cav_res, ref_frame):
            init_phasor = 1 + 1j
            cav_res.beam_phasor = init_phasor
            # Seeded generator: same profile on every run, so a failure can be
            # reproduced.
            rng = np.random.default_rng(0)
            profile = rng.integers(1000, size=(10,))
            bin_length = 1e-12
            charge_per_mp = 1e-15
            cav_res.phasor_evol(profile, bin_length, charge_per_mp, ref_frame)
            assert cav_res.beam_phasor != init_phasor

        # init_phasor and init_phasor_track agree for a non-uniform beam
        def test_phasor_init(self, cav_res, beam_non_uniform):
            self._assert_phasor_init_methods_agree(cav_res, beam_non_uniform)

        # init_phasor and init_phasor_track agree for an MPI beam
        def test_phasor_init_mpi(self, cav_res, beam_1bunch_mpi):
            self._assert_phasor_init_methods_agree(cav_res, beam_1bunch_mpi)

        # Setting detune updates _detune, _fr, _wr, and _psi correctly
        def test_detune(self, cav_res):
            detune_value = 1000
            cav_res.detune = detune_value
            assert cav_res._detune == detune_value
            assert cav_res._fr == detune_value + cav_res.m * cav_res.ring.f1
            assert cav_res._wr == cav_res._fr * 2 * np.pi
            assert cav_res._psi == np.arctan(cav_res.QL * (cav_res._fr / (cav_res.m * cav_res.ring.f1) - (cav_res.m * cav_res.ring.f1) / cav_res._fr))

        # update_feedback is called after setting detune
        def test_update_feedback_called(self, mocker, cav_res):
            mocker.spy(cav_res, 'update_feedback')
            cav_res.detune = 1000
            cav_res.update_feedback.assert_called_once()

        # plot_phasor returns a figure object
        def test_plot_phasor_diagram(self, cav_res_tracking):
            fig = cav_res_tracking.plot_phasor(0.5)
            assert fig is not None

        # Calculating generator phasor using Vg and theta_g
        def test_generator_phasor_calculation(self, cav_res):
            cav_res.Vg = 1000
            cav_res.theta_g = np.pi / 4
            expected_phasor = 1000 * np.exp(1j * np.pi / 4)
            assert cav_res.generator_phasor == expected_phasor

        # Computing cavity phasor by summing generator and beam phasors
        def test_cavity_phasor_computation(self, cav_res):
            cav_res.Vg = 1000
            cav_res.theta_g = np.pi / 4
            cav_res.beam_phasor = 500 * np.exp(1j * np.pi / 6)
            expected_phasor = cav_res.generator_phasor + cav_res.beam_phasor
            assert cav_res.cavity_phasor == expected_phasor

        # Retrieving cavity phasor record for each bunch
        def test_cavity_phasor_record_retrieval(self, cav_res):
            cav_res.generator_phasor_record = np.array([1+1j, 2+2j])
            cav_res.beam_phasor_record = np.array([0.5+0.5j, 1+1j])
            expected_record = np.array([1.5+1.5j, 3+3j])
            assert np.allclose(cav_res.cavity_phasor_record, expected_record)

        # Accessing ig phasor record when feedback is present
        def test_ig_phasor_record_with_feedback(self, cav_res):
            pi_loop = ProportionalIntegralLoop(cav_res.ring, cav_res, [1.0, 1.0], 10, 10, 10)
            pi_loop.ig_phasor_record = np.array([1+1j, 2+2j])
            cav_res.feedback.append(pi_loop)
            assert np.allclose(cav_res.ig_phasor_record, pi_loop.ig_phasor_record)

        # Determining cavity voltage and phase from cavity phasor
        def test_cavity_voltage_and_phase_determination(self, cav_res):
            cav_res.Vg = 1000
            cav_res.theta_g = np.pi / 4
            cav_res.beam_phasor = 500 * np.exp(1j * np.pi / 6)
            expected_voltage = np.abs(cav_res.cavity_phasor)
            expected_phase = np.angle(cav_res.cavity_phasor)
            assert cav_res.cavity_voltage == expected_voltage
            assert cav_res.cavity_phase == expected_phase

        # Calculating beam voltage and phase from beam phasor
        def test_beam_voltage_and_phase_calculation(self, cav_res):
            cav_res.beam_phasor = 500 * np.exp(1j * np.pi / 6)
            expected_voltage = np.abs(cav_res.beam_phasor)
            expected_phase = np.angle(cav_res.beam_phasor)
            assert cav_res.beam_voltage == expected_voltage
            assert cav_res.beam_phase == expected_phase

        # Handling feedback list with no ProportionalIntegralLoop or DirectFeedback
        def test_ig_phasor_record_no_feedback(self, cav_res):
            cav_res.feedback = []
            expected_record = np.zeros(cav_res.ring.h)
            assert np.allclose(cav_res.ig_phasor_record, expected_record)

        # Calculating loaded shunt impedance RL
        def test_loaded_shunt_impedance_rl(self, cav_res):
            # Set up the initial conditions
            cav_res.Rs = 1e6  # Shunt impedance
            cav_res.QL = 1e3  # Loaded quality factor
            cav_res.Q = 1e4   # Quality factor

            # Calculate expected RL
            expected_RL = cav_res.Rs / (1 + (cav_res.Q / cav_res.QL - 1))

            # Assert that the calculated RL matches the expected value
            assert cav_res.RL == expected_RL

        # Set optimal coupling and detuning and verify changes in attributes
        def test_set_optimal_coupling_and_detuning(self, demo_ring, cav_res):
            initial_beta = cav_res.beta
            initial_psi = cav_res.psi
            cav_res.set_optimal_coupling(0.5)
            cav_res.set_optimal_detune(0.5)
            assert cav_res.beta != initial_beta
            assert cav_res.psi != initial_psi

        # Verify that is_CBI_stable correctly calculates growth rate and mode number
        def test_calculate_growth_rate_and_mode(self, cav_res_tracking):
            growth_rate, mu = cav_res_tracking.is_CBI_stable(I0=0.5)
            assert isinstance(growth_rate, float)
            assert np.issubdtype(type(mu), np.integer)

        # Ensure is_CBI_stable returns growth rates for specified modes when modes is provided
        def test_return_growth_rates_for_specified_modes(self, cav_res_tracking):
            modes = [0, 1, 2]
            growth_rates = cav_res_tracking.is_CBI_stable(I0=0.5, modes=modes)
            assert len(growth_rates) == len(modes)

        # is_DC_Robinson_stable returns a (Python or NumPy) boolean
        def test_is_DC_Robinson_stable(self, cav_res_tracking):
            assert isinstance(cav_res_tracking.is_DC_Robinson_stable(0.5), (bool, np.bool_))

        # plot_DC_Robinson_stability returns a figure object
        def test_plot_DC_Robinson_stability(self, cav_res_tracking):
            fig = cav_res_tracking.plot_DC_Robinson_stability()
            assert fig is not None

        # The RF voltage helpers return a float for scalar z and an ndarray for array z
        @pytest.mark.parametrize("z,expected_type",
                                 [(1e-3, float),
                                  (np.linspace(-1e-3, 1e-3, 100), np.ndarray)])
        def test_VRF(self, cav_res_tracking, z, expected_type):
            assert isinstance(cav_res_tracking.VRF(z, 0.5), expected_type)
            assert isinstance(cav_res_tracking.dVRF(z, 0.5), expected_type)
            assert isinstance(cav_res_tracking.ddVRF(z, 0.5), expected_type)
            assert isinstance(cav_res_tracking.deltaVRF(z, 0.5), expected_type)

        # sample_voltage returns n_points samples and leaves the beam phasor untouched
        def test_sample_voltage_default_parameters(self, cav_res_tracking, beam_uniform):
            cav_res_tracking.init_tracking(beam_uniform)
            n_points = 1e4
            beam_phasor_init = 1 + 1j
            cav_res_tracking.beam_phasor = beam_phasor_init
            pos, voltage_rec = cav_res_tracking.sample_voltage(n_points=n_points)
            assert len(pos) == n_points
            assert len(voltage_rec) == n_points
            assert np.allclose(cav_res_tracking.beam_phasor, beam_phasor_init)

        # set_generator populates the generator-related attributes
        def test_set_generator(self, cav_res, demo_ring):
            cav_res.Vc = 1e6
            cav_res.theta = np.arccos(demo_ring.U0/cav_res.Vc)
            cav_res.set_generator(0.5)
            assert cav_res.Pg is not None
            assert cav_res.Vgr is not None
            assert cav_res.theta_gr is not None
            assert cav_res.Vg is not None
            assert cav_res.theta_g is not None
            assert cav_res.generator_phasor_record is not None

        # Return types of the scalar helper methods
        def test_functions(self, cav_res_tracking):
            assert isinstance(cav_res_tracking.Pb(0.5), float)
            assert isinstance(cav_res_tracking.Pr(0.5), float)
            assert isinstance(cav_res_tracking.Vbr(0.5), float)
            assert isinstance(cav_res_tracking.Vb(0.5), float)
            assert isinstance(cav_res_tracking.Z(1e9), complex)

        # Pc, filling_time and loss_factor are strictly positive
        def test_properties(self, cav_res_tracking):
            assert cav_res_tracking.Pc > 0
            assert cav_res_tracking.filling_time > 0
            assert cav_res_tracking.loss_factor > 0
            
    class TestProportionalLoop:

        # ProportionalLoop with delay=2 acting on the tracking-ready cavity
        @pytest.fixture
        def prop_loop(self, demo_ring, cav_res_tracking):
            loop_kwargs = dict(gain_A=0.5, gain_P=0.5, delay=2)
            return ProportionalLoop(demo_ring, cav_res_tracking, **loop_kwargs)

        # A single track call (fewer than the delay) leaves cav_res.Vg and
        # cav_res.theta_g unchanged
        def test_track_delay(self, prop_loop):
            cavity = prop_loop.cav_res
            Vg_before = cavity.Vg
            theta_g_before = cavity.theta_g
            prop_loop.track()
            assert prop_loop.cav_res.Vg == Vg_before
            assert prop_loop.cav_res.theta_g == theta_g_before

        # After three track calls cav_res.Vg and cav_res.theta_g have been updated
        def test_track_update(self, prop_loop):
            cavity = prop_loop.cav_res
            Vg_before = cavity.Vg
            theta_g_before = cavity.theta_g
            # delay=2 is 3 turns to take action
            for _ in range(3):
                prop_loop.track()
            assert prop_loop.cav_res.Vg != Vg_before
            assert prop_loop.cav_res.theta_g != theta_g_before

        # Initialization with delay less than 1 raises ValueError
        def test_initialization_delay_less_than_one(self, demo_ring, cav_res):
            invalid_kwargs = dict(gain_A=0.5, gain_P=0.5, delay=0)
            with pytest.raises(ValueError):
                ProportionalLoop(demo_ring, cav_res, **invalid_kwargs)

        # Initialization with non-integer delay converts it to integer
        def test_initialization_non_integer_delay(self, demo_ring, cav_res):
            fractional_kwargs = dict(gain_A=0.5, gain_P=0.5, delay=2.7)
            loop = ProportionalLoop(demo_ring, cav_res, **fractional_kwargs)
            assert loop.delay == 2
            
    class TestTunerLoop:

        # TunerLoop with a short averaging period so the correction step is
        # reached after a handful of track calls
        @pytest.fixture
        def tuner_loop(self, demo_ring, cav_res_tracking):
            return TunerLoop(demo_ring, cav_res_tracking, gain=1, avering_period=2, offset=0)

        # A single track call accumulates diff and increments count, but does
        # not touch cav_res.psi yet
        def test_track_once(self, tuner_loop):
            initial_psi = tuner_loop.cav_res.psi
            tuner_loop.track()
            assert tuner_loop.diff != 0
            assert tuner_loop.count == 1
            assert tuner_loop.cav_res.psi == initial_psi

        # Track method adjusts cav_res.psi and resets diff on the call made
        # after avering_period accumulation calls
        def test_track_adjusts_psi(self, tuner_loop):
            initial_psi = tuner_loop.cav_res.psi
            for _ in range(tuner_loop.avering_period + 1):
                tuner_loop.track()
            assert tuner_loop.diff == 0
            assert tuner_loop.cav_res.psi != initial_psi

        # Initialize with avering_period set to None and verify default calculation
        def test_default_averaging_period_calculation(self, demo_ring, cav_res_tracking):
            tuner_loop = TunerLoop(demo_ring, cav_res_tracking, avering_period=None)
            fs = demo_ring.synchrotron_tune(cav_res_tracking.Vc) * demo_ring.f0
            expected_period = int(2 / fs / demo_ring.T0)
            assert tuner_loop.avering_period == expected_period

        # Test track method with zero gain to ensure no changes to cav_res.psi
        def test_track_with_zero_gain(self, tuner_loop):
            tuner_loop.Pgain = 0
            initial_psi = tuner_loop.cav_res.psi
            # avering_period + 1 calls are needed to reach the correction step
            # (same count as test_track_adjusts_psi); with fewer calls psi would
            # stay untouched whatever the gain and the test would prove nothing.
            for _ in range(tuner_loop.avering_period + 1):
                tuner_loop.track()
            # diff reset to zero shows the correction step really executed
            assert tuner_loop.diff == 0
            # Tolerant comparison: the correction assigns psi even when the
            # applied change is zero, so allow for float round-off in the setter.
            assert np.isclose(tuner_loop.cav_res.psi, initial_psi)
    
    class TestProportionalIntegralLoop:

        # ProportionalIntegralLoop attached to the tracking-ready cavity
        @pytest.fixture
        def pi_loop(self, demo_ring, cav_res_tracking):
            return ProportionalIntegralLoop(demo_ring, cav_res_tracking, [0.5, 1e4], 8, 7, 5)

        # One track call changes ig_phasor and the cavity generator phasor record
        def test_track(self, pi_loop):
            ig_before = pi_loop.ig_phasor.copy()
            record_before = pi_loop.cav_res.generator_phasor_record.copy()
            pi_loop.track()
            assert not np.array_equal(ig_before, pi_loop.ig_phasor)
            assert not np.array_equal(record_before, pi_loop.cav_res.generator_phasor_record)

        # Ig2Vg changes the generator phasor record as well as Vg and theta_g
        def test_Ig2Vg_updates_generator_phasor(self, pi_loop):
            cavity = pi_loop.cav_res
            record_before = cavity.generator_phasor_record.copy()
            Vg_before = cavity.Vg
            theta_g_before = cavity.theta_g
            pi_loop.Ig2Vg()
            assert not np.allclose(record_before, pi_loop.cav_res.generator_phasor_record)
            assert pi_loop.cav_res.Vg != Vg_before
            assert pi_loop.cav_res.theta_g != theta_g_before

        # IIR filter accepts a one-element complex array and returns something
        def test_IIR_filter_output(self, pi_loop):
            sample = np.array([1.0 + 1.0j])
            assert pi_loop.IIR(sample) is not None

        # After init_FFconst the feedforward constant is non-zero
        def test_init_FFconst_with_FF_true(self, pi_loop):
            pi_loop.init_FFconst()
            assert pi_loop.FFconst != 0

        # Return types of Vg2Ig, IIRcutoff (after IIR_init) and IIR on a scalar
        def test_parameters(self, pi_loop):
            generator_current = pi_loop.Vg2Ig(1e6)
            assert isinstance(generator_current, complex)
            pi_loop.IIR_init(1e9)
            assert isinstance(pi_loop.IIRcutoff, float)
            filtered = pi_loop.IIR(1.0 + 1.0j)
            assert isinstance(filtered, complex)
    
    class TestDirectFeedback:

        # DirectFeedback attached to the tracking-ready cavity
        @pytest.fixture
        def drf_fb(self, demo_ring, cav_res_tracking):
            settings = dict(
                DFB_gain=1.0,
                DFB_phase_shift=0.1,
                ring=demo_ring,
                cav_res=cav_res_tracking,
                gain=[0.5, 1e4],
                sample_num=8,
                every=7,
                delay=5,
            )
            return DirectFeedback(**settings)

        # Every listed property evaluates to a float
        def test_properties(self, drf_fb):
            float_properties = (
                "DFB_phase_shift",
                "phase_shift",
                "DFB_psi",
                "DFB_alpha",
                "DFB_gamma",
                "DFB_Rs",
            )
            for prop_name in float_properties:
                assert isinstance(getattr(drf_fb, prop_name), float)

        # DFB_Vg returns two complex values and DFB_fs returns a float
        def test_methods(self, drf_fb):
            voltages = drf_fb.DFB_Vg()
            main_part, dfb_part = voltages
            assert isinstance(main_part, complex)
            assert isinstance(dfb_part, complex)
            synchrotron_value = drf_fb.DFB_fs()
            assert isinstance(synchrotron_value, float)

        # One track call changes ig_phasor_record, DFB_ig_phasor and the cavity
        # generator phasor record
        def test_track(self, drf_fb):
            ig_record_before = drf_fb.ig_phasor_record.copy()
            dfb_ig_before = drf_fb.DFB_ig_phasor.copy()
            generator_record_before = drf_fb.cav_res.generator_phasor_record.copy()
            drf_fb.track()
            assert not np.array_equal(drf_fb.ig_phasor_record, ig_record_before)
            assert not np.array_equal(drf_fb.DFB_ig_phasor, dfb_ig_before)
            assert not np.array_equal(generator_record_before, drf_fb.cav_res.generator_phasor_record)

        # Set DFB parameters using DFB_parameter_set and verify changes in DFB_ig_phasor
        def test_set_dfb_parameters(self, drf_fb):
            dfb_ig_before = drf_fb.DFB_ig_phasor.copy()
            ig_before = drf_fb.ig_phasor.copy()

            new_settings = dict(DFB_gain=2.0, DFB_phase_shift=0.2)
            drf_fb.DFB_parameter_set(**new_settings)

            assert not np.array_equal(drf_fb.DFB_ig_phasor, dfb_ig_before)
            assert not np.array_equal(drf_fb.ig_phasor, ig_before)
            assert drf_fb.DFB_gain == 2.0
            assert drf_fb.DFB_phase_shift == 0.2