Skip to content

Commit d2e369f

Browse files
authored
[EXPERIMENTAL] Add interactive PID step response analysis window (#43)
* EXPERIMENTAL: Generates an estimated step response and uncertainty from flight data
1 parent e68efb0 commit d2e369f

2 files changed

Lines changed: 271 additions & 0 deletions

File tree

autotune/data_selection_window.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from data_extractor import DataExtractor
44
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
55
from matplotlib.widgets import SpanSelector
6+
from pid_analyse_window import PIDAnalyseWindow
67
from PyQt5.QtWidgets import (
78
QComboBox,
89
QDialog,
@@ -54,6 +55,18 @@ def __init__(self, filename):
5455
"output": "vehicle_angular_velocity/xyz[2].0",
5556
"input_legacy": "actuator_controls_1/control[2].0",
5657
},
58+
"Rollrate(closed-loop)": {
59+
"input": "vehicle_rates_setpoint/roll.0",
60+
"output": "vehicle_angular_velocity/xyz[0].0",
61+
},
62+
"Pitchrate(closed-loop)": {
63+
"input": "vehicle_rates_setpoint/pitch.0",
64+
"output": "vehicle_angular_velocity/xyz[1].0",
65+
},
66+
"Yawrate(closed-loop)": {
67+
"input": "vehicle_rates_setpoint/yaw.0",
68+
"output": "vehicle_angular_velocity/xyz[2].0",
69+
},
5770
}
5871

5972
self.presets = {}
@@ -106,6 +119,10 @@ def __init__(self, filename):
106119
btn_ok.clicked.connect(self.loadSelection)
107120
layout_v.addWidget(btn_ok)
108121

122+
pid_btn_ok = QPushButton("Analyse Current Tuning")
123+
pid_btn_ok.clicked.connect(self.plotPIDAnalysis)
124+
layout_v.addWidget(pid_btn_ok)
125+
109126
self.setLayout(layout_v)
110127

111128
if filename:
@@ -291,6 +308,38 @@ def onselect(self, xmin, xmax):
291308

292309
self.plotCoherence()
293310

311+
def plotPIDAnalysis(self):
312+
if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0:
313+
return
314+
315+
# Get input/output data
316+
if (
317+
self.t_start is not None
318+
and self.t_stop is not None
319+
and self.t_stop > self.t_start
320+
):
321+
t_sel, u_sel, y_sel, _ = self.data_extractor.getInputOutputData(
322+
self.topics[self.index_u],
323+
self.topics[self.index_y],
324+
self.t_start,
325+
self.t_stop,
326+
)
327+
else:
328+
t_sel, u_sel, y_sel, _ = self.data_extractor.getInputOutputData(
329+
self.topics[self.index_u], self.topics[self.index_y]
330+
)
331+
332+
# Create analysis window if not open yet
333+
if not hasattr(self, "pid_window") or self.pid_window is None:
334+
self.pid_window = PIDAnalyseWindow(self)
335+
336+
# Show
337+
self.pid_window.show()
338+
self.pid_window.raise_()
339+
340+
# Update plot
341+
self.pid_window.generate_step_response(u_sel, y_sel, t_sel)
342+
294343
def plotCoherence(self):
295344
if len(self.t) == 0 or len(self.u) == 0 or len(self.y) == 0:
296345
return

