Skip to content
Snippets Groups Projects
Commit fc48bffa authored by Alexis GAMELIN's avatar Alexis GAMELIN
Browse files

Bug fix and reformat the feedback module

[Fix] Bug when FIRDamper was used to track a Beam without MPI.
Various docstring modifications and code reformating.

remove FeedBack.py file

Add feedback.py file
parent 7d37efa0
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
"""
Created on Wed Jun 8 16:24:19 2022
@author: sauret
"""
from mbtrack2.tracking.element import Element
import numpy as np
class SimpleFB(Element) :
"""
Simple feedback system proceeding bunch by bunch correction to counter beam
instabilities
Parameters
----------
ring : Synchrotron object
damping_time : array of shape (3,)
Contains the desired damping time in seconds for each plane in the same
order sepcified in plane.
phase_diff : array of shape (3,)
Contains the desired phase in degree for each plane in the same
order sepcified in plane.
plane : bool array of shape (3,), optional
Allow to choose on which plane the feedback is active. State True to
turn on the feedback or False otherwise. Element 0 of the array defines
the longitunal plane as 1 and 2 define the horizontal and vertical plane
respectively.
"""
def __init__(self, ring,phase_diff, damping_time, plane = np.zeros((3,), dtype= bool)):
self.ring = ring
self.damping_time = damping_time
self.phase_diff = phase_diff
self.plane = plane
@Element.parallel
def track(self, bunch):
"""
Tracking method for the feedback system
No bunch to bunch interaction, so written for Bunch object and
@Element.parallel is used to handle Beam object.
Parameters
----------
bunch : Bunch or Beam object
"""
if(self.plane[0] == True):
bunch["xp"] -= (2*self.ring.T0/self.damping_time[0])*np.sin(self.phase_diff[0])*bunch.mean[1]
if(self.plane[1] == True):
bunch["yp"] -= (2*self.ring.T0/self.damping_time[1])*np.sin(self.phase_diff[1])*bunch.mean[3]
if(self.plane[2] == True):
bunch["delta"] -= (2*self.ring.T0/self.damping_time[2])*np.sin(self.phase_diff[1])*bunch.mean[5]
\ No newline at end of file
......@@ -20,3 +20,5 @@ from mbtrack2.tracking.aperture import (CircularAperture,
from mbtrack2.tracking.wakepotential import (WakePotential,
LongRangeResistiveWall)
from mbtrack2.tracking.feedback import (ExponentialDamper,
FIRDamper)
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
This module defines both simple and FIR based feedback for tracking.
This module defines both simple and FIR based bunch by bunch feedback for
tracking.
"""
from mbtrack2.tracking.element import Element
import numpy as np
import matplotlib.pyplot as plt
from mbtrack2.tracking import Element, Beam, Bunch
class SimpleFB(Element) :
class ExponentialDamper(Element):
"""
Simple feedback system proceeding bunch by bunch correction to counter beam
instabilities
Parameters
----------
ring : Synchrotron object
damping_time : array of shape (3,)
Contains the desired damping time in seconds for each plane. Element 0 of
the array defines the horizontal plane as 1 and 2 define the vertical
and longitudinal plane respectively.
phase_diff : array of shape (3,)
Contains the desired phase in degree for each plane. Element 0 of the
array defines the horizontal plane as 1 and 2 define the vertical and
longitudinal plane respectively.
plane : bool array of shape (3,), optional
Allow to choose on which plane the feedback is active. State True to
turn on the feedback or False otherwise. Element 0 of the array defines
the horizontal plane as 1 and 2 define the vertical and longitudinal plane
respectively.
"""
Simple bunch by bunch damper feedback system based on exponential damping.
def __init__(self, ring, damping_time, phase_diff, plane = np.zeros((3,), dtype= bool)):
Parameters
----------
ring : Synchrotron object
Synchrotron to use.
plane : bool array of shape (3,)
Allow to choose on which planes the damper is active.
damping_time : array of shape (3,)
Contains the desired damping time in [s] for each plane.
phase_diff : array of shape (3,)
Contains the desired phase in degree for each plane.
"""
def __init__(self, ring, plane, damping_time, phase_diff):
self.ring = ring
self.damping_time = damping_time
self.phase_diff = phase_diff
self.plane = plane
self.plane = plane
@Element.parallel
def track(self, bunch):
......@@ -55,83 +49,64 @@ class SimpleFB(Element) :
if(self.plane[2] == True):
bunch["delta"] -= (2*self.ring.T0/self.damping_time[2])*np.sin(self.phase_diff[1])*bunch.mean[5]
class FIR_Feedback(Element) :
class FIRDamper(Element) :
"""
Feedback system based on FIR filters, proceeding bunch by bunch correction
to counter beam instabilities.
Bunch by bunch damper feedback system based on FIR filters.
Parameters
----------
ring : Synchrotron object
Synchrotron to use.
plane : list of str
Allow to choose on which planes the damper is active.
Options are: {"x","y","s"}.
tune : float array of shape (3,)
Betatron tune for horizontal and vertical plane in the first and second
elements of the array. Synchrotron tune for the third element.
Reference (betatron or synchrotron) tunes for which the damper system
is set.
turn_delay : int array of shape (3,)
Number of turn delay before applying kick. Element 0 of the array
defines the horizontal plane as 1 and 2 define the vertical and
longitudinal plane respectively.
Number of turn delay before applying kick.
tap_number : int array of shape (3,)
Number of tap for the FIR filters. Element 0 of the array defines
the horizontal plane as 1 and 2 define the vertical and longitudinal
plane respectively.
Number of tap for the FIR filters.
gain : float array of shape (3,)
Gain of the FIR filters. Element 0 of the array defines
the horizontal plane as 1 and 2 define the vertical and longitudinal
plane respectively.
Gain of the FIR filters.
phase : float array of shape (3,)
Phase of the FIR filters in degree. Element 0 of the array defines
the horizontal plane as 1 and 2 define the vertical and longitudinal
plane respectively.
Phase of the FIR filters in degree.
bpm_error : float array of shape (3,), optional
Stores the BPM error [m] for horizontal and vertical, [s] for longitudinal.
First, second and third element corespond to the horizontal, vertical
and longitudinal plane respectively.
Stores the BPM error in [m] for horizontal and vertical, and in [s] for
longitudinal.
max_kick : float array of shape (3,), optional
Stores the maximum kick limitation. First, second and third
element correspond to the horizontal, vertical and longitudinal plane
respectively.
max_kick_state : bool array of shape (3,), optional
Allow to choose on which plane the max kick limitation is active. State
True to turn on the max kick limitation or False otherwise. Element 0
of the array defines the horizontal plane as 1 and 2 define the vertical
and longitudinal plane respectively.
plane : bool array of shape (3,), optional
Allow to choose on which plane the feedback is active. State True to
turn on the feedback or False otherwise. Element 0 of the array defines
the horizontal plane as 1 and 2 define the vertical and longitudinal
plane respectively.
Stores the maximum kick limitation.
Attributes
----------
x_pos : array
Retrieves and stores the position of the bunch, measured by the BPM,
in horizontal plane.
y_pos : array
Retrieves and stores the position of the bunch, measured by the BPM,
in vertical plane.
tau_pos : array
Retrieves and stores the position of the bunch, measured by the BPM,
in longitudinal plane.
kick_x : array
Stores the numerical values of the kicks for the horizontal plane.
kick_y : array
Stores the numerical values of the kicks for the vertical plane.
kick_tau : array
Stores the numerical values of the kicks for the longitudinal plane.
coef_x : array
Stores the coefficients of the FIR filter for the horizontal plane.
coef_y : array
Stores the coefficients of the FIR filter for the vertical plane.
coef_tau : array
Stores the coefficients of the FIR filter for the longitudinal plane.
x_pos : array
Stored horizontal beam postion.
y_pos : array
Stored vertical beam postion.
tau_pos : array
Stored longitudinal beam postion.
kick_x : array
Stored horizontal kicks.
kick_y : array
Stored vertical kicks.
kick_tau : array
Stored longitudinal kicks.
coef_x : array
Coefficients of the FIR filter for the horizontal plane.
coef_y : array
Coefficients of the FIR filter for the vertical plane.
coef_tau : array
Coefficients of the FIR filter for the longitudinal plane.
Methods
-------
get_fir(tap_number, tune, phase, turn_delay, gain)
Initialize the FIR filter for the desired plane and return an array
containing the FIR coefficients.
Initialize the FIR filter for the desired plane and return an array
containing the FIR coefficients.
plot_fir(plane)
Plot the gain and the phase of the FIR filter selected.
Plot the gain and the phase of the selected FIR filter.
track(beam_or_bunch)
Tracking method.
References
----------
......@@ -140,8 +115,9 @@ class FIR_Feedback(Element) :
storage ring.
"""
def __init__(self, ring, tune, turn_delay, tap_number, gain, phase, bpm_error = np.zeros((3,)),
max_kick = np.zeros((3,)),plane = np.zeros((3,), dtype = bool)):
def __init__(self, ring, plane, tune, turn_delay, tap_number, gain, phase,
bpm_error = np.zeros((3,)),
max_kick = np.zeros((3,))):
self.ring = ring
self.tune = tune
......@@ -152,12 +128,16 @@ class FIR_Feedback(Element) :
self.bpm_error = bpm_error
self.max_kick = max_kick
self.plane = plane
self.x_pos = np.zeros((self.tap_number[0],))
self.y_pos = np.zeros((self.tap_number[1],))
self.tau_pos = np.zeros((self.tap_number[2],))
self.kick_x = np.zeros((self.turn_delay[0]+1,))
self.kick_y = np.zeros((self.turn_delay[1]+1,))
self.kick_tau = np.zeros((self.turn_delay[2]+1,))
self.plane_dict = {"x":0 , "y":1 , "s":2}
self.beam_no_mpi = False
self.x_pos = np.zeros((self.tap_number[0],1))
self.y_pos = np.zeros((self.tap_number[1],1))
self.tau_pos = np.zeros((self.tap_number[2],1))
self.kick_x = np.zeros((self.turn_delay[0]+1,1))
self.kick_y = np.zeros((self.turn_delay[1]+1,1))
self.kick_tau = np.zeros((self.turn_delay[2]+1,1))
self.coef_x = self.get_fir(self.tap_number[0], self.tune[0], self.phase[0], self.turn_delay[0], self.gain[0])
self.coef_y = self.get_fir(self.tap_number[1], self.tune[1], self.phase[1], self.turn_delay[1], self.gain[1])
self.coef_tau = self.get_fir(self.tap_number[2], self.tune[2], self.phase[2], self.turn_delay[2], self.gain[2])
......@@ -198,101 +178,140 @@ class FIR_Feedback(Element) :
def plot_fir(self, plane):
"""
Plot the gain and the phase of the FIR filter for the desired plane
Plot the gain and the phase of the FIR filter for the desired plane.
Parameters
----------
plane : string
States "x" for the horizontal plane, "y" for the vertical and
"long" for the longitudinal one.
Returns
-------
None.
"""
tune = np.arange(0,1,0.0001)
plane_dict = {"x":0 , "y":1 , "long":2}
index_plane = plane_dict[plane]
t = np.array([len(self.coef_x), len(self.coef_y), len(self.coef_tau)])
latency = np.exp(-1j*2*np.pi*tune*self.turn_delay[index_plane])
H_FIR = 0
liste = []
tune = np.arange(0, 1, 0.0001)
index_plane = self.plane_dict[plane]
if(plane == "x"):
liste = self.coef_x
coef = self.coef_x
elif(plane == "y"):
liste = self.coef_y
elif(plane == "long"):
liste = self.coef_tau
for k in range(t[index_plane]):
H_FIR += liste[k]*np.exp(-1j*2*np.pi*(k)*tune)
coef = self.coef_y
elif(plane == "s"):
coef = self.coef_tau
H_FIR = 0
for k in range(len(coef)):
H_FIR += coef[k]*np.exp(-1j*2*np.pi*(k)*tune)
latency = np.exp(-1j*2*np.pi*tune*self.turn_delay[index_plane])
H_tot = H_FIR * latency
gain = np.abs(H_tot)
phase = np.angle(H_tot, deg = True)
plt.figure("Gain"+str(index_plane))
plt.plot(tune, gain)
plt.title("Gain" + str(index_plane))
plt.xlabel("Tune")
plt.figure("Phase"+str(index_plane))
plt.plot(tune, phase)
plt.title("Phase in degree" + str(index_plane))
plt.xlabel("Tune")
plt.ylabel("Degree")
fig, [ax1, ax2] = plt.subplots(1,2)
ax1.plot(tune, gain)
ax1.title("Gain")
ax1.xlabel("Tune")
ax2.plot(tune, phase)
ax2.title("Phase in degree")
ax2.xlabel("Tune")
ax2.ylabel("Degree")
@Element.parallel
def track(self, bunch):
def track(self, beam_or_bunch):
"""
Tracking method for the feedback system
No bunch to bunch interaction, so written for Bunch object and
@Element.parallel is used to handle Beam object.
Tracking method.
Parameters
----------
bunch : Bunch or Beam object
"""
if(self.plane[0] == True):
self.x_pos[0] = bunch.mean[0] + np.random.normal(0, self.bpm_error[0])
x_kick = 0
for k in range(self.tap_number[0]):
x_kick += self.coef_x[k]*self.x_pos[k]
if(self.max_kick[0] != 0 and x_kick > self.max_kick[0]):
x_kick = self.max_kick[0]
if(self.max_kick[0] != 0 and x_kick < -1*self.max_kick[0]):
x_kick = -1*self.max_kick[0]
self.kick_x[-1] = x_kick
bunch["xp"] += self.kick_x[0]
self.x_pos = np.roll(self.x_pos,1)
self.kick_x = np.roll(self.kick_x, -1)
beam_or_bunch : Beam or Bunch
Data to track.
if(self.plane[1] == True):
self.y_pos[0] = bunch.mean[2] + np.random.normal(0, self.bpm_error[1])
y_kick = 0
"""
if isinstance(beam_or_bunch, Bunch):
self.track_sb(beam_or_bunch)
elif isinstance(beam_or_bunch, Beam):
beam = beam_or_bunch
if (beam.mpi_switch == True):
self.track_sb(beam[beam.mpi.bunch_num])
else:
if self.beam_no_mpi is False:
self.init_beam_no_mpi(beam)
for i, bunch in enumerate(beam.not_empty):
self.track_sb(bunch, i)
else:
TypeError("beam_or_bunch must be a Beam or Bunch")
for k in range(self.tap_number[1]):
y_kick += self.coef_y[k]*self.y_pos[k]
if(self.max_kick[1] != 0 and y_kick > self.max_kick[1]):
y_kick = self.max_kick[1]
if(self.max_kick[1] != 0 and y_kick < -1*self.max_kick[1]):
y_kick = -1*self.max_kick[1]
self.kick_y[-1] = y_kick
bunch["yp"] += self.kick_y[0]
self.y_pos = np.roll(self.y_pos,1)
self.kick_y = np.roll(self.kick_y, -1)
def init_beam_no_mpi(self, beam):
"""
Change array sizes if Beam is used without mpi.
Parameters
----------
beam : Beam
Beam to track.
"""
n_bunch = len(beam)
self.x_pos = np.zeros((self.tap_number[0], n_bunch))
self.y_pos = np.zeros((self.tap_number[1], n_bunch))
self.tau_pos = np.zeros((self.tap_number[2], n_bunch))
self.kick_x = np.zeros((self.turn_delay[0]+1, n_bunch))
self.kick_y = np.zeros((self.turn_delay[1]+1, n_bunch))
self.kick_tau = np.zeros((self.turn_delay[2]+1, n_bunch))
self.beam_no_mpi = True
def track_sb(self, bunch, bunch_number=0):
"""
Core of the tracking method.
Parameters
----------
bunch : Bunch
Bunch to track.
bunch_number : int, optional
Number of bunch in beam.not_empty. The default is 0.
if(self.plane[2] == True):
self.tau_pos[0] = bunch.mean[4] + np.random.normal(0, self.bpm_error[2])
tau_kick = 0
"""
for plane in self.plane:
if plane == "x":
pos = self.x_pos
kick_record = self.kick_x
coef = self.coef_x
idx = 0
mean = 0
action = "xp"
elif plane == "y":
pos = self.y_pos
kick_record = self.kick_y
coef = self.coef_y
idx = 1
mean = 2
action = "yp"
elif plane == "s":
pos = self.tau_pos
kick_record = self.kick_tau
coef = self.coef_tau
idx = 2
mean = 4
action = "delta"
else:
raise ValueError("plane can only be x, y or s.")
for k in range(self.tap_number[2]):
tau_kick += self.coef_tau[k]*self.tau_pos[k]
if(self.max_kick[2] != 0 and tau_kick > self.max_kick[2]):
tau_kick = self.max_kick[2]
if(self.max_kick[2] != 0 and tau_kick < -1*self.max_kick[2]):
tau_kick = -1*self.max_kick[2]
pos[0, bunch_number] = bunch.mean[mean]
if self.bpm_error[idx] != 0:
pos[0, bunch_number] += np.random.normal(0, self.bpm_error[idx])
self.kick_tau[-1] = tau_kick
bunch["delta"] += self.kick_tau[0]
self.tau_pos = np.roll(self.tau_pos,1)
self.kick_tau = np.roll(self.kick_tau, -1)
\ No newline at end of file
kick = 0
for k in range(self.tap_number[idx]):
kick += coef[k]*pos[k, bunch_number]
if self.max_kick[idx] != 0:
if kick > self.max_kick[idx]:
kick = self.max_kick[idx]
elif kick < -1*self.max_kick[idx]:
kick = -1*self.max_kick[idx]
kick_record[-1, bunch_number] = kick
bunch[action] += kick_record[0, bunch_number]
pos[:, bunch_number] = np.roll(pos[:, bunch_number], 1)
kick_record[:, bunch_number] = np.roll(kick_record[:, bunch_number], -1)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment