diff --git a/mbtrack2/tracking/feedback.py b/mbtrack2/tracking/feedback.py index 23287b836243301b20990f95ed27620cbd9f78b2..c16afbdbfca7679d033206a0261b0af53fca9db0 100644 --- a/mbtrack2/tracking/feedback.py +++ b/mbtrack2/tracking/feedback.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -This module defines both simple and FIR based bunch by bunch feedback for -tracking. +This module defines both exponential and FIR based bunch by bunch damper +feedback for tracking. """ import numpy as np import matplotlib.pyplot as plt @@ -15,12 +15,12 @@ class ExponentialDamper(Element): ---------- ring : Synchrotron object Synchrotron to use. - plane : bool array of shape (3,) - Allow to choose on which planes the damper is active. - damping_time : array of shape (3,) - Contains the desired damping time in [s] for each plane. + plane : {"x","y","s"} + Allow to choose on which plane the damper is active. + damping_time : float + Damping time in [s]. phase_diff : array of shape (3,) - Contains the desired phase in degree for each plane. + Phase difference between the damper and the position monitor in [rad]. """ def __init__(self, ring, plane, damping_time, phase_diff): @@ -28,7 +28,21 @@ class ExponentialDamper(Element): self.damping_time = damping_time self.phase_diff = phase_diff self.plane = plane - + if self.plane == "x": + self.action = "xp" + self.damp_idx = 0 + self.mean_idx = 1 + elif self.plane == "y": + self.action = "yp" + self.damp_idx = 1 + self.mean_idx = 3 + elif self.plane == "s": + self.action = "delta" + self.damp_idx = 2 + self.mean_idx = 5 + else: + raise ValueError(f"plane should be x, y or s, not {self.plane}") + @Element.parallel def track(self, bunch): """ @@ -40,71 +54,60 @@ class ExponentialDamper(Element): ---------- bunch : Bunch or Beam object """ - if(self.plane[0] == True): - bunch["xp"] -= (2*self.ring.T0/self.damping_time[0])*np.sin(self.phase_diff[0])*bunch.mean[1] - - if(self.plane[1] == True): - bunch["yp"] -= (2*self.ring.T0/self.damping_time[1])*np.sin(self.phase_diff[1])*bunch.mean[3] - - if(self.plane[2] == True): - bunch["delta"] -= (2*self.ring.T0/self.damping_time[2])*np.sin(self.phase_diff[1])*bunch.mean[5] + bunch[self.action] -= (2*self.ring.T0/ + self.damping_time[self.damp_idx]* + np.sin(self.phase_diff)* + bunch.mean[self.mean_idx]) -class FIRDamper(Element) : +class FIRDamper(Element): """ Bunch by bunch damper feedback system based on FIR filters. + FIR computation is based on [1]. + Parameters ---------- ring : Synchrotron object Synchrotron to use. - plane : list of str - Allow to choose on which planes the damper is active. - Options are: {"x","y","s"}. - tune : float array of shape (3,) - Reference (betatron or synchrotron) tunes for which the damper system + plane : {"x","y","s"} + Allow to choose on which plane the damper is active. + tune : float + Reference (betatron or synchrotron) tune for which the damper system is set. - turn_delay : int array of shape (3,) - Number of turn delay before applying kick. - tap_number : int array of shape (3,) - Number of tap for the FIR filters. - gain : float array of shape (3,) - Gain of the FIR filters. - phase : float array of shape (3,) - Phase of the FIR filters in degree. - bpm_error : float array of shape (3,), optional - Stores the BPM error in [m] for horizontal and vertical, and in [s] for - longitudinal. - max_kick : float array of shape (3,), optional - Stores the maximum kick limitation. + turn_delay : int + Number of turn delay before the kick is applied. + tap_number : int + Number of tap for the FIR filter. + gain : float + Gain of the FIR filter. + phase : float + Phase of the FIR filter in [degree]. + meas_error : float, optional + RMS measurement error applied to the computed mean position. + Unit is [m] if the plane is "x" or "y" and [s] if the plane is "s". + The default is None. + max_kick : float, optional + Maximum kick strength limitation. + Unit is [rad] if the plane is "x" or "y" and no unit (delta) if the + plane is "s". + The default is None. Attributes ---------- - x_pos : array - Stored horizontal beam postion. - y_pos : array - Stored vertical beam postion. - tau_pos : array - Stored longitudinal beam postion. - kick_x : array - Stored horizontal kicks. - kick_y : array - Stored vertical kicks. - kick_tau : array - Stored longitudinal kicks. - coef_x : array - Coefficients of the FIR filter for the horizontal plane. - coef_y : array - Coefficients of the FIR filter for the vertical plane. - coef_tau : array - Coefficients of the FIR filter for the longitudinal plane. + pos : array + Stored beam postions. + kick : array + Stored damper kicks. + coef : array + Coefficients of the FIR filter. Methods ------- get_fir(tap_number, tune, phase, turn_delay, gain) - Initialize the FIR filter for the desired plane and return an array - containing the FIR coefficients. - plot_fir(plane) - Plot the gain and the phase of the selected FIR filter. + Initialize the FIR filter and return an array containing the FIR + coefficients. + plot_fir() + Plot the gain and the phase of the FIR filter. track(beam_or_bunch) Tracking method. @@ -116,8 +119,7 @@ class FIRDamper(Element) : """ def __init__(self, ring, plane, tune, turn_delay, tap_number, gain, phase, - bpm_error = np.zeros((3,)), - max_kick = np.zeros((3,))): + bpm_error=None, max_kick=None): self.ring = ring self.tune = tune @@ -129,29 +131,36 @@ class FIRDamper(Element) : self.max_kick = max_kick self.plane = plane - self.plane_dict = {"x":0 , "y":1 , "s":2} + if self.plane == "x": + self.action = "xp" + self.damp_idx = 0 + self.mean_idx = 0 + elif self.plane == "y": + self.action = "yp" + self.damp_idx = 1 + self.mean_idx = 2 + elif self.plane == "s": + self.action = "delta" + self.damp_idx = 2 + self.mean_idx = 4 + self.beam_no_mpi = False - self.x_pos = np.zeros((self.tap_number[0],1)) - self.y_pos = np.zeros((self.tap_number[1],1)) - self.tau_pos = np.zeros((self.tap_number[2],1)) - self.kick_x = np.zeros((self.turn_delay[0]+1,1)) - self.kick_y = np.zeros((self.turn_delay[1]+1,1)) - self.kick_tau = np.zeros((self.turn_delay[2]+1,1)) - self.coef_x = self.get_fir(self.tap_number[0], self.tune[0], self.phase[0], self.turn_delay[0], self.gain[0]) - self.coef_y = self.get_fir(self.tap_number[1], self.tune[1], self.phase[1], self.turn_delay[1], self.gain[1]) - self.coef_tau = self.get_fir(self.tap_number[2], self.tune[2], self.phase[2], self.turn_delay[2], self.gain[2]) + self.pos = np.zeros((self.tap_number,1)) + self.kick = np.zeros((self.turn_delay+1,1)) + self.coef = self.get_fir(self.tap_number, self.tune, self.phase, + self.turn_delay, self.gain) def get_fir(self, tap_number, tune, phase, turn_delay, gain): """ - Compute the FIR coefficients for the selected plane(s). - This method is based on the FIR filter design algorithm developped by - T.Nakamura. + Compute the FIR coefficients. + + FIR computation is based on [1]. Returns ------- FIR_coef : array - Array containing the FIR coefficients for the selected plane(s). + Array containing the FIR coefficients. """ it = np.zeros((tap_number,)) CC = np.zeros((5, tap_number,)) @@ -176,45 +185,36 @@ class FIRDamper(Element) : FIR_coef = gain*(D[1][:]*np.cos(zeta) + D[2][:]*np.sin(zeta)) return FIR_coef - def plot_fir(self, plane): + def plot_fir(self): """ - Plot the gain and the phase of the FIR filter for the desired plane. - - Parameters - ---------- - plane : string - States "x" for the horizontal plane, "y" for the vertical and - "long" for the longitudinal one. + Plot the gain and the phase of the FIR filter. + + Returns + ------- + fig : Figure + Plot of the gain and phase. """ tune = np.arange(0, 1, 0.0001) - - index_plane = self.plane_dict[plane] - if(plane == "x"): - coef = self.coef_x - elif(plane == "y"): - coef = self.coef_y - elif(plane == "s"): - coef = self.coef_tau H_FIR = 0 - for k in range(len(coef)): - H_FIR += coef[k]*np.exp(-1j*2*np.pi*(k)*tune) - latency = np.exp(-1j*2*np.pi*tune*self.turn_delay[index_plane]) + for k in range(len(self.coef)): + H_FIR += self.coef[k]*np.exp(-1j*2*np.pi*(k)*tune) + latency = np.exp(-1j*2*np.pi*tune*self.turn_delay) H_tot = H_FIR * latency gain = np.abs(H_tot) phase = np.angle(H_tot, deg = True) - fig, [ax1, ax2] = plt.subplots(1,2) + fig, [ax1, ax2] = plt.subplots(2,1) ax1.plot(tune, gain) - ax1.set_title("Gain") - ax1.set_xlabel("Tune") + ax1.set_ylabel("Gain") ax2.plot(tune, phase) - ax2.set_title("Phase in degree") ax2.set_xlabel("Tune") - ax2.set_ylabel("Degree") + ax2.set_ylabel("Phase in degree") + + return fig def track(self, beam_or_bunch): """ @@ -251,12 +251,8 @@ class FIRDamper(Element) : """ n_bunch = len(beam) - self.x_pos = np.zeros((self.tap_number[0], n_bunch)) - self.y_pos = np.zeros((self.tap_number[1], n_bunch)) - self.tau_pos = np.zeros((self.tap_number[2], n_bunch)) - self.kick_x = np.zeros((self.turn_delay[0]+1, n_bunch)) - self.kick_y = np.zeros((self.turn_delay[1]+1, n_bunch)) - self.kick_tau = np.zeros((self.turn_delay[2]+1, n_bunch)) + self.pos = np.zeros((self.tap_number, n_bunch)) + self.kick = np.zeros((self.turn_delay+1, n_bunch)) self.beam_no_mpi = True def track_sb(self, bunch, bunch_number=0): @@ -268,50 +264,26 @@ class FIRDamper(Element) : bunch : Bunch Bunch to track. bunch_number : int, optional - Number of bunch in beam.not_empty. The default is 0. + Number of bunch in beam.not_empty. + The default is 0. """ - for plane in self.plane: - if plane == "x": - pos = self.x_pos - kick_record = self.kick_x - coef = self.coef_x - idx = 0 - mean = 0 - action = "xp" - elif plane == "y": - pos = self.y_pos - kick_record = self.kick_y - coef = self.coef_y - idx = 1 - mean = 2 - action = "yp" - elif plane == "s": - pos = self.tau_pos - kick_record = self.kick_tau - coef = self.coef_tau - idx = 2 - mean = 4 - action = "delta" - else: - raise ValueError("plane can only be x, y or s.") + self.pos[0, bunch_number] = bunch.mean[self.mean_idx] + if self.bpm_error is not None: + self.pos[0, bunch_number] += np.random.normal(0, self.bpm_error) - pos[0, bunch_number] = bunch.mean[mean] - if self.bpm_error[idx] != 0: - pos[0, bunch_number] += np.random.normal(0, self.bpm_error[idx]) - - kick = 0 - for k in range(self.tap_number[idx]): - kick += coef[k]*pos[k, bunch_number] - - if self.max_kick[idx] != 0: - if kick > self.max_kick[idx]: - kick = self.max_kick[idx] - elif kick < -1*self.max_kick[idx]: - kick = -1*self.max_kick[idx] - - kick_record[-1, bunch_number] = kick - bunch[action] += kick_record[0, bunch_number] + kick = 0 + for k in range(self.tap_number): + kick += self.coef[k]*self.pos[k, bunch_number] - pos[:, bunch_number] = np.roll(pos[:, bunch_number], 1) - kick_record[:, bunch_number] = np.roll(kick_record[:, bunch_number], -1) \ No newline at end of file + if self.max_kick is not None: + if kick > self.max_kick: + kick = self.max_kick + elif kick < -1*self.max_kick: + kick = -1*self.max_kick + + self.kick[-1, bunch_number] = kick + bunch[self.action] += self.kick[0, bunch_number] + + self.pos[:, bunch_number] = np.roll(self.pos[:, bunch_number], 1) + self.kick[:, bunch_number] = np.roll(self.kick[:, bunch_number], -1) \ No newline at end of file