autotune/pid_analyse_window.py

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
import matplotlib.pyplot as plt
2+
import numpy as np
3+
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
4+
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
5+
from numpy.lib.stride_tricks import sliding_window_view
6+
from PyQt5.QtWidgets import QDialog, QVBoxLayout
7+
from scipy.ndimage import gaussian_filter1d
8+
9+
10+
class PIDAnalyseWindow(QDialog):
11+
"""Dialog window to show estimated step response."""
12+
13+
def __init__(self, parent=None):
14+
super().__init__(parent)
15+
self.setWindowTitle("Estimated Step Response")
16+
17+
# Figure and canvas
18+
self.figure, self.ax = plt.subplots(figsize=(8, 5), constrained_layout=True)
19+
self.canvas = FigureCanvas(self.figure)
20+
21+
# Layout
22+
layout = QVBoxLayout()
23+
layout.addWidget(NavigationToolbar(self.canvas, self))
24+
layout.addWidget(self.canvas)
25+
self.setLayout(layout)
26+
27+
def generate_step_response(
28+
self, u: np.ndarray, y: np.ndarray, t: np.ndarray
29+
) -> dict:
30+
"""Compute and plot step response."""
31+
self.ax.clear()
32+
33+
time, responses = compute_step_responses(u, y, t)
34+
if responses is None:
35+
return {}
36+
37+
metrics = compute_step_response_metrics(time, responses)
38+
plot_step_responses(time, responses, metrics, ax=self.ax)
39+
40+
self.canvas.draw()
41+
return metrics
42+
43+
44+
def compute_step_responses(
45+
u: np.ndarray,
46+
y: np.ndarray,
47+
t: np.ndarray,
48+
cutfreq: float = 25.0,
49+
window_duration: float = 1.0,
50+
):
51+
"""Compute step responses from input/output signals."""
52+
dt = np.diff(t).mean()
53+
if np.isclose(dt, 0):
54+
return None, None
55+
fs = 1 / dt
56+
57+
frame_samples = int(window_duration * fs)
58+
shift = frame_samples // 16
59+
response_window_samples = int(0.5 * fs)
60+
time_response = t[:response_window_samples] - t[0]
61+
62+
# Sliding windows
63+
u_windows = sliding_window_view(u, frame_samples)[::shift]
64+
y_windows = sliding_window_view(y, frame_samples)[::shift]
65+
66+
# Apply Hanning window
67+
window_func = np.hanning(frame_samples)
68+
u_windows = u_windows * window_func
69+
y_windows = y_windows * window_func
70+
71+
# Deconvolution
72+
deconvolved = wiener_deconvolution(u_windows, y_windows, cutfreq, fs)
73+
step_responses = deconvolved[:, :response_window_samples].cumsum(axis=1)
74+
75+
# Remove outlier windows
76+
peaks = step_responses.max(axis=1)
77+
median_peak = np.median(peaks)
78+
mad = np.median(np.abs(peaks - median_peak))
79+
mask = np.abs(peaks - median_peak) < 3 * mad
80+
step_responses = step_responses[mask]
81+
82+
return time_response, step_responses
83+
84+
85+
def compute_step_response_metrics(time: np.ndarray, responses: np.ndarray) -> dict:
86+
"""Compute rise time, settling time, overshoot, steady-state error."""
87+
rise_times, settling_times, overshoots, steady_state_errors = [], [], [], []
88+
89+
for resp in responses:
90+
final_val = resp[-1]
91+
92+
# Rise time (10% -> 90%)
93+
try:
94+
t10 = time[np.where(resp >= 0.1 * final_val)[0][0]]
95+
t90 = time[np.where(resp >= 0.9 * final_val)[0][0]]
96+
rise_times.append(t90 - t10)
97+
except IndexError:
98+
rise_times.append(np.nan)
99+
100+
# Settling time (within ±10% of final value)
101+
tolerance = 0.1 * final_val
102+
within_bounds = np.where(np.abs(resp - final_val) <= tolerance)[0]
103+
if len(within_bounds) > 0:
104+
for idx in within_bounds:
105+
# check that response stays within bounds till the end
106+
if np.all(np.abs(resp[idx:] - final_val) <= tolerance):
107+
settling_times.append(time[idx])
108+
break
109+
else:
110+
settling_times.append(np.nan)
111+
else:
112+
settling_times.append(np.nan)
113+
114+
# Overshoot
115+
peak_val = np.max(resp[: int(len(resp) * 0.8)]) # ignore drift at end
116+
overshoot = (peak_val - final_val) / final_val * 100
117+
overshoots.append(overshoot)
118+
119+
# Steady-state error (final value vs ideal 1.0)
120+
steady_state_errors.append(final_val - 1)
121+
122+
metrics = {
123+
"rise_time (s)": (np.nanmean(rise_times), np.nanstd(rise_times)),
124+
"settling_time (s)": (np.nanmean(settling_times), np.nanstd(settling_times)),
125+
"overshoot (%)": (np.nanmean(overshoots), np.nanstd(overshoots)),
126+
"steady_state_error": (
127+
np.nanmean(steady_state_errors),
128+
np.nanstd(steady_state_errors),
129+
),
130+
}
131+
132+
return metrics
133+
134+
135+
def plot_step_responses(
136+
time: np.ndarray, responses: np.ndarray, metrics: dict = None, ax=None
137+
):
138+
"""Plot step responses and show metrics."""
139+
mean_resp = responses.mean(axis=0)
140+
std_resp = responses.std(axis=0)
141+
142+
if ax is None:
143+
fig, ax = plt.subplots(figsize=(8, 5))
144+
145+
# Plot all responses lightly
146+
ax.plot(time, responses.T, alpha=0.2, color="gray")
147+
ax.plot(time, mean_resp, color="blue", linewidth=2, label="Mean response")
148+
ax.fill_between(
149+
time,
150+
mean_resp - std_resp,
151+
mean_resp + std_resp,
152+
color="blue",
153+
alpha=0.2,
154+
label="±1 std",
155+
)
156+
157+
# Reference step input
158+
ax.plot([time[0], 0, time[-1]], [0, 1, 1], "k--", label="Step Input")
159+
160+
ax.set_xlabel("Time [s]")
161+
ax.set_ylabel("Step Response")
162+
ax.set_title("Estimated Step Response")
163+
ax.legend(loc="upper right")
164+
ax.grid(True)
165+
166+
if metrics:
167+
metrics_text = "\n".join(
168+
f"{k}: {v[0]:.2f} ± {v[1]:.2f}"
169+
for k, v in metrics.items()
170+
if not np.isnan(v[0])
171+
)
172+
ax.text(
173+
0.98,
174+
0.02,
175+
metrics_text,
176+
ha="right",
177+
va="bottom",
178+
transform=ax.transAxes,
179+
fontsize=9,
180+
color="gray",
181+
)
182+
183+
return ax
184+
185+
186+
def wiener_deconvolution(
187+
input_: np.ndarray,
188+
output: np.ndarray,
189+
cutoff_freq: float,
190+
fs: float,
191+
epsilon: float = 1e-3,
192+
) -> np.ndarray:
193+
"""Wiener deconvolution on input/output signals."""
194+
n_samples = input_.shape[1]
195+
n_fft = 2 ** int(np.ceil(np.log2(n_samples)))
196+
197+
# Pad for FFT
198+
input_padded = np.pad(input_, ((0, 0), (0, n_fft - n_samples)), mode="constant")
199+
output_padded = np.pad(output, ((0, 0), (0, n_fft - n_samples)), mode="constant")
200+
201+
# FFT
202+
H = np.fft.fft(input_padded, axis=-1)
203+
G = np.fft.fft(output_padded, axis=-1)
204+
205+
# Wiener filter
206+
snr = create_frequency_mask(n_fft, cutoff_freq, fs)
207+
H_conj = np.conj(H)
208+
deconv_freq = (H_conj * G) / (H * H_conj + epsilon / snr[None, :])
209+
210+
# IFFT to get impulse response
211+
deconvolved = np.real(np.fft.ifft(deconv_freq, axis=-1))
212+
return deconvolved[:, :n_samples]
213+
214+
215+
def create_frequency_mask(
216+
n_samples: int, cutoff_freq: float, fs: float, sigma_factor: float = 6.0
217+
) -> np.ndarray:
218+
"""Create a smooth low-pass Gaussian frequency mask."""
219+
freqs = np.fft.fftfreq(n_samples, 1 / fs)
220+
mask = np.exp(-0.5 * (freqs / cutoff_freq) ** 2)
221+
mask = gaussian_filter1d(mask, sigma=n_samples / sigma_factor)
222+
return np.clip(mask, 1e-3, 1.0)

0 commit comments

Comments
 (0)