From 4055d563bcc1b121cf7bb9960ae5f3df751a7e31 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Tue, 5 May 2026 17:52:37 +0200 Subject: [PATCH 01/18] working camera calibration, needs cleaning --- village/calibration/camera_calibration.py | 358 +++++++++++++ .../calibration/camera_calibration_grid.py | 72 +++ village/gui/camera_calibration_layout.py | 492 ++++++++++++++++++ village/gui/gui_window.py | 8 + village/gui/layout.py | 28 + 5 files changed, 958 insertions(+) create mode 100644 village/calibration/camera_calibration.py create mode 100644 village/calibration/camera_calibration_grid.py create mode 100644 village/gui/camera_calibration_layout.py diff --git a/village/calibration/camera_calibration.py b/village/calibration/camera_calibration.py new file mode 100644 index 000000000..50b6382ae --- /dev/null +++ b/village/calibration/camera_calibration.py @@ -0,0 +1,358 @@ +from __future__ import annotations + +import json +import traceback +from pathlib import Path +from threading import Thread + +import cv2 +import numpy as np + +from village.scripts.log import log +from village.settings import settings + + +class CameraCalibration: + def __init__(self, spacing_mm: float, + grid_size: tuple[int, int] | None = None) -> None: + self.spacing_mm = spacing_mm + self.measured_spacing_mm: float | None = None + self.grid_size = grid_size + + self.running = False + self.error = False + self.result: dict | None = None + self.diagnostic_data: dict | None = None + self.live_metrics: dict | None = None + self.last_spacing_px: float | None = None + + self._spacing_px: float | None = None + self._detector = _make_blob_detector() + self._obj_points: list[np.ndarray] = [] + self._img_points: list[np.ndarray] = [] + self._image_size: tuple[int, int] | None = None + + @property + def spacing_px(self) -> float | None: + return self._spacing_px + + @spacing_px.setter + def spacing_px(self, value: float | None) -> None: + if value != self._spacing_px: + self._spacing_px = value + self._detector = _make_blob_detector(value) + + @property + def n_detected(self) -> int: + return len(self._obj_points) + + def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, bool]: + annotated = frame.copy() + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + if self._image_size is None: + self._image_size = (gray.shape[1], gray.shape[0]) + + obj_pts, img_pts = _detect_grid(gray, self._detector, + self.spacing_mm, self.grid_size) + found = obj_pts is not None + if found: + self._obj_points.append(obj_pts) + self._img_points.append(img_pts) + _draw_grid(annotated, img_pts, self.grid_size) + self.last_spacing_px = _compute_spacing_px(img_pts, self.grid_size) + if self.result is not None: + K = np.array(self.result["camera_matrix"]) + dist = np.array(self.result["dist_coeffs"]) + self.live_metrics = _compute_live_metrics(obj_pts, img_pts, + K, dist) + + return annotated, found + + def clear(self) -> None: + self._obj_points.clear() + self._img_points.clear() + self._frame_idx = 0 + self.result = None + self.diagnostic_data = None + self.live_metrics = None + + def calibrate_in_thread(self) -> None: + if self.running or self.n_detected < 4: + return + self.running = True + self.error = False + self.result = None + Thread(target=self._run, daemon=True).start() + + def save(self, out_path: Path) -> None: + if self.result is None: + return + keys = ("camera_matrix", "dist_coeffs", "reprojection_error_px", + "image_size_wh", "n_images_used", "spacing_mm") + data = {k: self.result[k] for k in keys if k in self.result} + with open(out_path, "w") as f: + json.dump(data, f, indent=2) + + def _run(self) -> None: + try: + K, dist, rvecs, tvecs, err = _run_calibration( + self._obj_points, self._img_points, self._image_size) + d = dist.ravel() + w, h = self._image_size + _tang = float(np.sqrt(d[2] ** 2 + d[3] ** 2)) if len(d) >= 4 else 0.0 + self.result = {"camera_matrix": K.tolist(), "dist_coeffs": d.tolist(), + "reprojection_error_px": float(err), + "image_size_wh": list(self._image_size), + "n_images_used": self.n_detected, + "spacing_mm": self.spacing_mm, + "k1": float(d[0]), + "tangential": _tang, + "cx_offset_px": float(K[0, 2] - w / 2), + "cy_offset_px": float(K[1, 2] - h / 2)} + + self.diagnostic_data = _compute_diagnostic_data( + self._obj_points, self._img_points, + K, dist, rvecs, tvecs, self._image_size, + spacing_mm=self.measured_spacing_mm or self.spacing_mm, + grid_size=self.grid_size) + except Exception: + log.error("Camera calibration error", + exception=traceback.format_exc()) + self.error = True + finally: + self.running = False + + +def _compute_spacing_px(img_pts: np.ndarray, + grid_size: tuple[int, int] | None) -> float: + pts = img_pts.reshape(-1, 2) + if grid_size is not None: + cols, rows = grid_size + pg = pts.reshape(rows, cols, 2) + h = np.linalg.norm(np.diff(pg, axis=1), axis=2) + v = np.linalg.norm(np.diff(pg, axis=0), axis=2) + return float(np.mean(np.concatenate([h.ravel(), v.ravel()]))) + if len(pts) < 2: + return 0.0 + dists = np.linalg.norm(pts[:, None, :] - pts[None, :, :], axis=2) + np.fill_diagonal(dists, np.inf) + return float(np.median(np.min(dists, axis=1))) + + +def _make_blob_detector(spacing_px: float | None = None) -> cv2.SimpleBlobDetector: + params = cv2.SimpleBlobDetector_Params() + params.filterByColor = True + params.blobColor = 0 + params.filterByArea = True + if spacing_px is not None: + r = spacing_px / 4.0 + params.minArea = float(np.pi * (r / 2) ** 2) + params.maxArea = float(np.pi * (r * 2) ** 2) + else: + params.minArea = 20 + params.maxArea = 5000 + params.filterByCircularity = True + params.minCircularity = 0.85 + params.filterByConvexity = False + params.filterByInertia = False + return cv2.SimpleBlobDetector_create(params) + + +def _detect_grid(gray: np.ndarray, detector: cv2.SimpleBlobDetector, + spacing_mm: float, grid_size: tuple[int, int] | None + ) -> tuple[np.ndarray | None, np.ndarray | None]: + if grid_size is not None: + found, corners = cv2.findCirclesGrid(gray, grid_size, + flags=cv2.CALIB_CB_SYMMETRIC_GRID, + blobDetector=detector) + if not found: + return None, None + cols, rows = grid_size + obj_pts = np.zeros((cols * rows, 3), np.float32) + obj_pts[:, :2] = (np.mgrid[0:cols, 0:rows].T.reshape(-1, 2) * spacing_mm) + return obj_pts, corners + + return _cluster_grid(gray, detector, spacing_mm) + + +def _cluster_grid(gray: np.ndarray, detector: cv2.SimpleBlobDetector, + spacing_mm: float) -> tuple[np.ndarray | None, np.ndarray | None]: + keypoints = detector.detect(gray) + if len(keypoints) < 4: + return None, None + + pts = np.array([kp.pt for kp in keypoints], dtype=np.float32) + pts = pts[np.argsort(pts[:, 1])] + + gaps = np.diff(pts[:, 1]) + if len(gaps) == 0: + return None, None + threshold = np.median(gaps) * 5 + rows = np.split(pts, np.where(gaps > threshold)[0] + 1) + + row_lens = [len(r) for r in rows] + n_cols = int(np.median(row_lens)) + if n_cols < 2 or any(abs(n - n_cols) > 1 for n in row_lens): + return None, None + + img_pts, obj_pts = [], [] + for ri, row in enumerate(rows): + row = row[np.argsort(row[:, 0])] + if len(row) != n_cols: + continue + for ci, pt in enumerate(row): + img_pts.append(pt) + obj_pts.append([ci * spacing_mm, ri * spacing_mm, 0.0]) + + if len(img_pts) < 4: + return None, None + + return (np.array(obj_pts, np.float32), + np.array(img_pts, np.float32).reshape(-1, 1, 2)) + + +def _draw_grid(frame: np.ndarray, img_pts: np.ndarray, + grid_size: tuple[int, int] | None) -> None: + if grid_size is not None: + cv2.drawChessboardCorners(frame, grid_size, + img_pts.reshape(-1, 1, 2), True) + pts = img_pts.reshape(-1, 2) + for pt in pts: + x, y = int(pt[0]), int(pt[1]) + cv2.circle(frame, (x, y), 7, (0, 255, 255), -1) + cv2.circle(frame, (x, y), 7, (0, 0, 0), 1) + + +def _run_calibration(obj_points: list[np.ndarray], img_points: list[np.ndarray], + image_size: tuple[int, int]) -> tuple[np.ndarray, np.ndarray, list, list, float]: + _, K, dist, rvecs, tvecs = cv2.calibrateCamera(obj_points, img_points, + image_size, None, None) + total = sum(cv2.norm(ip, cv2.projectPoints(op, rv, tv, K, dist)[0], cv2.NORM_L2) / len(ip) + for op, ip, rv, tv in zip(obj_points, img_points, rvecs, tvecs)) + return K, dist, rvecs, tvecs, total / len(obj_points) + + +def _compute_diagnostic_data(obj_points: list[np.ndarray], + img_points: list[np.ndarray], + K: np.ndarray, dist: np.ndarray, + rvecs: list, tvecs: list, + image_size: tuple[int, int], + spacing_mm: float = 17.0, + grid_size: tuple[int, int] | None = None) -> dict: + all_pts, all_errs = [], [] + for op, ip, rv, tv in zip(obj_points, img_points, rvecs, tvecs): + proj, _ = cv2.projectPoints(op, rv, tv, K, dist) + detected = ip.reshape(-1, 2) + all_pts.append(detected) + all_errs.append(proj.reshape(-1, 2) - detected) + + pts = np.concatenate(all_pts) + errs = np.concatenate(all_errs) + + w, h = image_size + step = max(w, h) // 20 + gx, gy = np.meshgrid(np.arange(step // 2, w, step), + np.arange(step // 2, h, step)) + grid_pts = np.stack([gx.ravel(), gy.ravel()], axis=1).astype(np.float32) + undist_pts = cv2.undistortPoints( + grid_pts.reshape(-1, 1, 2), K, dist, P=K, + ).reshape(-1, 2) + + # Alignment metrics from last frame + last_ip = img_points[-1].reshape(-1, 2) + last_op = obj_points[-1] + _, rvec, tvec = cv2.solvePnP(last_op, last_ip.reshape(-1, 1, 2), K, dist) + R, _ = cv2.Rodrigues(rvec) + tilt_total = float(np.degrees(np.arccos(np.clip(abs(R[2, 2]), 0.0, 1.0)))) + tilt_x_deg = float(np.degrees(np.arcsin(np.clip(R[0, 2], -1.0, 1.0)))) + tilt_y_deg = float(np.degrees(np.arcsin(np.clip(R[1, 2], -1.0, 1.0)))) + + y_min, y_max = last_ip[:, 1].min(), last_ip[:, 1].max() + top_row = last_ip[last_ip[:, 1] < y_min + (y_max - y_min) * 0.2] + if len(top_row) >= 2: + top_row = top_row[np.argsort(top_row[:, 0])] + dx, dy = top_row[-1] - top_row[0] + roll_rad = float(np.arctan2(dy, dx)) + roll_deg = float(abs(np.degrees(roll_rad))) + else: + roll_rad, roll_deg = 0.0, 0.0 + roll_top_row = top_row if len(top_row) >= 2 else None + + # px/cm scale heatmap data + scale_data = None + if grid_size is not None: + cols, rows = grid_size + pts_grid = last_ip.reshape(rows, cols, 2) + cm_per_dot = spacing_mm / 10.0 + scale_vals, scale_pos = [], [] + for i in range(rows): + for j in range(cols): + neighbours = [] + if j + 1 < cols: + neighbours.append( + np.linalg.norm(pts_grid[i, j + 1] - pts_grid[i, j])) + if i + 1 < rows: + neighbours.append( + np.linalg.norm(pts_grid[i + 1, j] - pts_grid[i, j])) + if neighbours: + scale_vals.append(np.mean(neighbours) / cm_per_dot) + scale_pos.append(pts_grid[i, j]) + scale_data = {"vals": np.array(scale_vals), "pos": np.array(scale_pos)} + + return {"pts": pts, "errs": errs, "grid_pts": grid_pts, + "disp": undist_pts - grid_pts, "image_size": image_size, + "last_pts": last_ip, "grid_size": grid_size, + "tilt_total": tilt_total, "tilt_x_deg": tilt_x_deg, + "tilt_y_deg": tilt_y_deg, "roll_deg": roll_deg, + "roll_rad": roll_rad, "roll_top_row": roll_top_row, + "scale_data": scale_data} + + +def _compute_live_metrics(obj_pts: np.ndarray, img_pts: np.ndarray, + K: np.ndarray, dist: np.ndarray) -> dict: + _, rvec, tvec = cv2.solvePnP(obj_pts, img_pts, K, dist) + + proj, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, dist) + reproj_err = float(np.mean(np.linalg.norm( + proj.reshape(-1, 2) - img_pts.reshape(-1, 2), axis=1))) + + R, _ = cv2.Rodrigues(rvec) + tilt_deg = float(np.degrees(np.arccos(np.clip(abs(R[2, 2]), 0.0, 1.0)))) + tilt_x_deg = float(np.degrees(np.arcsin(np.clip(R[0, 2], -1.0, 1.0)))) + tilt_y_deg = float(np.degrees(np.arcsin(np.clip(R[1, 2], -1.0, 1.0)))) + + flat = img_pts.reshape(-1, 2) + y_min, y_max = flat[:, 1].min(), flat[:, 1].max() + top_row = flat[flat[:, 1] < y_min + (y_max - y_min) * 0.2] + if len(top_row) >= 2: + top_row = top_row[np.argsort(top_row[:, 0])] + dx, dy = top_row[-1] - top_row[0] + roll_deg = float(abs(np.degrees(np.arctan2(dy, dx)))) + else: + roll_deg = 0.0 + + return {"reproj_err": reproj_err, "tilt_deg": tilt_deg, + "tilt_x_deg": tilt_x_deg, "tilt_y_deg": tilt_y_deg, + "roll_deg": roll_deg} + + +def undistort_image(img: np.ndarray, K: np.ndarray, + dist: np.ndarray) -> np.ndarray: + # Undistort an image using the given camera matrix and + # distortion coefficients. + h, w = img.shape[:2] + new_K, _ = cv2.getOptimalNewCameraMatrix(K, dist, (w, h), 1, (w, h)) + return cv2.undistort(img, K, dist, None, new_K) + + +def undistort_points(points: np.ndarray, K: np.ndarray, + dist: np.ndarray) -> np.ndarray: + # Undistort 2D points using the given camera matrix and + # distortion coefficients. + """points: Nx2 array of (x, y).""" + pts = np.array(points, dtype=np.float32).reshape(-1, 1, 2) + return cv2.undistortPoints(pts, K, dist, P=K).reshape(-1, 2) + + +def default_result_path() -> Path: + return Path(settings.get("DATA_DIRECTORY")) / "camera_calibration.json" diff --git a/village/calibration/camera_calibration_grid.py b/village/calibration/camera_calibration_grid.py new file mode 100644 index 000000000..f37d917be --- /dev/null +++ b/village/calibration/camera_calibration_grid.py @@ -0,0 +1,72 @@ +import math +from pathlib import Path + +import matplotlib.pyplot as plt + + +def make_circle_grid(page_w_mm: float = 210.0, page_h_mm: float = 297.0, + spacing_mm: float = 50.0, circle_radius_mm: float = 10.0, + margin_mm: float = 5.0, + out_path: str = "calibration_grid.pdf", dpi: int = 300, + rotated_45: bool = False) -> Path: + mm_per_inch = 25.4 + + fig, ax = plt.subplots(figsize=(page_w_mm / mm_per_inch, + page_h_mm / mm_per_inch)) + ax.set_xlim(0, page_w_mm) + ax.set_ylim(0, page_h_mm) + ax.set_aspect("equal") + ax.axis("off") + ax.set_facecolor("white") + fig.patch.set_facecolor("white") + + draw_w = page_w_mm - 2 * margin_mm + draw_h = page_h_mm - 2 * margin_mm + + if rotated_45: + diag = spacing_mm / math.sqrt(2) + cx_center = page_w_mm / 2 + cy_center = page_h_mm / 2 + temp = max(draw_w, draw_h) * math.sqrt(2) + half_extent = math.ceil(temp / diag) + 2 + for row in range(-half_extent, half_extent + 1): + for col in range(-half_extent, half_extent + 1): + cx = cx_center + (col * diag - row * diag) + cy = cy_center + (col * diag + row * diag) + cond1 = (margin_mm - circle_radius_mm + <= cx <= page_w_mm - margin_mm + circle_radius_mm) + cond2 = (margin_mm - circle_radius_mm + <= cy <= page_h_mm - margin_mm + circle_radius_mm) + if cond1 and cond2: + ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, + color="black")) + else: + cols = int(draw_w / spacing_mm) + rows = int(draw_h / spacing_mm) + for row in range(rows + 2): + ax.plot([margin_mm - spacing_mm, margin_mm + (cols + 1)* spacing_mm], + [margin_mm + row * spacing_mm] * 2, color="lightgray", + linewidth=1, zorder=1) + for col in range(cols + 1): + cx = margin_mm + col * spacing_mm + cy = margin_mm + row * spacing_mm + ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, + color="black")) + ax.plot([margin_mm + col * spacing_mm] * 2, + [margin_mm - spacing_mm, margin_mm + (rows + 1) * spacing_mm], + color="lightgray", linewidth=1, zorder=1) + + ax.plot([cx - circle_radius_mm, cx + circle_radius_mm], + [cy, cy], color="w", linewidth=1, zorder=2) + ax.plot([cx, cx], [cy - circle_radius_mm, cy + circle_radius_mm], + color="w", linewidth=1, zorder=2) + + out = Path(out_path) + fig.savefig(out, dpi=dpi, bbox_inches="tight", pad_inches=0) + plt.close(fig) + return out + + +if __name__ == "__main__": + make_circle_grid() + make_circle_grid(rotated_45=True, out_path="calibration_grid_rotated.pdf") diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py new file mode 100644 index 000000000..7d95ca982 --- /dev/null +++ b/village/gui/camera_calibration_layout.py @@ -0,0 +1,492 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +import cv2 +import matplotlib.pyplot as plt +import numpy as np +from PyQt5.QtGui import QImage, QPixmap +from PyQt5.QtWidgets import QFileDialog, QLabel + +from village.calibration.camera_calibration import (CameraCalibration, + default_result_path) +from village.calibration.camera_calibration_grid import make_circle_grid +from village.classes.enums import State +from village.devices.camera import cam_box +from village.gui.layout import Layout +from village.manager import manager +from village.scripts.utils import create_pixmap +from village.settings import settings + +if TYPE_CHECKING: + from village.gui.gui_window import GuiWindow + + +def _frame_to_pixmap(frame: np.ndarray) -> QPixmap: + rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + h, w = rgb.shape[:2] + return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, QImage.Format_RGB888)) + + +def _make_diagnostic_plots(diag: dict, width_in: float, + height_in: float) -> plt.Figure: + pts = diag["pts"] + errs = diag["errs"] + mags = np.linalg.norm(errs, axis=1) + grid_pts = diag["grid_pts"] + disp = diag["disp"] + disp_mag = np.linalg.norm(disp, axis=1) + w, h = diag["image_size"] + + dpi = int(settings.get("MATPLOTLIB_DPI")) + fig, axes = plt.subplots(2, 2, figsize=(width_in, height_in), dpi=dpi) + cb_args = dict(fraction=0.03, pad=0.03) + + tilt_x = diag.get("tilt_x_deg", 0.0) + tilt_y = diag.get("tilt_y_deg", 0.0) + tilt_t = diag.get("tilt_total", 0.0) + roll = diag.get("roll_deg", 0.0) + fig.suptitle(f"Tilt X {tilt_x:+.1f}° Tilt Y {tilt_y:+.1f}° " + f"Tilt total {tilt_t:.1f}° Roll {roll:+.1f}°", fontsize=9) + + # (0,0) residual reprojection error + ax = axes[0, 0] + ax.tricontourf(pts[:, 0], pts[:, 1], mags, levels=10, cmap="hot", alpha=0.25) + q = ax.quiver(pts[:, 0], pts[:, 1], errs[:, 0], errs[:, 1], mags, + cmap="hot", angles="xy", scale_units="xy", scale=0.05) + fig.colorbar(q, ax=ax, label="error (px)", **cb_args) + ax.set_xlim(0, w) + ax.set_ylim(h, 0) + ax.set_aspect("equal") + ax.axis("off") + + # (0,1) lens distortion field + ax = axes[0, 1] + q2 = ax.quiver(grid_pts[:, 0], grid_pts[:, 1], + disp[:, 0], disp[:, 1], disp_mag, + cmap="cool", angles="xy", scale_units="xy", scale=0.3) + fig.colorbar(q2, ax=ax, label="displacement (px)", **cb_args) + ax.set_xlim(0, w) + ax.set_ylim(h, 0) + ax.set_aspect("equal") + ax.axis("off") + + # (1,0) tilt + roll + ax = axes[1, 0] + last_pts = diag.get("last_pts") + grid_size = diag.get("grid_size") + if last_pts is not None and grid_size is not None: + cols, rows = grid_size + pg = last_pts.reshape(rows, cols, 2) + tl, tr = pg[0, 0], pg[0, cols - 1] + bl, br = pg[rows - 1, 0], pg[rows - 1, cols - 1] + center = np.mean([tl, tr, bl, br], axis=0) + avg_w = (np.linalg.norm(tr - tl) + np.linalg.norm(br - bl)) / 2 + avg_h = (np.linalg.norm(bl - tl) + np.linalg.norm(br - tr)) / 2 + rr = diag.get("roll_rad", 0.0) + cr, sr = np.cos(rr), np.sin(rr) + hw, hh = avg_w / 2, avg_h / 2 + ideal = np.array([center + [-cr * hw + sr * hh, -sr * hw - cr * hh], + center + [ cr * hw + sr * hh, sr * hw - cr * hh], + center + [ cr * hw - sr * hh, sr * hw + cr * hh], + center + [-cr * hw - sr * hh, -sr * hw + cr * hh]]) + ax.scatter(last_pts[:, 0], last_pts[:, 1], s=3, c="steelblue", zorder=3) + ax.plot(*np.vstack([tl, tr, br, bl, tl]).T, "r-", lw=2, + label=f"Actual (tilt {tilt_t:.1f}°)") + ax.plot(*np.vstack([ideal, ideal[0]]).T, "g--", lw=2, label="Ideal") + roll_row = diag.get("roll_top_row") + if roll_row is not None and len(roll_row) >= 2: + ax.scatter(roll_row[:, 0], roll_row[:, 1], + s=20, c="orange", zorder=5, label="Roll ref row") + ax.plot([roll_row[0, 0], roll_row[-1, 0]], + [roll_row[0, 1], roll_row[-1, 1]], + color="orange", lw=1.5, linestyle="--", zorder=4) + ax.legend(fontsize=7, ncols=3) + ax.set_xlim(0, w) + ax.set_ylim(h, 0) + ax.set_aspect("equal") + else: + ax.text(0.5, 0.5, "Grid size not set", ha="center", va="center", + transform=ax.transAxes, color="gray") + ax.axis("off") + + # (1,1) px/cm scale heatmap + ax = axes[1, 1] + scale_data = diag.get("scale_data") + if scale_data is not None and len(scale_data["vals"]) > 3: + sp = scale_data["pos"] + sv = scale_data["vals"] + tcf2 = ax.tricontourf(sp[:, 0], sp[:, 1], sv, levels=15, cmap="viridis") + fig.colorbar(tcf2, ax=ax, label="px / cm", **cb_args) + ax.set_xlim(0, w) + ax.set_ylim(h, 0) + ax.set_aspect("equal") + else: + ax.text(0.5, 0.5, "Scale not available\n(set grid size)", ha="center", + va="center", transform=ax.transAxes, color="gray") + ax.axis("off") + + plt.tight_layout() + return fig + + +class DiagnosticPlotsLayout(Layout): + def __init__(self, window: GuiWindow, rows: int, columns: int) -> None: + super().__init__(window, stacked=True, rows=rows, columns=columns) + self.rows = rows + self.columns = columns + self.draw() + + def draw(self) -> None: + self.plot_label = QLabel() + self.plot_label.setStyleSheet( + "QLabel {border: 1px solid gray; background-color: white;}" + ) + self.addWidget(self.plot_label, 0, 0, self.rows, self.columns) + dpi = int(settings.get("MATPLOTLIB_DPI")) + self.plot_width = (self.columns * self.column_width - 10) / dpi + self.plot_height = (self.rows * self.row_height - 5) / dpi + + def update(self, diag: dict) -> None: + try: + fig = _make_diagnostic_plots(diag, self.plot_width, + self.plot_height) + self.plot_label.setPixmap(create_pixmap(fig)) + plt.close(fig) + except Exception: + pass + + +class CameraCalibrationLayout(Layout): + def __init__(self, window: GuiWindow) -> None: + super().__init__(window) + manager.state = State.MANUAL_MODE + manager.changing_settings = False + self.draw() + + def draw(self) -> None: + self.camera_calibration_button.setDisabled(True) + + self._result_path: Path = default_result_path() + self._result: dict | None = None + self._calib: CameraCalibration | None = None + self._last_annotated: np.ndarray | None = None + + self.create_and_add_label("GRID GENERATION", 5, 2, 40, 2, "black", + description="Generate a printable symmetric circle grid.\n" + "Print it and verify spacing with a ruler.") + self.create_and_add_label("PAGE W (mm)", 8, 2, 14, 2, "black", bold=False, + description="Page width in mm (e.g. 210 for A4)") + self.page_w_edit = self.create_and_add_line_edit("210", 10, 2, 8, 2, lambda _: None) + + self.create_and_add_label("PAGE H (mm)", 8, 12, 14, 2, "black", bold=False, + description="Page height in mm (e.g. 297 for A4)") + self.page_h_edit = self.create_and_add_line_edit("297", 10, 12, 8, 2, lambda _: None) + + self.create_and_add_label("SPACING (mm)", 8, 22, 16, 2, "black", bold=False, + description="Centre-to-centre distance between circles in mm") + self.spacing_edit = self.create_and_add_line_edit("50", 10, 22, 8, 2, lambda _: None) + + self.create_and_add_label("DOT RADIUS (mm)", 8, 32, 18, 2, "black", bold=False, + description="Radius of each printed circle in mm") + self.dot_radius_edit = self.create_and_add_line_edit("10", 10, 32, 8, 2, lambda _: None) + + self.create_and_add_label("MARGIN (mm)", 8, 42, 14, 2, "black", bold=False, + description="Empty border around the grid in mm") + self.margin_edit = self.create_and_add_line_edit("0", 10, 42, 8, 2, lambda _: None) + + self.generate_button = self.create_and_add_button("GENERATE GRID PDF", 13, 2, 30, 2, + self._generate_grid_clicked, + "Generate a printable circle grid PDF", + "powderblue") + self.grid_status_label = self.create_and_add_label("", 16, 2, 50, 2, "gray", bold=False) + + self.create_and_add_label("CALIBRATION DETECTION", 19, 2, 50, 2, "black", + description="Parameters of the actual grid in front of the camera.\n" + "Rows/cols may differ from the printed sheet if you stitched several.") + self.create_and_add_label("GRID COLS", 21, 2, 14, 2, "black", bold=False, + description="Columns in the stitched calibration grid") + self.grid_cols_edit = self.create_and_add_line_edit("30", 23, 2, 8, 2, lambda _: None) + + self.create_and_add_label("GRID ROWS", 21, 12, 14, 2, "black", bold=False, + description="Rows in the stitched calibration grid") + self.grid_rows_edit = self.create_and_add_line_edit("18", 23, 12, 8, 2, lambda _: None) + + self.create_and_add_label("SPACING (px)", 21, 22, 16, 2, "black", bold=False, + description="Expected centre-to-centre dot spacing in pixels.\n" + "Used to tune the blob detector area bounds.") + self.spacing_px_edit = self.create_and_add_line_edit("17", 23, 22, 8, 2, lambda _: None) + + self.create_and_add_label("MEASURED SPACING (mm)", 21, 34, 22, 2, "black", bold=False, + description="Actual printed dot spacing measured with a ruler.\n" + "Leave blank to use SPACING (mm) above.\n" + "Only affects the px/cm scale heatmap.") + self.measured_spacing_edit = self.create_and_add_line_edit("", 23, 34, 8, 2, lambda _: None) + + self.capture_button = self.create_and_add_button("CAPTURE FRAME", 26, 2, 28, 2, + self._capture_clicked, + "Grab current frame, detect grid, and run calibration", + "powderblue") + self.load_image_button = self.create_and_add_button("LOAD IMAGE", 26, 32, 20, 2, + self._load_image_clicked, + "Load a PNG/JPG image file and use it as a calibration frame", + "powderblue") + self.clear_button = self.create_and_add_button("CLEAR FRAMES", 26, 54, 20, 2, + self._clear_clicked, + "Discard all captured frames and start over", + "lightyellow") + self.frames_label = self.create_and_add_label("Frames: 0", 29, 2, 30, 2, "black", bold=False) + + self.calib_status_label = self.create_and_add_label("", 32, 2, 95, 2, "gray", bold=False) + + self.result_label = self.create_and_add_label("", 35, 2, 95, 5, "black", bold=False) + + self.metrics_label = self.create_and_add_label("", 41, 2, 95, 10, "black", bold=False) + + self.save_button = self.create_and_add_button("SAVE JSON", 52, 2, 20, 2, + self._save_clicked, + "Save calibration result to camera_calibration.json", + "powderblue") + self.save_button.setDisabled(True) + + self.preview_label = QLabel() + self.preview_label.setStyleSheet( + "QLabel {border: 1px solid gray; background-color: black;}" + ) + self.addWidget(self.preview_label, 6, 103, 20, 95) + + self.plot_layout = DiagnosticPlotsLayout(self.window, 24, 95) + self.addLayout(self.plot_layout, 28, 103, 24, 95) + + self._calib = CameraCalibration(spacing_mm=self._spacing_value(), + grid_size=self._grid_size_value()) + + def _spacing_value(self) -> float: + try: + return float(self.spacing_edit.text()) + except (ValueError, AttributeError): + return 50.0 + + def _spacing_px_value(self) -> float | None: + try: + v = float(self.spacing_px_edit.text()) + return v if v > 0 else None + except (ValueError, AttributeError): + return None + + def _measured_spacing_value(self) -> float | None: + try: + v = float(self.measured_spacing_edit.text()) + return v if v > 0 else None + except (ValueError, AttributeError): + return None + + def _grid_size_value(self) -> tuple[int, int] | None: + try: + cols = int(self.grid_cols_edit.text()) + rows = int(self.grid_rows_edit.text()) + if cols > 0 and rows > 0: + return (cols, rows) + except (ValueError, AttributeError): + pass + return None + + def _generate_grid_clicked(self) -> None: + try: + page_w = float(self.page_w_edit.text()) + page_h = float(self.page_h_edit.text()) + spacing = float(self.spacing_edit.text()) + dot_radius = float(self.dot_radius_edit.text()) + margin = float(self.margin_edit.text()) + out = Path(settings.get("DATA_DIRECTORY")) / "calibration_grid.pdf" + make_circle_grid(page_w_mm=page_w, page_h_mm=page_h, + spacing_mm=spacing, circle_radius_mm=dot_radius, + margin_mm=margin, out_path=str(out)) + self.grid_status_label.setText(f"Saved: {out}") + self.grid_status_label.setStyleSheet( + "QLabel {color: green; font-weight: normal}") + except Exception: + self.grid_status_label.setText("Error generating grid") + self.grid_status_label.setStyleSheet( + "QLabel {color: red; font-weight: normal}") + + def _capture_clicked(self) -> None: + if self._calib.running: + return + frame = cam_box.frame + if frame is None: + self.calib_status_label.setText("No frame available") + return + self._process_frame(frame) + + def _process_frame(self, frame: np.ndarray) -> None: + self._last_annotated = None + self._calib.spacing_mm = self._spacing_value() + self._calib.spacing_px = self._spacing_px_value() + self._calib.measured_spacing_mm = self._measured_spacing_value() + self._calib.grid_size = self._grid_size_value() + annotated, found = self._calib.process_frame(frame) + + if found: + self._last_annotated = annotated + + px = _frame_to_pixmap(annotated) + if not px.isNull(): + self.preview_label.setPixmap(px.scaled(self.preview_label.width(), + self.preview_label.height(), 1)) + + if not found: + self.calib_status_label.setText("No grid detected in this frame") + self.calib_status_label.setStyleSheet( + "QLabel {color: orange; font-weight: normal}") + return + + n = self._calib.n_detected + self.frames_label.setText(f"Frames: {n}") + if n < 4: + self.calib_status_label.setText( + f"Grid detected, need {4 - n} more frame(s) to calibrate") + self.calib_status_label.setStyleSheet( + "QLabel {color: gray; font-weight: normal}") + return + + self._calib.calibrate_in_thread() + self.calib_status_label.setText(f"Calibrating… ({n} frames)") + self.calib_status_label.setStyleSheet( + "QLabel {color: gray; font-weight: normal}") + self.capture_button.setDisabled(True) + + def _load_image_clicked(self) -> None: + if self._calib.running: + return + path, _ = QFileDialog.getOpenFileName( + None, "Open calibration image", "", + "Images (*.png *.jpg *.jpeg *.bmp *.tif *.tiff)") + if not path: + return + frame = cv2.imread(path) + if frame is None: + self.calib_status_label.setText(f"Could not load image: {path}") + self.calib_status_label.setStyleSheet( + "QLabel {color: red; font-weight: normal}") + return + self._process_frame(frame) + + def _clear_clicked(self) -> None: + self._calib.clear() + self._result = None + self._last_annotated = None + self.frames_label.setText("Frames: 0") + self.calib_status_label.setText("Frames cleared") + self.calib_status_label.setStyleSheet("QLabel {color: gray; font-weight: normal}") + self.result_label.setText("") + self.metrics_label.setText("") + self.save_button.setDisabled(True) + self.capture_button.setEnabled(True) + + def _save_clicked(self) -> None: + if self._result is None: + return + try: + self._calib.save(self._result_path) + self.calib_status_label.setText(f"Saved: {self._result_path}") + except Exception: + self.calib_status_label.setText("Error saving result") + + def _show_result(self, result: dict) -> None: + d = result["dist_coeffs"] + k1 = d[0] + dtype = "barrel" if k1 < 0 else "pincushion" + level = ("minimal" if abs(k1) < 0.05 + else "moderate" if abs(k1) < 0.15 + else "strong") + err = result["reprojection_error_px"] + quality = "good" if err < 1.0 else "high -> retake frames" + text = (f"Frames used: {result['n_images_used']}\n" + f"Calib reproj error (all frames): {err:.4f} px ({quality})\n" + f"Distortion: {level} {dtype} (k1={k1:+.4f})\n" + f"k1={d[0]:+.4f} k2={d[1]:+.4f} " + f"p1={d[2]:+.4f} p2={d[3]:+.4f}\n" + f"Principal pt offset: " + f"({result['cx_offset_px']:+.1f} px, " + f"{result['cy_offset_px']:+.1f} px) " + f"tangential: {result['tangential']:.5f}") + self.result_label.setText(text) + self.calib_status_label.setText("Calibration complete") + self.calib_status_label.setStyleSheet( + "QLabel {color: green; font-weight: normal}") + self.save_button.setEnabled(True) + if self._calib.diagnostic_data is not None: + self.plot_layout.update(self._calib.diagnostic_data) + + def _show_live_metrics(self, m: dict) -> None: + def tag(v, good, ok): + return "GOOD" if v <= good else "OK " if v <= ok else "POOR" + + def hint_x(deg): + if abs(deg) < 0.5: + return "" + return "→ raise right" if deg > 0 else "→ raise left" + + def hint_y(deg): + if abs(deg) < 0.5: + return "" + return "→ raise back" if deg > 0 else "→ raise front" + + def hint_roll(deg): + if abs(deg) < 0.3: + return "" + return "→ rotate CW" if deg > 0 else "→ rotate CCW" + + tx = m.get("tilt_x_deg", 0.0) + ty = m.get("tilt_y_deg", 0.0) + roll = m["roll_deg"] + lines = ["Physical adjustment", + f" Tilt total: {m['tilt_deg']:5.2f}° " + f"[{tag(m['tilt_deg'], 2.0, 5.0)}] (good<2°, ok<5°)", + f" Tilt X (L/R): {tx:+6.2f}° " + f"[{tag(abs(tx), 2.0, 5.0)}] {hint_x(tx)}", + f" Tilt Y (F/B): {ty:+6.2f}° " + f"[{tag(abs(ty), 2.0, 5.0)}] {hint_y(ty)}", + f" Roll: {roll:5.2f}° " + f"[{tag(roll, 1.0, 3.0)}] {hint_roll(roll)}", + "Pose quality (latest frame vs calibrated model)", + f" Reproj error: {m['reproj_err']:5.3f} px " + f"[{tag(m['reproj_err'], 0.5, 1.5)}] (good<0.5, ok<1.5)", + ] + self.metrics_label.setText("\n".join(lines)) + + def update_gui(self) -> None: + if self._calib is None: + return + self.update_status_label_buttons() + + display = self._last_annotated if self._last_annotated is not None else cam_box.frame + if display is not None: + try: + px = _frame_to_pixmap(display) + if not px.isNull(): + self.preview_label.setPixmap( + px.scaled(self.preview_label.width(), + self.preview_label.height(), 1)) + except Exception: + pass + + if self._calib.live_metrics is not None: + self._show_live_metrics(self._calib.live_metrics) + + if self._calib.error: + self.calib_status_label.setText("Calibration failed, check log") + self.calib_status_label.setStyleSheet( + "QLabel {color: red; font-weight: normal}") + self.capture_button.setEnabled(True) + self._calib.error = False + return + + if (not self._calib.running + and self._calib.result is not self._result + and self._calib.result is not None): + self._result = self._calib.result + self._show_result(self._result) + self.capture_button.setEnabled(True) diff --git a/village/gui/gui_window.py b/village/gui/gui_window.py index d0f3619c9..30e53ce2a 100644 --- a/village/gui/gui_window.py +++ b/village/gui/gui_window.py @@ -101,6 +101,14 @@ def create_water_calibration_layout(self) -> None: self.layout = WaterCalibrationLayout(self) self.setLayout(self.layout) + def create_camera_calibration_layout(self) -> None: + """Switches the current view to the Camera Calibration Layout.""" + from village.gui.camera_calibration_layout import CameraCalibrationLayout + utils.delete_all_elements_from_layout(self.layout) + QObjectCleanupHandler().add(self.layout) + self.layout = CameraCalibrationLayout(self) + self.setLayout(self.layout) + def create_sound_calibration_layout(self) -> None: """Switches the current view to the Sound Calibration Layout.""" utils.delete_all_elements_from_layout(self.layout) diff --git a/village/gui/layout.py b/village/gui/layout.py index 39a662dec..a32f01e81 100644 --- a/village/gui/layout.py +++ b/village/gui/layout.py @@ -328,6 +328,16 @@ def create_common_elements(self) -> None: "Go to the setting menu", ) + self.camera_calibration_button = self.create_and_add_button( + "CAMERA CALIBRATION", + 1, + 7 * size, + size, + 2, + self.camera_calibration_button_clicked, + "Go to the camera calibration menu", + ) + self.online_plots_button = self.create_and_add_button( "ONLINE PLOTS", 1, @@ -538,6 +548,24 @@ def sound_calibration_button_clicked(self) -> None: text, ) + def camera_calibration_button_clicked(self) -> None: + """Handles camera calibration button click.""" + if self.change_layout(): + if manager.state in [State.WAIT, State.MANUAL_MODE]: + manager.state = State.MANUAL_MODE + manager.reset_subject_task_training() + self.close_online_plot_window() + self.window.create_camera_calibration_layout() + else: + text = ( + "Camera calibration is not available while a subject" + " is in the box, a detection is ongoing," + " or data is syncing." + ) + QMessageBox.information( + self.window, "CAMERA CALIBRATION", text + ) + def settings_button_clicked(self) -> None: """Handles settings button click.""" if self.change_layout(): From 4569168653b9b4617f73adbf239360403c8d7443 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 11 May 2026 17:15:08 +0200 Subject: [PATCH 02/18] autonomouse params --- village/custom_classes/auto_no_mouse_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index 032d665d8..a96166058 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -10,8 +10,8 @@ class AutoNoMouse_Base: To be subclassed in task folder. """ - def __init__(self) -> None: - self.task = Task() + def __init__(self, task: Task=None) -> None: + self.task = task self._thread: threading.Thread | None = None self._stop_event = threading.Event() self.trace: deque = deque(maxlen=25 * 5) From 7821c178320f5347c2173e43adcda7eb41df9d57 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Thu, 14 May 2026 14:54:09 +0200 Subject: [PATCH 03/18] write autonomouse positions instead of cam positions --- village/custom_classes/auto_no_mouse_base.py | 37 ++++++++++++++++++++ village/gui/monitor_layout.py | 7 ++++ 2 files changed, 44 insertions(+) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index a96166058..cd885e6a5 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -1,7 +1,9 @@ +import bisect import threading from collections import deque from village.custom_classes.task import Task +from village.scripts.time_utils import time_utils class AutoNoMouse_Base: @@ -18,20 +20,53 @@ def __init__(self, task: Task=None) -> None: self.position: tuple | None = None self.accuracy_left: float = 1.0 self.accuracy_right: float = 1.0 + self._position_log: list[tuple[float, int, int]] = [] def start(self) -> None: self._stop_event.clear() self.trace.clear() self.position = None + self._position_log.clear() self._set_overlay(self) self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() def stop(self) -> None: + if self._stop_event.is_set() and self.position is None: + return self._stop_event.set() self._set_overlay(None) self.trace.clear() self.position = None + self.inject_positions() + + def inject_positions(self) -> None: + """Hacky: (I did not want to modify cam_box code to support this) + Replaces cam_box position (x, y) lists with AutoNoMouse positions. + Called when AutoNoMouse stops, just before cam_box.stop_recording(), + so cam_box.save_csv() picks them up and records them.""" + cam = self.task.cam_box + if not self._position_log or not hasattr(cam, "camera_timestamps"): + return + + ts_log = [t for t, _, _ in self._position_log] + xs_log = [x for _, x, _ in self._position_log] + ys_log = [y for _, _, y in self._position_log] + + # match each virtual position timestamp to nearest + # cam ts, then inject (x, y). + new_x, new_y = [], [] + for ts in cam.camera_timestamps: + i = bisect.bisect_right(ts_log, ts) - 1 + if i >= 0: + new_x.append(xs_log[i]) + new_y.append(ys_log[i]) + else: + new_x.append(-1) + new_y.append(-1) + + cam.x_positions = new_x + cam.y_positions = new_y def _set_overlay(self, instance: "AutoNoMouse_Base | None") -> None: try: @@ -51,6 +86,7 @@ def _run(self) -> None: and self.task.current_trial <= self.task.maximum_number_of_trials ): self.run_trial() + self.stop() def run_trial(self) -> None: """Override in subclass. Sequence of actions to perform for @@ -83,6 +119,7 @@ def set_position(self, x: float, y: float) -> None: pt = (int(x), int(y)) self.position = pt self.trace.append(pt) + self._position_log.append((time_utils.now_timestamp(), pt[0], pt[1])) def wait(self, seconds: float) -> None: """Sleep for *seconds*, waking early if stop() is called.""" diff --git a/village/gui/monitor_layout.py b/village/gui/monitor_layout.py index ecda0a3f6..ff429c16c 100644 --- a/village/gui/monitor_layout.py +++ b/village/gui/monitor_layout.py @@ -460,6 +460,7 @@ def on_tab_changed(self, index: int) -> None: def update_gui(self) -> None: """Updates the GUI and its components.""" self.update_status_label_buttons() + self.page4Layout.update_gui() if manager.actions == Actions.CORRIDOR: self.page1Layout.update_gui() match manager.info: @@ -1281,6 +1282,12 @@ def auto_no_mouse_clicked(self) -> None: self.auto_no_mouse_button.setText("■ AutoNoMouse") self.auto_no_mouse_button.setStyleSheet("background-color: salmon;") + def update_gui(self) -> None: + if self._anm is not None and not self._anm.running: + self._anm = None + self.auto_no_mouse_button.setText("▶ AutoNoMouse") + self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") + def _inject_trials(self) -> None: p_l, p_r = self._get_p_left(), self._get_p_right() injector = manager.auto_no_mouse From 2213435f28d55418fd942db408f1e43433b0682d Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Thu, 14 May 2026 15:24:26 +0200 Subject: [PATCH 04/18] backup fixes --- village/calibration/camera_calibration.py | 2 +- village/custom_classes/auto_no_mouse_base.py | 4 +++- village/devices/led_strip.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/village/calibration/camera_calibration.py b/village/calibration/camera_calibration.py index f30592418..e8d4c4169 100644 --- a/village/calibration/camera_calibration.py +++ b/village/calibration/camera_calibration.py @@ -16,7 +16,7 @@ class CameraCalibration(CalibrationBase): name = "camera_calibration" - def __init__(self, spacing_mm: float, + def __init__(self, spacing_mm: float = 50, grid_size: tuple[int, int] | None = None) -> None: self.spacing_mm = spacing_mm self.measured_spacing_mm: float | None = None diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index cd885e6a5..8e6a275b8 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -12,7 +12,7 @@ class AutoNoMouse_Base: To be subclassed in task folder. """ - def __init__(self, task: Task=None) -> None: + def __init__(self, task: Task = None) -> None: self.task = task self._thread: threading.Thread | None = None self._stop_event = threading.Event() @@ -93,6 +93,7 @@ def run_trial(self) -> None: one trial, e.g. pokes and position updates.""" self.wait(1.0) + # FIXME: use *args, **kwargs instead of fixed params and subclassed anyway. def inject_trial( self, p_correct_left: float = 1.0, p_correct_right: float = 1.0 ) -> None: @@ -100,6 +101,7 @@ def inject_trial( Append one mock trial row directly to session_df.""" pass + # FIXME: use *args, **kwargs instead of fixed params def inject_trials( self, n: int, p_correct_left: float = 1.0, p_correct_right: float = 1.0 ) -> None: diff --git a/village/devices/led_strip.py b/village/devices/led_strip.py index c4a8c04e7..b98ee213e 100644 --- a/village/devices/led_strip.py +++ b/village/devices/led_strip.py @@ -2,7 +2,7 @@ from pi5neo import Pi5Neo -from village.classes.abstract_classes import LEDStripBase +from village.classes.null_classes import LEDStripBase from village.scripts.log import log From 3362fe57d97871b7ee385a88e853e064f5e8c33f Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Thu, 14 May 2026 16:26:49 +0200 Subject: [PATCH 05/18] wrap task info label in qscroll for long descriptions --- village/gui/tasks_layout.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/village/gui/tasks_layout.py b/village/gui/tasks_layout.py index 7ccda395f..80a5e63b4 100644 --- a/village/gui/tasks_layout.py +++ b/village/gui/tasks_layout.py @@ -17,6 +17,7 @@ QTabWidget, QVBoxLayout, QWidget, + QLabel, ) from village.classes.enums import State @@ -225,12 +226,23 @@ def select_task(self, cls: Type, name: str) -> None: manager.task.name, 0, 2, 60, 2, "black" ) self.name_label.setProperty("type", "optional") - self.info_label = self.central_sub_layout.create_and_add_label( - manager.task.info, 2, 2, 60, 30, "black" + + self.info_label = QLabel(manager.task.info) + self.info_label.setStyleSheet( + "QLabel { font-family: 'Courier New'; font-size: 8pt;" + " font-weight: normal; color: black; }" ) self.info_label.setWordWrap(True) self.info_label.setProperty("type", "optional") self.info_label.setAlignment(Qt.AlignTop | Qt.AlignLeft) + self.info_scroll = QScrollArea() + self.info_scroll.setWidget(self.info_label) + self.info_scroll.setWidgetResizable(True) + self.info_scroll.setProperty("type", "optional") + col_w = self.central_sub_layout.column_width + row_h = self.central_sub_layout.row_height + self.info_scroll.setFixedSize(60 * col_w, 30 * row_h) + self.central_sub_layout.addWidget(self.info_scroll, 2, 2, 30, 60) self.create_gui_properties(testing_training=False) def training_button_clicked(self) -> None: From 2b3474c6d6cb70424d3a40086127a8fc8ce5e16b Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Wed, 20 May 2026 15:30:03 +0200 Subject: [PATCH 06/18] Better (?) AutoNoMouse handling: custom parameters auto added to GUI (int, float and bool ok), support for importing more than one AutoNoMouse (if you have >1 in task folder) and assigned to correct task. Better inject_trials --- village/custom_classes/auto_no_mouse_base.py | 64 +++++-- village/gui/monitor_layout.py | 168 ++++++++++++------- village/manager.py | 11 +- village/scripts/import_all.py | 7 +- 4 files changed, 164 insertions(+), 86 deletions(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index 8e6a275b8..53f4954b5 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -1,26 +1,49 @@ import bisect -import threading +from dataclasses import dataclass +from threading import Event as ThEvent, Thread from collections import deque from village.custom_classes.task import Task from village.scripts.time_utils import time_utils +@dataclass +class AutonomouseParam: + """Descriptor for a single inject_trial keyword argument.""" + name: str # name passed to inject_trial + type_: type # float or int for now TODO: support bool, str, etc. if needed, or infer from default + default: float # default value + label: str # UI label text + min_val: float = 0.0 + max_val: float = 1.0 + tooltip: str = "" + + def clamp(self, val): + if self.type_ is bool: + return bool(val) + return max(self.min_val, min(self.max_val, self.type_(val))) + + class AutoNoMouse_Base: """Base class for automated task execution without a real animal. To be subclassed in task folder. """ + TASK_NAME: str = "" # to be set in subclass to restrict to a specific task + PARAMS: list[AutonomouseParam] = [] + def __init__(self, task: Task = None) -> None: self.task = task - self._thread: threading.Thread | None = None - self._stop_event = threading.Event() + self._thread: Thread | None = None + self._inject_thread: Thread | None = None + self._stop_event = ThEvent() + self._inject_stop_event = ThEvent() self.trace: deque = deque(maxlen=25 * 5) self.position: tuple | None = None - self.accuracy_left: float = 1.0 - self.accuracy_right: float = 1.0 self._position_log: list[tuple[float, int, int]] = [] + for param in self.PARAMS: + setattr(self, param.name, param.default) def start(self) -> None: self._stop_event.clear() @@ -28,7 +51,7 @@ def start(self) -> None: self.position = None self._position_log.clear() self._set_overlay(self) - self._thread = threading.Thread(target=self._run, daemon=True) + self._thread = Thread(target=self._run, daemon=True) self._thread.start() def stop(self) -> None: @@ -80,6 +103,14 @@ def _set_overlay(self, instance: "AutoNoMouse_Base | None") -> None: def running(self) -> bool: return self._thread is not None and self._thread.is_alive() + @property + def injecting(self) -> bool: + return (self._inject_thread is not None + and self._inject_thread.is_alive()) + + def stop_inject(self) -> None: + self._inject_stop_event.set() + def _run(self) -> None: while ( not self._stop_event.is_set() @@ -93,20 +124,21 @@ def run_trial(self) -> None: one trial, e.g. pokes and position updates.""" self.wait(1.0) - # FIXME: use *args, **kwargs instead of fixed params and subclassed anyway. - def inject_trial( - self, p_correct_left: float = 1.0, p_correct_right: float = 1.0 - ) -> None: + def inject_trial(self, *args, **kwargs) -> None: """Override in subclass. Append one mock trial row directly to session_df.""" pass - # FIXME: use *args, **kwargs instead of fixed params - def inject_trials( - self, n: int, p_correct_left: float = 1.0, p_correct_right: float = 1.0 - ) -> None: - for _ in range(n): - self.inject_trial(p_correct_left, p_correct_right) + def inject_trials(self, n: int, interval: float = 1.0, **kwargs) -> None: + def _run(): + self._inject_stop_event.clear() + for _ in range(n): + if self._inject_stop_event.is_set(): + break + self.inject_trial(**kwargs) + self._inject_stop_event.wait(interval) + self._inject_thread = Thread(target=_run, daemon=True) + self._inject_thread.start() def poke(self, port: int, duration: float = 0.1) -> None: """Simulate a nose-poke in and out on port.""" diff --git a/village/gui/monitor_layout.py b/village/gui/monitor_layout.py index ff429c16c..3b2dfb6cc 100644 --- a/village/gui/monitor_layout.py +++ b/village/gui/monitor_layout.py @@ -8,6 +8,7 @@ from PyQt5.QtCore import Qt, QTimer from PyQt5.QtGui import QColor, QFont, QFontMetrics, QPixmap from PyQt5.QtWidgets import ( + QCheckBox, QDialog, QHBoxLayout, QHeaderView, @@ -1126,52 +1127,32 @@ def draw(self) -> None: "Start/stop the automated virtual-mouse agent", color="lightblue", ) + + # Build custom autonomouse parameters automatically + # based on the PARAMS list. + self._col_auto = col_auto + self._inject_param_widgets: dict = {} + self._inject_param_row_start = row_auto + 2 + row_after = self._build_inject_param_widgets() + + # Inject button first, then N / Interval below it. + self.inject_button = self.create_and_add_button( + "Inject Trials", row_after + 2, col_auto, 15, 2, + self._inject_trials, "Inject N mock trials", "lightgreen") + self.create_and_add_label( - "p correct L", - row_auto + 2, - col_auto, - 10, - 2, - "black", - description="P(correct | reward LEFT)", - ) - self.p_left_edit = self.create_and_add_line_edit( - "0.80", row_auto + 2, col_auto + 10, 4, 2, lambda: None - ) - self.create_and_add_label( - "p correct R", - row_auto + 4, - col_auto, - 10, - 2, - "black", - description="P(correct | reward RIGHT)", - ) - self.p_right_edit = self.create_and_add_line_edit( - "0.80", row_auto + 4, col_auto + 10, 4, 2, lambda: None - ) - self.create_and_add_label( - "N inject", - row_auto + 6, - col_auto, - 10, - 2, - "black", - description="Number of mock trials to inject", - ) + "N inject", row_after + 4, col_auto, 10, 2, "black", + description="Number of mock trials to inject") + self.n_inject_edit = self.create_and_add_line_edit( - "10", row_auto + 6, col_auto + 10, 4, 2, lambda: None - ) - self.create_and_add_button( - "Inject Trials", - row_auto + 8, - col_auto, - 15, - 2, - self._inject_trials, - "Inject N trials using p correct L/R into session_df", - color="lightgreen", - ) + "300", row_after + 4, col_auto + 10, 4, 2, lambda: None) + + self.create_and_add_label( + "Interval (s)", row_after + 6, col_auto, 10, 2, "black", + description="Interval (in s) between trial injections") + + self.interval_inject_edit = self.create_and_add_line_edit( + "0.1", row_after + 6, col_auto + 10, 4, 2, lambda: None) else: row_touch = 4 @@ -1243,19 +1224,44 @@ def touch_clicked(self) -> None: self.x_line_edit.setText("0") self.y_line_edit.setText("0") - def _get_p(self, edit, default: float = 0.80) -> float: - try: - v = float(edit.text()) - return max(0.0, min(1.0, v)) - except ValueError: - edit.setText(str(default)) - return default - - def _get_p_left(self) -> float: - return self._get_p(self.p_left_edit) - - def _get_p_right(self) -> float: - return self._get_p(self.p_right_edit) + def _build_inject_param_widgets(self) -> int: + """Create one label and lineedit per PARAMS entry + and then returns next free row for next labels.""" + col_auto = self._col_auto + self._inject_param_widgets.clear() + row = self._inject_param_row_start + for param in manager.auto_no_mouse.PARAMS: + self.create_and_add_label( + param.label, row, col_auto, 10, 2, "black", + description=param.tooltip) + if param.type_ is bool: + widget = QCheckBox() + widget.setChecked(bool(param.default)) + widget.setToolTip(param.tooltip) + self.addWidget(widget, row, col_auto + 10, 2, 4) + else: + widget = self.create_and_add_line_edit( + str(param.default), row, col_auto + 10, 4, 2, lambda: None) + self._inject_param_widgets[param.name] = widget + row += 2 + return row + + def _get_inject_kwargs(self) -> dict: + result = {} + for param in manager.auto_no_mouse.PARAMS: + widget = self._inject_param_widgets.get(param.name) + if widget is None: + result[param.name] = param.default + continue + if isinstance(widget, QCheckBox): + result[param.name] = widget.isChecked() + else: + try: + result[param.name] = param.clamp(widget.text()) + except (ValueError, TypeError): + widget.setText(str(param.default)) + result[param.name] = param.default + return result def _get_n_inject(self) -> int: try: @@ -1265,19 +1271,31 @@ def _get_n_inject(self) -> int: self.n_inject_edit.setText("10") return 10 + def _get_interval_inject(self) -> float: + try: + v = float(self.interval_inject_edit.text()) + return max(0.0, v) + except ValueError: + self.interval_inject_edit.setText("1.0") + return 1.0 + def auto_no_mouse_clicked(self) -> None: """Toggle AutoNoMouse on/off.""" if self._anm is not None and self._anm.running: self._anm.stop() self._anm = None self.auto_no_mouse_button.setText("▶ AutoNoMouse") - self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") + self.auto_no_mouse_button.setStyleSheet( + "background-color: lightblue;") return self._anm = manager.auto_no_mouse self._anm.task = manager.task - self._anm.accuracy_left = self._get_p_left() - self._anm.accuracy_right = self._get_p_right() + # Update autonomouse parameters based on the current values in the GUI. + kwargs = self._get_inject_kwargs() + for param in self._anm.PARAMS: + if param.name in kwargs: + setattr(self._anm, param.name, kwargs[param.name]) self._anm.start() self.auto_no_mouse_button.setText("■ AutoNoMouse") self.auto_no_mouse_button.setStyleSheet("background-color: salmon;") @@ -1286,15 +1304,35 @@ def update_gui(self) -> None: if self._anm is not None and not self._anm.running: self._anm = None self.auto_no_mouse_button.setText("▶ AutoNoMouse") - self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") + self.auto_no_mouse_button.setStyleSheet( + "background-color: lightblue;") + injector = manager.auto_no_mouse + if hasattr(self, "inject_button"): + if not injector.injecting and self.inject_button.text() == "■ Stop Inject": + self.inject_button.setText("Inject Trials") + self.inject_button.setStyleSheet("background-color: lightgreen;") def _inject_trials(self) -> None: - p_l, p_r = self._get_p_left(), self._get_p_right() injector = manager.auto_no_mouse + if injector.injecting: + injector.stop_inject() + self.inject_button.setText("Inject Trials") + self.inject_button.setStyleSheet("background-color: lightgreen;") + return + # Stop autonomouse if running so run_trial doesn't conflict + if self._anm is not None and self._anm.running: + self._anm.stop() + self._anm = None + self.auto_no_mouse_button.setText("▶ AutoNoMouse") + self.auto_no_mouse_button.setStyleSheet( + "background-color: lightblue;") + kwargs = self._get_inject_kwargs() injector.task = manager.task - injector.accuracy_left = p_l - injector.accuracy_right = p_r - injector.inject_trials(self._get_n_inject(), p_l, p_r) + injector.inject_trials(self._get_n_inject(), + interval=self._get_interval_inject(), + **kwargs) + self.inject_button.setText("■ Stop Inject") + self.inject_button.setStyleSheet("background-color: salmon;") class FunctionsLayout(Layout): diff --git a/village/manager.py b/village/manager.py index 67c7e60e6..533039efb 100644 --- a/village/manager.py +++ b/village/manager.py @@ -101,7 +101,9 @@ def __init__(self) -> None: self.change_cycle: ChangeCycleBase = ChangeCycleBase() self.camera_trigger: CameraTriggerBase = CameraTriggerBase() self.camera_draw: CameraDrawBase = CameraDrawBase() - self.auto_no_mouse: AutoNoMouse_Base = AutoNoMouse_Base() + self._auto_no_mouse_instances: dict[str, AutoNoMouse_Base] = { + "": AutoNoMouse_Base() + } self.state: State = State.WAIT self.previous_state_wait: bool = True self.calibrating: bool = False @@ -172,6 +174,13 @@ def __init__(self) -> None: self.direct_functions: DirectFunctionsBase = DirectFunctionsBase() self.calibrations: Calibrations = Calibrations() + @property + def auto_no_mouse(self) -> AutoNoMouse_Base: + """Return the AutoNoMouse instance for the current task, or the generic one.""" + task_name = getattr(self.task, "name", "") + return (self._auto_no_mouse_instances.get(task_name) + or self._auto_no_mouse_instances.get("", AutoNoMouse_Base())) + def create_collections(self) -> None: """Creates and initializes data collections for events, summaries, and measurements.""" diff --git a/village/scripts/import_all.py b/village/scripts/import_all.py index eddbd5adc..aa7ecd8b1 100644 --- a/village/scripts/import_all.py +++ b/village/scripts/import_all.py @@ -154,10 +154,9 @@ def import_all(manager) -> None: camera_draw_correct = True elif issubclass(cls, AutoNoMouse_Base) and cls != AutoNoMouse_Base: auto_no_mouse_found += 1 - if auto_no_mouse_found == 1: - d = cls() - manager.auto_no_mouse = d - auto_no_mouse_correct = True + instance = cls() + manager._auto_no_mouse_instances[cls.TASK_NAME] = instance + auto_no_mouse_correct = True elif issubclass(cls, CalibrationBase) and cls not in ( CalibrationBase, CameraCalibration, From 83bfaf6a64142a7dc32902f10cc7d01d35a666c3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2026 14:44:28 +0000 Subject: [PATCH 07/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- village/calibration/camera_calibration.py | 210 ++++--- .../calibration/camera_calibration_grid.py | 76 ++- village/custom_classes/auto_no_mouse_base.py | 18 +- village/gui/camera_calibration_layout.py | 531 +++++++++++++----- village/gui/monitor_layout.py | 59 +- village/gui/tasks_layout.py | 2 +- village/manager.py | 5 +- 7 files changed, 613 insertions(+), 288 deletions(-) diff --git a/village/calibration/camera_calibration.py b/village/calibration/camera_calibration.py index e8d4c4169..a04cc0a02 100644 --- a/village/calibration/camera_calibration.py +++ b/village/calibration/camera_calibration.py @@ -8,16 +8,17 @@ import cv2 import numpy as np +from village.custom_classes.calibration_base import CalibrationBase from village.scripts.log import log from village.settings import settings -from village.custom_classes.calibration_base import CalibrationBase class CameraCalibration(CalibrationBase): name = "camera_calibration" - def __init__(self, spacing_mm: float = 50, - grid_size: tuple[int, int] | None = None) -> None: + def __init__( + self, spacing_mm: float = 50, grid_size: tuple[int, int] | None = None + ) -> None: self.spacing_mm = spacing_mm self.measured_spacing_mm: float | None = None self.grid_size = grid_size @@ -62,8 +63,9 @@ def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, bool]: if self._image_size is None: self._image_size = (gray.shape[1], gray.shape[0]) - obj_pts, img_pts = _detect_grid(gray, self._detector, - self.spacing_mm, self.grid_size) + obj_pts, img_pts = _detect_grid( + gray, self._detector, self.spacing_mm, self.grid_size + ) found = obj_pts is not None if found: self._obj_points.append(obj_pts) @@ -73,8 +75,7 @@ def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, bool]: if self.result is not None: K = np.array(self.result["camera_matrix"]) dist = np.array(self.result["dist_coeffs"]) - self.live_metrics = _compute_live_metrics(obj_pts, img_pts, - K, dist) + self.live_metrics = _compute_live_metrics(obj_pts, img_pts, K, dist) return annotated, found @@ -97,8 +98,14 @@ def calibrate_in_thread(self) -> None: def save(self, out_path: Path) -> None: if self.result is None: return - keys = ("camera_matrix", "dist_coeffs", "reprojection_error_px", - "image_size_wh", "n_images_used", "spacing_mm") + keys = ( + "camera_matrix", + "dist_coeffs", + "reprojection_error_px", + "image_size_wh", + "n_images_used", + "spacing_mm", + ) data = {k: self.result[k] for k in keys if k in self.result} with open(out_path, "w") as f: json.dump(data, f, indent=2) @@ -106,35 +113,45 @@ def save(self, out_path: Path) -> None: def _run(self) -> None: try: K, dist, rvecs, tvecs, err = _run_calibration( - self._obj_points, self._img_points, self._image_size) + self._obj_points, self._img_points, self._image_size + ) d = dist.ravel() w, h = self._image_size _tang = float(np.sqrt(d[2] ** 2 + d[3] ** 2)) if len(d) >= 4 else 0.0 - self.result = {"camera_matrix": K.tolist(), "dist_coeffs": d.tolist(), - "reprojection_error_px": float(err), - "image_size_wh": list(self._image_size), - "n_images_used": self.n_detected, - "spacing_mm": self.spacing_mm, - "k1": float(d[0]), - "tangential": _tang, - "cx_offset_px": float(K[0, 2] - w / 2), - "cy_offset_px": float(K[1, 2] - h / 2)} + self.result = { + "camera_matrix": K.tolist(), + "dist_coeffs": d.tolist(), + "reprojection_error_px": float(err), + "image_size_wh": list(self._image_size), + "n_images_used": self.n_detected, + "spacing_mm": self.spacing_mm, + "k1": float(d[0]), + "tangential": _tang, + "cx_offset_px": float(K[0, 2] - w / 2), + "cy_offset_px": float(K[1, 2] - h / 2), + } self.diagnostic_data = _compute_diagnostic_data( - self._obj_points, self._img_points, - K, dist, rvecs, tvecs, self._image_size, - spacing_mm=self.measured_spacing_mm or self.spacing_mm, - grid_size=self.grid_size) + self._obj_points, + self._img_points, + K, + dist, + rvecs, + tvecs, + self._image_size, + spacing_mm=self.measured_spacing_mm or self.spacing_mm, + grid_size=self.grid_size, + ) except Exception: - log.error("Camera calibration error", - exception=traceback.format_exc()) + log.error("Camera calibration error", exception=traceback.format_exc()) self.error = True finally: self.running = False -def _compute_spacing_px(img_pts: np.ndarray, - grid_size: tuple[int, int] | None) -> float: +def _compute_spacing_px( + img_pts: np.ndarray, grid_size: tuple[int, int] | None +) -> float: pts = img_pts.reshape(-1, 2) if grid_size is not None: cols, rows = grid_size @@ -168,25 +185,29 @@ def _make_blob_detector(spacing_px: float | None = None) -> cv2.SimpleBlobDetect return cv2.SimpleBlobDetector_create(params) -def _detect_grid(gray: np.ndarray, detector: cv2.SimpleBlobDetector, - spacing_mm: float, grid_size: tuple[int, int] | None - ) -> tuple[np.ndarray | None, np.ndarray | None]: +def _detect_grid( + gray: np.ndarray, + detector: cv2.SimpleBlobDetector, + spacing_mm: float, + grid_size: tuple[int, int] | None, +) -> tuple[np.ndarray | None, np.ndarray | None]: if grid_size is not None: - found, corners = cv2.findCirclesGrid(gray, grid_size, - flags=cv2.CALIB_CB_SYMMETRIC_GRID, - blobDetector=detector) + found, corners = cv2.findCirclesGrid( + gray, grid_size, flags=cv2.CALIB_CB_SYMMETRIC_GRID, blobDetector=detector + ) if not found: return None, None cols, rows = grid_size obj_pts = np.zeros((cols * rows, 3), np.float32) - obj_pts[:, :2] = (np.mgrid[0:cols, 0:rows].T.reshape(-1, 2) * spacing_mm) + obj_pts[:, :2] = np.mgrid[0:cols, 0:rows].T.reshape(-1, 2) * spacing_mm return obj_pts, corners return _cluster_grid(gray, detector, spacing_mm) -def _cluster_grid(gray: np.ndarray, detector: cv2.SimpleBlobDetector, - spacing_mm: float) -> tuple[np.ndarray | None, np.ndarray | None]: +def _cluster_grid( + gray: np.ndarray, detector: cv2.SimpleBlobDetector, spacing_mm: float +) -> tuple[np.ndarray | None, np.ndarray | None]: keypoints = detector.detect(gray) if len(keypoints) < 4: return None, None @@ -217,15 +238,17 @@ def _cluster_grid(gray: np.ndarray, detector: cv2.SimpleBlobDetector, if len(img_pts) < 4: return None, None - return (np.array(obj_pts, np.float32), - np.array(img_pts, np.float32).reshape(-1, 1, 2)) + return ( + np.array(obj_pts, np.float32), + np.array(img_pts, np.float32).reshape(-1, 1, 2), + ) -def _draw_grid(frame: np.ndarray, img_pts: np.ndarray, - grid_size: tuple[int, int] | None) -> None: +def _draw_grid( + frame: np.ndarray, img_pts: np.ndarray, grid_size: tuple[int, int] | None +) -> None: if grid_size is not None: - cv2.drawChessboardCorners(frame, grid_size, - img_pts.reshape(-1, 1, 2), True) + cv2.drawChessboardCorners(frame, grid_size, img_pts.reshape(-1, 1, 2), True) pts = img_pts.reshape(-1, 2) for pt in pts: x, y = int(pt[0]), int(pt[1]) @@ -233,22 +256,32 @@ def _draw_grid(frame: np.ndarray, img_pts: np.ndarray, cv2.circle(frame, (x, y), 7, (0, 0, 0), 1) -def _run_calibration(obj_points: list[np.ndarray], img_points: list[np.ndarray], - image_size: tuple[int, int]) -> tuple[np.ndarray, np.ndarray, list, list, float]: - _, K, dist, rvecs, tvecs = cv2.calibrateCamera(obj_points, img_points, - image_size, None, None) - total = sum(cv2.norm(ip, cv2.projectPoints(op, rv, tv, K, dist)[0], cv2.NORM_L2) / len(ip) - for op, ip, rv, tv in zip(obj_points, img_points, rvecs, tvecs)) +def _run_calibration( + obj_points: list[np.ndarray], + img_points: list[np.ndarray], + image_size: tuple[int, int], +) -> tuple[np.ndarray, np.ndarray, list, list, float]: + _, K, dist, rvecs, tvecs = cv2.calibrateCamera( + obj_points, img_points, image_size, None, None + ) + total = sum( + cv2.norm(ip, cv2.projectPoints(op, rv, tv, K, dist)[0], cv2.NORM_L2) / len(ip) + for op, ip, rv, tv in zip(obj_points, img_points, rvecs, tvecs) + ) return K, dist, rvecs, tvecs, total / len(obj_points) -def _compute_diagnostic_data(obj_points: list[np.ndarray], - img_points: list[np.ndarray], - K: np.ndarray, dist: np.ndarray, - rvecs: list, tvecs: list, - image_size: tuple[int, int], - spacing_mm: float = 17.0, - grid_size: tuple[int, int] | None = None) -> dict: +def _compute_diagnostic_data( + obj_points: list[np.ndarray], + img_points: list[np.ndarray], + K: np.ndarray, + dist: np.ndarray, + rvecs: list, + tvecs: list, + image_size: tuple[int, int], + spacing_mm: float = 17.0, + grid_size: tuple[int, int] | None = None, +) -> dict: all_pts, all_errs = [], [] for op, ip, rv, tv in zip(obj_points, img_points, rvecs, tvecs): proj, _ = cv2.projectPoints(op, rv, tv, K, dist) @@ -261,11 +294,13 @@ def _compute_diagnostic_data(obj_points: list[np.ndarray], w, h = image_size step = max(w, h) // 20 - gx, gy = np.meshgrid(np.arange(step // 2, w, step), - np.arange(step // 2, h, step)) + gx, gy = np.meshgrid(np.arange(step // 2, w, step), np.arange(step // 2, h, step)) grid_pts = np.stack([gx.ravel(), gy.ravel()], axis=1).astype(np.float32) undist_pts = cv2.undistortPoints( - grid_pts.reshape(-1, 1, 2), K, dist, P=K, + grid_pts.reshape(-1, 1, 2), + K, + dist, + P=K, ).reshape(-1, 2) # Alignment metrics from last frame @@ -300,31 +335,44 @@ def _compute_diagnostic_data(obj_points: list[np.ndarray], neighbours = [] if j + 1 < cols: neighbours.append( - np.linalg.norm(pts_grid[i, j + 1] - pts_grid[i, j])) + np.linalg.norm(pts_grid[i, j + 1] - pts_grid[i, j]) + ) if i + 1 < rows: neighbours.append( - np.linalg.norm(pts_grid[i + 1, j] - pts_grid[i, j])) + np.linalg.norm(pts_grid[i + 1, j] - pts_grid[i, j]) + ) if neighbours: scale_vals.append(np.mean(neighbours) / cm_per_dot) scale_pos.append(pts_grid[i, j]) scale_data = {"vals": np.array(scale_vals), "pos": np.array(scale_pos)} - return {"pts": pts, "errs": errs, "grid_pts": grid_pts, - "disp": undist_pts - grid_pts, "image_size": image_size, - "last_pts": last_ip, "grid_size": grid_size, - "tilt_total": tilt_total, "tilt_x_deg": tilt_x_deg, - "tilt_y_deg": tilt_y_deg, "roll_deg": roll_deg, - "roll_rad": roll_rad, "roll_top_row": roll_top_row, - "scale_data": scale_data} - - -def _compute_live_metrics(obj_pts: np.ndarray, img_pts: np.ndarray, - K: np.ndarray, dist: np.ndarray) -> dict: + return { + "pts": pts, + "errs": errs, + "grid_pts": grid_pts, + "disp": undist_pts - grid_pts, + "image_size": image_size, + "last_pts": last_ip, + "grid_size": grid_size, + "tilt_total": tilt_total, + "tilt_x_deg": tilt_x_deg, + "tilt_y_deg": tilt_y_deg, + "roll_deg": roll_deg, + "roll_rad": roll_rad, + "roll_top_row": roll_top_row, + "scale_data": scale_data, + } + + +def _compute_live_metrics( + obj_pts: np.ndarray, img_pts: np.ndarray, K: np.ndarray, dist: np.ndarray +) -> dict: _, rvec, tvec = cv2.solvePnP(obj_pts, img_pts, K, dist) proj, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, dist) - reproj_err = float(np.mean(np.linalg.norm( - proj.reshape(-1, 2) - img_pts.reshape(-1, 2), axis=1))) + reproj_err = float( + np.mean(np.linalg.norm(proj.reshape(-1, 2) - img_pts.reshape(-1, 2), axis=1)) + ) R, _ = cv2.Rodrigues(rvec) tilt_deg = float(np.degrees(np.arccos(np.clip(abs(R[2, 2]), 0.0, 1.0)))) @@ -341,13 +389,16 @@ def _compute_live_metrics(obj_pts: np.ndarray, img_pts: np.ndarray, else: roll_deg = 0.0 - return {"reproj_err": reproj_err, "tilt_deg": tilt_deg, - "tilt_x_deg": tilt_x_deg, "tilt_y_deg": tilt_y_deg, - "roll_deg": roll_deg} + return { + "reproj_err": reproj_err, + "tilt_deg": tilt_deg, + "tilt_x_deg": tilt_x_deg, + "tilt_y_deg": tilt_y_deg, + "roll_deg": roll_deg, + } -def undistort_image(img: np.ndarray, K: np.ndarray, - dist: np.ndarray) -> np.ndarray: +def undistort_image(img: np.ndarray, K: np.ndarray, dist: np.ndarray) -> np.ndarray: # Undistort an image using the given camera matrix and # distortion coefficients. h, w = img.shape[:2] @@ -355,8 +406,7 @@ def undistort_image(img: np.ndarray, K: np.ndarray, return cv2.undistort(img, K, dist, None, new_K) -def undistort_points(points: np.ndarray, K: np.ndarray, - dist: np.ndarray) -> np.ndarray: +def undistort_points(points: np.ndarray, K: np.ndarray, dist: np.ndarray) -> np.ndarray: # Undistort 2D points using the given camera matrix and # distortion coefficients. """points: Nx2 array of (x, y).""" diff --git a/village/calibration/camera_calibration_grid.py b/village/calibration/camera_calibration_grid.py index f37d917be..2525c002d 100644 --- a/village/calibration/camera_calibration_grid.py +++ b/village/calibration/camera_calibration_grid.py @@ -4,15 +4,19 @@ import matplotlib.pyplot as plt -def make_circle_grid(page_w_mm: float = 210.0, page_h_mm: float = 297.0, - spacing_mm: float = 50.0, circle_radius_mm: float = 10.0, - margin_mm: float = 5.0, - out_path: str = "calibration_grid.pdf", dpi: int = 300, - rotated_45: bool = False) -> Path: +def make_circle_grid( + page_w_mm: float = 210.0, + page_h_mm: float = 297.0, + spacing_mm: float = 50.0, + circle_radius_mm: float = 10.0, + margin_mm: float = 5.0, + out_path: str = "calibration_grid.pdf", + dpi: int = 300, + rotated_45: bool = False, +) -> Path: mm_per_inch = 25.4 - fig, ax = plt.subplots(figsize=(page_w_mm / mm_per_inch, - page_h_mm / mm_per_inch)) + fig, ax = plt.subplots(figsize=(page_w_mm / mm_per_inch, page_h_mm / mm_per_inch)) ax.set_xlim(0, page_w_mm) ax.set_ylim(0, page_h_mm) ax.set_aspect("equal") @@ -33,33 +37,55 @@ def make_circle_grid(page_w_mm: float = 210.0, page_h_mm: float = 297.0, for col in range(-half_extent, half_extent + 1): cx = cx_center + (col * diag - row * diag) cy = cy_center + (col * diag + row * diag) - cond1 = (margin_mm - circle_radius_mm - <= cx <= page_w_mm - margin_mm + circle_radius_mm) - cond2 = (margin_mm - circle_radius_mm - <= cy <= page_h_mm - margin_mm + circle_radius_mm) + cond1 = ( + margin_mm - circle_radius_mm + <= cx + <= page_w_mm - margin_mm + circle_radius_mm + ) + cond2 = ( + margin_mm - circle_radius_mm + <= cy + <= page_h_mm - margin_mm + circle_radius_mm + ) if cond1 and cond2: - ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, - color="black")) + ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, color="black")) else: cols = int(draw_w / spacing_mm) rows = int(draw_h / spacing_mm) for row in range(rows + 2): - ax.plot([margin_mm - spacing_mm, margin_mm + (cols + 1)* spacing_mm], - [margin_mm + row * spacing_mm] * 2, color="lightgray", - linewidth=1, zorder=1) + ax.plot( + [margin_mm - spacing_mm, margin_mm + (cols + 1) * spacing_mm], + [margin_mm + row * spacing_mm] * 2, + color="lightgray", + linewidth=1, + zorder=1, + ) for col in range(cols + 1): cx = margin_mm + col * spacing_mm cy = margin_mm + row * spacing_mm - ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, - color="black")) - ax.plot([margin_mm + col * spacing_mm] * 2, - [margin_mm - spacing_mm, margin_mm + (rows + 1) * spacing_mm], - color="lightgray", linewidth=1, zorder=1) + ax.add_patch(plt.Circle((cx, cy), circle_radius_mm, color="black")) + ax.plot( + [margin_mm + col * spacing_mm] * 2, + [margin_mm - spacing_mm, margin_mm + (rows + 1) * spacing_mm], + color="lightgray", + linewidth=1, + zorder=1, + ) - ax.plot([cx - circle_radius_mm, cx + circle_radius_mm], - [cy, cy], color="w", linewidth=1, zorder=2) - ax.plot([cx, cx], [cy - circle_radius_mm, cy + circle_radius_mm], - color="w", linewidth=1, zorder=2) + ax.plot( + [cx - circle_radius_mm, cx + circle_radius_mm], + [cy, cy], + color="w", + linewidth=1, + zorder=2, + ) + ax.plot( + [cx, cx], + [cy - circle_radius_mm, cy + circle_radius_mm], + color="w", + linewidth=1, + zorder=2, + ) out = Path(out_path) fig.savefig(out, dpi=dpi, bbox_inches="tight", pad_inches=0) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index 53f4954b5..cab2853e2 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -1,7 +1,8 @@ import bisect -from dataclasses import dataclass -from threading import Event as ThEvent, Thread from collections import deque +from dataclasses import dataclass +from threading import Event as ThEvent +from threading import Thread from village.custom_classes.task import Task from village.scripts.time_utils import time_utils @@ -10,10 +11,11 @@ @dataclass class AutonomouseParam: """Descriptor for a single inject_trial keyword argument.""" - name: str # name passed to inject_trial - type_: type # float or int for now TODO: support bool, str, etc. if needed, or infer from default + + name: str # name passed to inject_trial + type_: type # float or int for now TODO: support bool, str, etc. if needed, or infer from default default: float # default value - label: str # UI label text + label: str # UI label text min_val: float = 0.0 max_val: float = 1.0 tooltip: str = "" @@ -30,7 +32,7 @@ class AutoNoMouse_Base: To be subclassed in task folder. """ - TASK_NAME: str = "" # to be set in subclass to restrict to a specific task + TASK_NAME: str = "" # to be set in subclass to restrict to a specific task PARAMS: list[AutonomouseParam] = [] def __init__(self, task: Task = None) -> None: @@ -105,8 +107,7 @@ def running(self) -> bool: @property def injecting(self) -> bool: - return (self._inject_thread is not None - and self._inject_thread.is_alive()) + return self._inject_thread is not None and self._inject_thread.is_alive() def stop_inject(self) -> None: self._inject_stop_event.set() @@ -137,6 +138,7 @@ def _run(): break self.inject_trial(**kwargs) self._inject_stop_event.wait(interval) + self._inject_thread = Thread(target=_run, daemon=True) self._inject_thread.start() diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py index 7d95ca982..ae4c267cd 100644 --- a/village/gui/camera_calibration_layout.py +++ b/village/gui/camera_calibration_layout.py @@ -9,8 +9,10 @@ from PyQt5.QtGui import QImage, QPixmap from PyQt5.QtWidgets import QFileDialog, QLabel -from village.calibration.camera_calibration import (CameraCalibration, - default_result_path) +from village.calibration.camera_calibration import ( + CameraCalibration, + default_result_path, +) from village.calibration.camera_calibration_grid import make_circle_grid from village.classes.enums import State from village.devices.camera import cam_box @@ -29,8 +31,7 @@ def _frame_to_pixmap(frame: np.ndarray) -> QPixmap: return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, QImage.Format_RGB888)) -def _make_diagnostic_plots(diag: dict, width_in: float, - height_in: float) -> plt.Figure: +def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt.Figure: pts = diag["pts"] errs = diag["errs"] mags = np.linalg.norm(errs, axis=1) @@ -47,14 +48,26 @@ def _make_diagnostic_plots(diag: dict, width_in: float, tilt_y = diag.get("tilt_y_deg", 0.0) tilt_t = diag.get("tilt_total", 0.0) roll = diag.get("roll_deg", 0.0) - fig.suptitle(f"Tilt X {tilt_x:+.1f}° Tilt Y {tilt_y:+.1f}° " - f"Tilt total {tilt_t:.1f}° Roll {roll:+.1f}°", fontsize=9) + fig.suptitle( + f"Tilt X {tilt_x:+.1f}° Tilt Y {tilt_y:+.1f}° " + f"Tilt total {tilt_t:.1f}° Roll {roll:+.1f}°", + fontsize=9, + ) # (0,0) residual reprojection error ax = axes[0, 0] ax.tricontourf(pts[:, 0], pts[:, 1], mags, levels=10, cmap="hot", alpha=0.25) - q = ax.quiver(pts[:, 0], pts[:, 1], errs[:, 0], errs[:, 1], mags, - cmap="hot", angles="xy", scale_units="xy", scale=0.05) + q = ax.quiver( + pts[:, 0], + pts[:, 1], + errs[:, 0], + errs[:, 1], + mags, + cmap="hot", + angles="xy", + scale_units="xy", + scale=0.05, + ) fig.colorbar(q, ax=ax, label="error (px)", **cb_args) ax.set_xlim(0, w) ax.set_ylim(h, 0) @@ -63,9 +76,17 @@ def _make_diagnostic_plots(diag: dict, width_in: float, # (0,1) lens distortion field ax = axes[0, 1] - q2 = ax.quiver(grid_pts[:, 0], grid_pts[:, 1], - disp[:, 0], disp[:, 1], disp_mag, - cmap="cool", angles="xy", scale_units="xy", scale=0.3) + q2 = ax.quiver( + grid_pts[:, 0], + grid_pts[:, 1], + disp[:, 0], + disp[:, 1], + disp_mag, + cmap="cool", + angles="xy", + scale_units="xy", + scale=0.3, + ) fig.colorbar(q2, ax=ax, label="displacement (px)", **cb_args) ax.set_xlim(0, w) ax.set_ylim(h, 0) @@ -87,28 +108,54 @@ def _make_diagnostic_plots(diag: dict, width_in: float, rr = diag.get("roll_rad", 0.0) cr, sr = np.cos(rr), np.sin(rr) hw, hh = avg_w / 2, avg_h / 2 - ideal = np.array([center + [-cr * hw + sr * hh, -sr * hw - cr * hh], - center + [ cr * hw + sr * hh, sr * hw - cr * hh], - center + [ cr * hw - sr * hh, sr * hw + cr * hh], - center + [-cr * hw - sr * hh, -sr * hw + cr * hh]]) + ideal = np.array( + [ + center + [-cr * hw + sr * hh, -sr * hw - cr * hh], + center + [cr * hw + sr * hh, sr * hw - cr * hh], + center + [cr * hw - sr * hh, sr * hw + cr * hh], + center + [-cr * hw - sr * hh, -sr * hw + cr * hh], + ] + ) ax.scatter(last_pts[:, 0], last_pts[:, 1], s=3, c="steelblue", zorder=3) - ax.plot(*np.vstack([tl, tr, br, bl, tl]).T, "r-", lw=2, - label=f"Actual (tilt {tilt_t:.1f}°)") + ax.plot( + *np.vstack([tl, tr, br, bl, tl]).T, + "r-", + lw=2, + label=f"Actual (tilt {tilt_t:.1f}°)", + ) ax.plot(*np.vstack([ideal, ideal[0]]).T, "g--", lw=2, label="Ideal") roll_row = diag.get("roll_top_row") if roll_row is not None and len(roll_row) >= 2: - ax.scatter(roll_row[:, 0], roll_row[:, 1], - s=20, c="orange", zorder=5, label="Roll ref row") - ax.plot([roll_row[0, 0], roll_row[-1, 0]], - [roll_row[0, 1], roll_row[-1, 1]], - color="orange", lw=1.5, linestyle="--", zorder=4) + ax.scatter( + roll_row[:, 0], + roll_row[:, 1], + s=20, + c="orange", + zorder=5, + label="Roll ref row", + ) + ax.plot( + [roll_row[0, 0], roll_row[-1, 0]], + [roll_row[0, 1], roll_row[-1, 1]], + color="orange", + lw=1.5, + linestyle="--", + zorder=4, + ) ax.legend(fontsize=7, ncols=3) ax.set_xlim(0, w) ax.set_ylim(h, 0) ax.set_aspect("equal") else: - ax.text(0.5, 0.5, "Grid size not set", ha="center", va="center", - transform=ax.transAxes, color="gray") + ax.text( + 0.5, + 0.5, + "Grid size not set", + ha="center", + va="center", + transform=ax.transAxes, + color="gray", + ) ax.axis("off") # (1,1) px/cm scale heatmap @@ -123,8 +170,15 @@ def _make_diagnostic_plots(diag: dict, width_in: float, ax.set_ylim(h, 0) ax.set_aspect("equal") else: - ax.text(0.5, 0.5, "Scale not available\n(set grid size)", ha="center", - va="center", transform=ax.transAxes, color="gray") + ax.text( + 0.5, + 0.5, + "Scale not available\n(set grid size)", + ha="center", + va="center", + transform=ax.transAxes, + color="gray", + ) ax.axis("off") plt.tight_layout() @@ -150,8 +204,7 @@ def draw(self) -> None: def update(self, diag: dict) -> None: try: - fig = _make_diagnostic_plots(diag, self.plot_width, - self.plot_height) + fig = _make_diagnostic_plots(diag, self.plot_width, self.plot_height) self.plot_label.setPixmap(create_pixmap(fig)) plt.close(fig) except Exception: @@ -173,81 +226,225 @@ def draw(self) -> None: self._calib: CameraCalibration | None = None self._last_annotated: np.ndarray | None = None - self.create_and_add_label("GRID GENERATION", 5, 2, 40, 2, "black", - description="Generate a printable symmetric circle grid.\n" - "Print it and verify spacing with a ruler.") - self.create_and_add_label("PAGE W (mm)", 8, 2, 14, 2, "black", bold=False, - description="Page width in mm (e.g. 210 for A4)") - self.page_w_edit = self.create_and_add_line_edit("210", 10, 2, 8, 2, lambda _: None) - - self.create_and_add_label("PAGE H (mm)", 8, 12, 14, 2, "black", bold=False, - description="Page height in mm (e.g. 297 for A4)") - self.page_h_edit = self.create_and_add_line_edit("297", 10, 12, 8, 2, lambda _: None) - - self.create_and_add_label("SPACING (mm)", 8, 22, 16, 2, "black", bold=False, - description="Centre-to-centre distance between circles in mm") - self.spacing_edit = self.create_and_add_line_edit("50", 10, 22, 8, 2, lambda _: None) - - self.create_and_add_label("DOT RADIUS (mm)", 8, 32, 18, 2, "black", bold=False, - description="Radius of each printed circle in mm") - self.dot_radius_edit = self.create_and_add_line_edit("10", 10, 32, 8, 2, lambda _: None) - - self.create_and_add_label("MARGIN (mm)", 8, 42, 14, 2, "black", bold=False, - description="Empty border around the grid in mm") - self.margin_edit = self.create_and_add_line_edit("0", 10, 42, 8, 2, lambda _: None) - - self.generate_button = self.create_and_add_button("GENERATE GRID PDF", 13, 2, 30, 2, - self._generate_grid_clicked, - "Generate a printable circle grid PDF", - "powderblue") - self.grid_status_label = self.create_and_add_label("", 16, 2, 50, 2, "gray", bold=False) - - self.create_and_add_label("CALIBRATION DETECTION", 19, 2, 50, 2, "black", - description="Parameters of the actual grid in front of the camera.\n" - "Rows/cols may differ from the printed sheet if you stitched several.") - self.create_and_add_label("GRID COLS", 21, 2, 14, 2, "black", bold=False, - description="Columns in the stitched calibration grid") - self.grid_cols_edit = self.create_and_add_line_edit("30", 23, 2, 8, 2, lambda _: None) - - self.create_and_add_label("GRID ROWS", 21, 12, 14, 2, "black", bold=False, - description="Rows in the stitched calibration grid") - self.grid_rows_edit = self.create_and_add_line_edit("18", 23, 12, 8, 2, lambda _: None) - - self.create_and_add_label("SPACING (px)", 21, 22, 16, 2, "black", bold=False, - description="Expected centre-to-centre dot spacing in pixels.\n" - "Used to tune the blob detector area bounds.") - self.spacing_px_edit = self.create_and_add_line_edit("17", 23, 22, 8, 2, lambda _: None) - - self.create_and_add_label("MEASURED SPACING (mm)", 21, 34, 22, 2, "black", bold=False, - description="Actual printed dot spacing measured with a ruler.\n" - "Leave blank to use SPACING (mm) above.\n" - "Only affects the px/cm scale heatmap.") - self.measured_spacing_edit = self.create_and_add_line_edit("", 23, 34, 8, 2, lambda _: None) - - self.capture_button = self.create_and_add_button("CAPTURE FRAME", 26, 2, 28, 2, - self._capture_clicked, - "Grab current frame, detect grid, and run calibration", - "powderblue") - self.load_image_button = self.create_and_add_button("LOAD IMAGE", 26, 32, 20, 2, - self._load_image_clicked, - "Load a PNG/JPG image file and use it as a calibration frame", - "powderblue") - self.clear_button = self.create_and_add_button("CLEAR FRAMES", 26, 54, 20, 2, - self._clear_clicked, - "Discard all captured frames and start over", - "lightyellow") - self.frames_label = self.create_and_add_label("Frames: 0", 29, 2, 30, 2, "black", bold=False) - - self.calib_status_label = self.create_and_add_label("", 32, 2, 95, 2, "gray", bold=False) - - self.result_label = self.create_and_add_label("", 35, 2, 95, 5, "black", bold=False) - - self.metrics_label = self.create_and_add_label("", 41, 2, 95, 10, "black", bold=False) - - self.save_button = self.create_and_add_button("SAVE JSON", 52, 2, 20, 2, - self._save_clicked, - "Save calibration result to camera_calibration.json", - "powderblue") + self.create_and_add_label( + "GRID GENERATION", + 5, + 2, + 40, + 2, + "black", + description="Generate a printable symmetric circle grid.\n" + "Print it and verify spacing with a ruler.", + ) + self.create_and_add_label( + "PAGE W (mm)", + 8, + 2, + 14, + 2, + "black", + bold=False, + description="Page width in mm (e.g. 210 for A4)", + ) + self.page_w_edit = self.create_and_add_line_edit( + "210", 10, 2, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "PAGE H (mm)", + 8, + 12, + 14, + 2, + "black", + bold=False, + description="Page height in mm (e.g. 297 for A4)", + ) + self.page_h_edit = self.create_and_add_line_edit( + "297", 10, 12, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "SPACING (mm)", + 8, + 22, + 16, + 2, + "black", + bold=False, + description="Centre-to-centre distance between circles in mm", + ) + self.spacing_edit = self.create_and_add_line_edit( + "50", 10, 22, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "DOT RADIUS (mm)", + 8, + 32, + 18, + 2, + "black", + bold=False, + description="Radius of each printed circle in mm", + ) + self.dot_radius_edit = self.create_and_add_line_edit( + "10", 10, 32, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "MARGIN (mm)", + 8, + 42, + 14, + 2, + "black", + bold=False, + description="Empty border around the grid in mm", + ) + self.margin_edit = self.create_and_add_line_edit( + "0", 10, 42, 8, 2, lambda _: None + ) + + self.generate_button = self.create_and_add_button( + "GENERATE GRID PDF", + 13, + 2, + 30, + 2, + self._generate_grid_clicked, + "Generate a printable circle grid PDF", + "powderblue", + ) + self.grid_status_label = self.create_and_add_label( + "", 16, 2, 50, 2, "gray", bold=False + ) + + self.create_and_add_label( + "CALIBRATION DETECTION", + 19, + 2, + 50, + 2, + "black", + description="Parameters of the actual grid in front of the camera.\n" + "Rows/cols may differ from the printed sheet if you stitched several.", + ) + self.create_and_add_label( + "GRID COLS", + 21, + 2, + 14, + 2, + "black", + bold=False, + description="Columns in the stitched calibration grid", + ) + self.grid_cols_edit = self.create_and_add_line_edit( + "30", 23, 2, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "GRID ROWS", + 21, + 12, + 14, + 2, + "black", + bold=False, + description="Rows in the stitched calibration grid", + ) + self.grid_rows_edit = self.create_and_add_line_edit( + "18", 23, 12, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "SPACING (px)", + 21, + 22, + 16, + 2, + "black", + bold=False, + description="Expected centre-to-centre dot spacing in pixels.\n" + "Used to tune the blob detector area bounds.", + ) + self.spacing_px_edit = self.create_and_add_line_edit( + "17", 23, 22, 8, 2, lambda _: None + ) + + self.create_and_add_label( + "MEASURED SPACING (mm)", + 21, + 34, + 22, + 2, + "black", + bold=False, + description="Actual printed dot spacing measured with a ruler.\n" + "Leave blank to use SPACING (mm) above.\n" + "Only affects the px/cm scale heatmap.", + ) + self.measured_spacing_edit = self.create_and_add_line_edit( + "", 23, 34, 8, 2, lambda _: None + ) + + self.capture_button = self.create_and_add_button( + "CAPTURE FRAME", + 26, + 2, + 28, + 2, + self._capture_clicked, + "Grab current frame, detect grid, and run calibration", + "powderblue", + ) + self.load_image_button = self.create_and_add_button( + "LOAD IMAGE", + 26, + 32, + 20, + 2, + self._load_image_clicked, + "Load a PNG/JPG image file and use it as a calibration frame", + "powderblue", + ) + self.clear_button = self.create_and_add_button( + "CLEAR FRAMES", + 26, + 54, + 20, + 2, + self._clear_clicked, + "Discard all captured frames and start over", + "lightyellow", + ) + self.frames_label = self.create_and_add_label( + "Frames: 0", 29, 2, 30, 2, "black", bold=False + ) + + self.calib_status_label = self.create_and_add_label( + "", 32, 2, 95, 2, "gray", bold=False + ) + + self.result_label = self.create_and_add_label( + "", 35, 2, 95, 5, "black", bold=False + ) + + self.metrics_label = self.create_and_add_label( + "", 41, 2, 95, 10, "black", bold=False + ) + + self.save_button = self.create_and_add_button( + "SAVE JSON", + 52, + 2, + 20, + 2, + self._save_clicked, + "Save calibration result to camera_calibration.json", + "powderblue", + ) self.save_button.setDisabled(True) self.preview_label = QLabel() @@ -259,8 +456,9 @@ def draw(self) -> None: self.plot_layout = DiagnosticPlotsLayout(self.window, 24, 95) self.addLayout(self.plot_layout, 28, 103, 24, 95) - self._calib = CameraCalibration(spacing_mm=self._spacing_value(), - grid_size=self._grid_size_value()) + self._calib = CameraCalibration( + spacing_mm=self._spacing_value(), grid_size=self._grid_size_value() + ) def _spacing_value(self) -> float: try: @@ -300,16 +498,23 @@ def _generate_grid_clicked(self) -> None: dot_radius = float(self.dot_radius_edit.text()) margin = float(self.margin_edit.text()) out = Path(settings.get("DATA_DIRECTORY")) / "calibration_grid.pdf" - make_circle_grid(page_w_mm=page_w, page_h_mm=page_h, - spacing_mm=spacing, circle_radius_mm=dot_radius, - margin_mm=margin, out_path=str(out)) + make_circle_grid( + page_w_mm=page_w, + page_h_mm=page_h, + spacing_mm=spacing, + circle_radius_mm=dot_radius, + margin_mm=margin, + out_path=str(out), + ) self.grid_status_label.setText(f"Saved: {out}") self.grid_status_label.setStyleSheet( - "QLabel {color: green; font-weight: normal}") + "QLabel {color: green; font-weight: normal}" + ) except Exception: self.grid_status_label.setText("Error generating grid") self.grid_status_label.setStyleSheet( - "QLabel {color: red; font-weight: normal}") + "QLabel {color: red; font-weight: normal}" + ) def _capture_clicked(self) -> None: if self._calib.running: @@ -333,43 +538,52 @@ def _process_frame(self, frame: np.ndarray) -> None: px = _frame_to_pixmap(annotated) if not px.isNull(): - self.preview_label.setPixmap(px.scaled(self.preview_label.width(), - self.preview_label.height(), 1)) + self.preview_label.setPixmap( + px.scaled(self.preview_label.width(), self.preview_label.height(), 1) + ) if not found: self.calib_status_label.setText("No grid detected in this frame") self.calib_status_label.setStyleSheet( - "QLabel {color: orange; font-weight: normal}") + "QLabel {color: orange; font-weight: normal}" + ) return n = self._calib.n_detected self.frames_label.setText(f"Frames: {n}") if n < 4: self.calib_status_label.setText( - f"Grid detected, need {4 - n} more frame(s) to calibrate") + f"Grid detected, need {4 - n} more frame(s) to calibrate" + ) self.calib_status_label.setStyleSheet( - "QLabel {color: gray; font-weight: normal}") + "QLabel {color: gray; font-weight: normal}" + ) return self._calib.calibrate_in_thread() self.calib_status_label.setText(f"Calibrating… ({n} frames)") self.calib_status_label.setStyleSheet( - "QLabel {color: gray; font-weight: normal}") + "QLabel {color: gray; font-weight: normal}" + ) self.capture_button.setDisabled(True) def _load_image_clicked(self) -> None: if self._calib.running: return path, _ = QFileDialog.getOpenFileName( - None, "Open calibration image", "", - "Images (*.png *.jpg *.jpeg *.bmp *.tif *.tiff)") + None, + "Open calibration image", + "", + "Images (*.png *.jpg *.jpeg *.bmp *.tif *.tiff)", + ) if not path: return frame = cv2.imread(path) if frame is None: self.calib_status_label.setText(f"Could not load image: {path}") self.calib_status_label.setStyleSheet( - "QLabel {color: red; font-weight: normal}") + "QLabel {color: red; font-weight: normal}" + ) return self._process_frame(frame) @@ -379,7 +593,9 @@ def _clear_clicked(self) -> None: self._last_annotated = None self.frames_label.setText("Frames: 0") self.calib_status_label.setText("Frames cleared") - self.calib_status_label.setStyleSheet("QLabel {color: gray; font-weight: normal}") + self.calib_status_label.setStyleSheet( + "QLabel {color: gray; font-weight: normal}" + ) self.result_label.setText("") self.metrics_label.setText("") self.save_button.setDisabled(True) @@ -398,24 +614,27 @@ def _show_result(self, result: dict) -> None: d = result["dist_coeffs"] k1 = d[0] dtype = "barrel" if k1 < 0 else "pincushion" - level = ("minimal" if abs(k1) < 0.05 - else "moderate" if abs(k1) < 0.15 - else "strong") + level = ( + "minimal" if abs(k1) < 0.05 else "moderate" if abs(k1) < 0.15 else "strong" + ) err = result["reprojection_error_px"] quality = "good" if err < 1.0 else "high -> retake frames" - text = (f"Frames used: {result['n_images_used']}\n" - f"Calib reproj error (all frames): {err:.4f} px ({quality})\n" - f"Distortion: {level} {dtype} (k1={k1:+.4f})\n" - f"k1={d[0]:+.4f} k2={d[1]:+.4f} " - f"p1={d[2]:+.4f} p2={d[3]:+.4f}\n" - f"Principal pt offset: " - f"({result['cx_offset_px']:+.1f} px, " - f"{result['cy_offset_px']:+.1f} px) " - f"tangential: {result['tangential']:.5f}") + text = ( + f"Frames used: {result['n_images_used']}\n" + f"Calib reproj error (all frames): {err:.4f} px ({quality})\n" + f"Distortion: {level} {dtype} (k1={k1:+.4f})\n" + f"k1={d[0]:+.4f} k2={d[1]:+.4f} " + f"p1={d[2]:+.4f} p2={d[3]:+.4f}\n" + f"Principal pt offset: " + f"({result['cx_offset_px']:+.1f} px, " + f"{result['cy_offset_px']:+.1f} px) " + f"tangential: {result['tangential']:.5f}" + ) self.result_label.setText(text) self.calib_status_label.setText("Calibration complete") self.calib_status_label.setStyleSheet( - "QLabel {color: green; font-weight: normal}") + "QLabel {color: green; font-weight: normal}" + ) self.save_button.setEnabled(True) if self._calib.diagnostic_data is not None: self.plot_layout.update(self._calib.diagnostic_data) @@ -442,18 +661,19 @@ def hint_roll(deg): tx = m.get("tilt_x_deg", 0.0) ty = m.get("tilt_y_deg", 0.0) roll = m["roll_deg"] - lines = ["Physical adjustment", - f" Tilt total: {m['tilt_deg']:5.2f}° " - f"[{tag(m['tilt_deg'], 2.0, 5.0)}] (good<2°, ok<5°)", - f" Tilt X (L/R): {tx:+6.2f}° " - f"[{tag(abs(tx), 2.0, 5.0)}] {hint_x(tx)}", - f" Tilt Y (F/B): {ty:+6.2f}° " - f"[{tag(abs(ty), 2.0, 5.0)}] {hint_y(ty)}", - f" Roll: {roll:5.2f}° " - f"[{tag(roll, 1.0, 3.0)}] {hint_roll(roll)}", - "Pose quality (latest frame vs calibrated model)", - f" Reproj error: {m['reproj_err']:5.3f} px " - f"[{tag(m['reproj_err'], 0.5, 1.5)}] (good<0.5, ok<1.5)", + lines = [ + "Physical adjustment", + f" Tilt total: {m['tilt_deg']:5.2f}° " + f"[{tag(m['tilt_deg'], 2.0, 5.0)}] (good<2°, ok<5°)", + f" Tilt X (L/R): {tx:+6.2f}° " + f"[{tag(abs(tx), 2.0, 5.0)}] {hint_x(tx)}", + f" Tilt Y (F/B): {ty:+6.2f}° " + f"[{tag(abs(ty), 2.0, 5.0)}] {hint_y(ty)}", + f" Roll: {roll:5.2f}° " + f"[{tag(roll, 1.0, 3.0)}] {hint_roll(roll)}", + "Pose quality (latest frame vs calibrated model)", + f" Reproj error: {m['reproj_err']:5.3f} px " + f"[{tag(m['reproj_err'], 0.5, 1.5)}] (good<0.5, ok<1.5)", ] self.metrics_label.setText("\n".join(lines)) @@ -462,14 +682,18 @@ def update_gui(self) -> None: return self.update_status_label_buttons() - display = self._last_annotated if self._last_annotated is not None else cam_box.frame + display = ( + self._last_annotated if self._last_annotated is not None else cam_box.frame + ) if display is not None: try: px = _frame_to_pixmap(display) if not px.isNull(): self.preview_label.setPixmap( - px.scaled(self.preview_label.width(), - self.preview_label.height(), 1)) + px.scaled( + self.preview_label.width(), self.preview_label.height(), 1 + ) + ) except Exception: pass @@ -479,14 +703,17 @@ def update_gui(self) -> None: if self._calib.error: self.calib_status_label.setText("Calibration failed, check log") self.calib_status_label.setStyleSheet( - "QLabel {color: red; font-weight: normal}") + "QLabel {color: red; font-weight: normal}" + ) self.capture_button.setEnabled(True) self._calib.error = False return - if (not self._calib.running - and self._calib.result is not self._result - and self._calib.result is not None): + if ( + not self._calib.running + and self._calib.result is not self._result + and self._calib.result is not None + ): self._result = self._calib.result self._show_result(self._result) self.capture_button.setEnabled(True) diff --git a/village/gui/monitor_layout.py b/village/gui/monitor_layout.py index 3b2dfb6cc..12049df92 100644 --- a/village/gui/monitor_layout.py +++ b/village/gui/monitor_layout.py @@ -1137,22 +1137,43 @@ def draw(self) -> None: # Inject button first, then N / Interval below it. self.inject_button = self.create_and_add_button( - "Inject Trials", row_after + 2, col_auto, 15, 2, - self._inject_trials, "Inject N mock trials", "lightgreen") + "Inject Trials", + row_after + 2, + col_auto, + 15, + 2, + self._inject_trials, + "Inject N mock trials", + "lightgreen", + ) self.create_and_add_label( - "N inject", row_after + 4, col_auto, 10, 2, "black", - description="Number of mock trials to inject") + "N inject", + row_after + 4, + col_auto, + 10, + 2, + "black", + description="Number of mock trials to inject", + ) self.n_inject_edit = self.create_and_add_line_edit( - "300", row_after + 4, col_auto + 10, 4, 2, lambda: None) + "300", row_after + 4, col_auto + 10, 4, 2, lambda: None + ) self.create_and_add_label( - "Interval (s)", row_after + 6, col_auto, 10, 2, "black", - description="Interval (in s) between trial injections") + "Interval (s)", + row_after + 6, + col_auto, + 10, + 2, + "black", + description="Interval (in s) between trial injections", + ) self.interval_inject_edit = self.create_and_add_line_edit( - "0.1", row_after + 6, col_auto + 10, 4, 2, lambda: None) + "0.1", row_after + 6, col_auto + 10, 4, 2, lambda: None + ) else: row_touch = 4 @@ -1232,8 +1253,8 @@ def _build_inject_param_widgets(self) -> int: row = self._inject_param_row_start for param in manager.auto_no_mouse.PARAMS: self.create_and_add_label( - param.label, row, col_auto, 10, 2, "black", - description=param.tooltip) + param.label, row, col_auto, 10, 2, "black", description=param.tooltip + ) if param.type_ is bool: widget = QCheckBox() widget.setChecked(bool(param.default)) @@ -1241,7 +1262,8 @@ def _build_inject_param_widgets(self) -> int: self.addWidget(widget, row, col_auto + 10, 2, 4) else: widget = self.create_and_add_line_edit( - str(param.default), row, col_auto + 10, 4, 2, lambda: None) + str(param.default), row, col_auto + 10, 4, 2, lambda: None + ) self._inject_param_widgets[param.name] = widget row += 2 return row @@ -1285,8 +1307,7 @@ def auto_no_mouse_clicked(self) -> None: self._anm.stop() self._anm = None self.auto_no_mouse_button.setText("▶ AutoNoMouse") - self.auto_no_mouse_button.setStyleSheet( - "background-color: lightblue;") + self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") return self._anm = manager.auto_no_mouse @@ -1304,8 +1325,7 @@ def update_gui(self) -> None: if self._anm is not None and not self._anm.running: self._anm = None self.auto_no_mouse_button.setText("▶ AutoNoMouse") - self.auto_no_mouse_button.setStyleSheet( - "background-color: lightblue;") + self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") injector = manager.auto_no_mouse if hasattr(self, "inject_button"): if not injector.injecting and self.inject_button.text() == "■ Stop Inject": @@ -1324,13 +1344,12 @@ def _inject_trials(self) -> None: self._anm.stop() self._anm = None self.auto_no_mouse_button.setText("▶ AutoNoMouse") - self.auto_no_mouse_button.setStyleSheet( - "background-color: lightblue;") + self.auto_no_mouse_button.setStyleSheet("background-color: lightblue;") kwargs = self._get_inject_kwargs() injector.task = manager.task - injector.inject_trials(self._get_n_inject(), - interval=self._get_interval_inject(), - **kwargs) + injector.inject_trials( + self._get_n_inject(), interval=self._get_interval_inject(), **kwargs + ) self.inject_button.setText("■ Stop Inject") self.inject_button.setStyleSheet("background-color: salmon;") diff --git a/village/gui/tasks_layout.py b/village/gui/tasks_layout.py index 80a5e63b4..87f53f14c 100644 --- a/village/gui/tasks_layout.py +++ b/village/gui/tasks_layout.py @@ -9,6 +9,7 @@ from PyQt5.QtGui import QFont from PyQt5.QtWidgets import ( QComboBox, + QLabel, QLineEdit, QListWidget, QListWidgetItem, @@ -17,7 +18,6 @@ QTabWidget, QVBoxLayout, QWidget, - QLabel, ) from village.classes.enums import State diff --git a/village/manager.py b/village/manager.py index 533039efb..40b1be716 100644 --- a/village/manager.py +++ b/village/manager.py @@ -178,8 +178,9 @@ def __init__(self) -> None: def auto_no_mouse(self) -> AutoNoMouse_Base: """Return the AutoNoMouse instance for the current task, or the generic one.""" task_name = getattr(self.task, "name", "") - return (self._auto_no_mouse_instances.get(task_name) - or self._auto_no_mouse_instances.get("", AutoNoMouse_Base())) + return self._auto_no_mouse_instances.get( + task_name + ) or self._auto_no_mouse_instances.get("", AutoNoMouse_Base()) def create_collections(self) -> None: """Creates and initializes data collections for events, summaries, From 95cf607ec4166502c164d91ac39f15d1647e2c39 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 1 Jun 2026 17:05:32 +0200 Subject: [PATCH 08/18] autonomouse live param update --- village/custom_classes/auto_no_mouse_base.py | 6 ++++++ village/gui/monitor_layout.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index cab2853e2..eb3a62acb 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -120,6 +120,12 @@ def _run(self) -> None: self.run_trial() self.stop() + def update_params(self, **kwargs) -> None: + """Update param attributes on a running instance.""" + for param in self.PARAMS: + if param.name in kwargs: + setattr(self, param.name, param.clamp(kwargs[param.name])) + def run_trial(self) -> None: """Override in subclass. Sequence of actions to perform for one trial, e.g. pokes and position updates.""" diff --git a/village/gui/monitor_layout.py b/village/gui/monitor_layout.py index 12049df92..d96af9894 100644 --- a/village/gui/monitor_layout.py +++ b/village/gui/monitor_layout.py @@ -1260,10 +1260,12 @@ def _build_inject_param_widgets(self) -> int: widget.setChecked(bool(param.default)) widget.setToolTip(param.tooltip) self.addWidget(widget, row, col_auto + 10, 2, 4) + widget.stateChanged.connect(self._on_param_changed) else: widget = self.create_and_add_line_edit( str(param.default), row, col_auto + 10, 4, 2, lambda: None ) + widget.editingFinished.connect(self._on_param_changed) self._inject_param_widgets[param.name] = widget row += 2 return row @@ -1332,6 +1334,10 @@ def update_gui(self) -> None: self.inject_button.setText("Inject Trials") self.inject_button.setStyleSheet("background-color: lightgreen;") + def _on_param_changed(self) -> None: + if self._anm is not None and self._anm.running: + self._anm.update_params(**self._get_inject_kwargs()) + def _inject_trials(self) -> None: injector = manager.auto_no_mouse if injector.injecting: From ebfdca2935ed2963735ca074fbcb72e3ca0dd0a1 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 1 Jun 2026 17:57:14 +0200 Subject: [PATCH 09/18] linting --- village/calibration/camera_calibration.py | 2 + village/custom_classes/auto_no_mouse_base.py | 7 ++-- village/gui/camera_calibration_layout.py | 42 +++++++++++--------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/village/calibration/camera_calibration.py b/village/calibration/camera_calibration.py index a04cc0a02..dc726bf09 100644 --- a/village/calibration/camera_calibration.py +++ b/village/calibration/camera_calibration.py @@ -112,6 +112,8 @@ def save(self, out_path: Path) -> None: def _run(self) -> None: try: + if self._image_size is None: + return K, dist, rvecs, tvecs, err = _run_calibration( self._obj_points, self._img_points, self._image_size ) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index eb3a62acb..c4dfdad2f 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -13,7 +13,7 @@ class AutonomouseParam: """Descriptor for a single inject_trial keyword argument.""" name: str # name passed to inject_trial - type_: type # float or int for now TODO: support bool, str, etc. if needed, or infer from default + type_: type # for now supports float, int, or bool default: float # default value label: str # UI label text min_val: float = 0.0 @@ -35,7 +35,7 @@ class AutoNoMouse_Base: TASK_NAME: str = "" # to be set in subclass to restrict to a specific task PARAMS: list[AutonomouseParam] = [] - def __init__(self, task: Task = None) -> None: + def __init__(self, task: Task | None = None) -> None: self.task = task self._thread: Thread | None = None self._inject_thread: Thread | None = None @@ -107,7 +107,8 @@ def running(self) -> bool: @property def injecting(self) -> bool: - return self._inject_thread is not None and self._inject_thread.is_alive() + return (self._inject_thread is not None and + self._inject_thread.is_alive()) def stop_inject(self) -> None: self._inject_stop_event.set() diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py index ae4c267cd..f78cd0cf4 100644 --- a/village/gui/camera_calibration_layout.py +++ b/village/gui/camera_calibration_layout.py @@ -28,10 +28,12 @@ def _frame_to_pixmap(frame: np.ndarray) -> QPixmap: rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = rgb.shape[:2] - return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, QImage.Format_RGB888)) + return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, + QImage.Format_RGB888)) -def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt.Figure: +def _make_diagnostic_plots(diag: dict, + width_in: float, height_in: float) -> plt.Figure: pts = diag["pts"] errs = diag["errs"] mags = np.linalg.norm(errs, axis=1) @@ -56,7 +58,8 @@ def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt # (0,0) residual reprojection error ax = axes[0, 0] - ax.tricontourf(pts[:, 0], pts[:, 1], mags, levels=10, cmap="hot", alpha=0.25) + ax.tricontourf(pts[:, 0], pts[:, 1], mags, + levels=10, cmap="hot", alpha=0.25) q = ax.quiver( pts[:, 0], pts[:, 1], @@ -116,7 +119,8 @@ def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt center + [-cr * hw - sr * hh, -sr * hw + cr * hh], ] ) - ax.scatter(last_pts[:, 0], last_pts[:, 1], s=3, c="steelblue", zorder=3) + ax.scatter(last_pts[:, 0], last_pts[:, 1], + s=3, c="steelblue", zorder=3) ax.plot( *np.vstack([tl, tr, br, bl, tl]).T, "r-", @@ -164,7 +168,8 @@ def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt if scale_data is not None and len(scale_data["vals"]) > 3: sp = scale_data["pos"] sv = scale_data["vals"] - tcf2 = ax.tricontourf(sp[:, 0], sp[:, 1], sv, levels=15, cmap="viridis") + tcf2 = ax.tricontourf(sp[:, 0], sp[:, 1], + sv, levels=15, cmap="viridis") fig.colorbar(tcf2, ax=ax, label="px / cm", **cb_args) ax.set_xlim(0, w) ax.set_ylim(h, 0) @@ -204,7 +209,8 @@ def draw(self) -> None: def update(self, diag: dict) -> None: try: - fig = _make_diagnostic_plots(diag, self.plot_width, self.plot_height) + fig = _make_diagnostic_plots(diag, + self.plot_width, self.plot_height) self.plot_label.setPixmap(create_pixmap(fig)) plt.close(fig) except Exception: @@ -223,7 +229,6 @@ def draw(self) -> None: self._result_path: Path = default_result_path() self._result: dict | None = None - self._calib: CameraCalibration | None = None self._last_annotated: np.ndarray | None = None self.create_and_add_label( @@ -327,8 +332,8 @@ def draw(self) -> None: 50, 2, "black", - description="Parameters of the actual grid in front of the camera.\n" - "Rows/cols may differ from the printed sheet if you stitched several.", + description="Parameters of the real grid in front of the camera.\n" + "Rows/cols may differ from the printed sheet if stitched several.", ) self.create_and_add_label( "GRID COLS", @@ -456,7 +461,7 @@ def draw(self) -> None: self.plot_layout = DiagnosticPlotsLayout(self.window, 24, 95) self.addLayout(self.plot_layout, 28, 103, 24, 95) - self._calib = CameraCalibration( + self._calib: CameraCalibration = CameraCalibration( spacing_mm=self._spacing_value(), grid_size=self._grid_size_value() ) @@ -539,8 +544,8 @@ def _process_frame(self, frame: np.ndarray) -> None: px = _frame_to_pixmap(annotated) if not px.isNull(): self.preview_label.setPixmap( - px.scaled(self.preview_label.width(), self.preview_label.height(), 1) - ) + px.scaled(self.preview_label.width(), + self.preview_label.height(), 1)) if not found: self.calib_status_label.setText("No grid detected in this frame") @@ -615,7 +620,9 @@ def _show_result(self, result: dict) -> None: k1 = d[0] dtype = "barrel" if k1 < 0 else "pincushion" level = ( - "minimal" if abs(k1) < 0.05 else "moderate" if abs(k1) < 0.15 else "strong" + "minimal" if abs(k1) < 0.05 + else "moderate" if abs(k1) < 0.15 + else "strong" ) err = result["reprojection_error_px"] quality = "good" if err < 1.0 else "high -> retake frames" @@ -683,17 +690,16 @@ def update_gui(self) -> None: self.update_status_label_buttons() display = ( - self._last_annotated if self._last_annotated is not None else cam_box.frame + self._last_annotated if self._last_annotated is not None + else cam_box.frame ) if display is not None: try: px = _frame_to_pixmap(display) if not px.isNull(): self.preview_label.setPixmap( - px.scaled( - self.preview_label.width(), self.preview_label.height(), 1 - ) - ) + px.scaled(self.preview_label.width(), + self.preview_label.height(), 1)) except Exception: pass From 7331d554de322310248efac2d78655517b59ad2f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2026 15:57:54 +0000 Subject: [PATCH 10/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- village/custom_classes/auto_no_mouse_base.py | 3 +- village/gui/camera_calibration_layout.py | 35 ++++++++------------ 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index c4dfdad2f..ddb879fa7 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -107,8 +107,7 @@ def running(self) -> bool: @property def injecting(self) -> bool: - return (self._inject_thread is not None and - self._inject_thread.is_alive()) + return self._inject_thread is not None and self._inject_thread.is_alive() def stop_inject(self) -> None: self._inject_stop_event.set() diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py index f78cd0cf4..9bc9e59bc 100644 --- a/village/gui/camera_calibration_layout.py +++ b/village/gui/camera_calibration_layout.py @@ -28,12 +28,10 @@ def _frame_to_pixmap(frame: np.ndarray) -> QPixmap: rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = rgb.shape[:2] - return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, - QImage.Format_RGB888)) + return QPixmap.fromImage(QImage(rgb.data, w, h, w * 3, QImage.Format_RGB888)) -def _make_diagnostic_plots(diag: dict, - width_in: float, height_in: float) -> plt.Figure: +def _make_diagnostic_plots(diag: dict, width_in: float, height_in: float) -> plt.Figure: pts = diag["pts"] errs = diag["errs"] mags = np.linalg.norm(errs, axis=1) @@ -58,8 +56,7 @@ def _make_diagnostic_plots(diag: dict, # (0,0) residual reprojection error ax = axes[0, 0] - ax.tricontourf(pts[:, 0], pts[:, 1], mags, - levels=10, cmap="hot", alpha=0.25) + ax.tricontourf(pts[:, 0], pts[:, 1], mags, levels=10, cmap="hot", alpha=0.25) q = ax.quiver( pts[:, 0], pts[:, 1], @@ -119,8 +116,7 @@ def _make_diagnostic_plots(diag: dict, center + [-cr * hw - sr * hh, -sr * hw + cr * hh], ] ) - ax.scatter(last_pts[:, 0], last_pts[:, 1], - s=3, c="steelblue", zorder=3) + ax.scatter(last_pts[:, 0], last_pts[:, 1], s=3, c="steelblue", zorder=3) ax.plot( *np.vstack([tl, tr, br, bl, tl]).T, "r-", @@ -168,8 +164,7 @@ def _make_diagnostic_plots(diag: dict, if scale_data is not None and len(scale_data["vals"]) > 3: sp = scale_data["pos"] sv = scale_data["vals"] - tcf2 = ax.tricontourf(sp[:, 0], sp[:, 1], - sv, levels=15, cmap="viridis") + tcf2 = ax.tricontourf(sp[:, 0], sp[:, 1], sv, levels=15, cmap="viridis") fig.colorbar(tcf2, ax=ax, label="px / cm", **cb_args) ax.set_xlim(0, w) ax.set_ylim(h, 0) @@ -209,8 +204,7 @@ def draw(self) -> None: def update(self, diag: dict) -> None: try: - fig = _make_diagnostic_plots(diag, - self.plot_width, self.plot_height) + fig = _make_diagnostic_plots(diag, self.plot_width, self.plot_height) self.plot_label.setPixmap(create_pixmap(fig)) plt.close(fig) except Exception: @@ -544,8 +538,8 @@ def _process_frame(self, frame: np.ndarray) -> None: px = _frame_to_pixmap(annotated) if not px.isNull(): self.preview_label.setPixmap( - px.scaled(self.preview_label.width(), - self.preview_label.height(), 1)) + px.scaled(self.preview_label.width(), self.preview_label.height(), 1) + ) if not found: self.calib_status_label.setText("No grid detected in this frame") @@ -620,9 +614,7 @@ def _show_result(self, result: dict) -> None: k1 = d[0] dtype = "barrel" if k1 < 0 else "pincushion" level = ( - "minimal" if abs(k1) < 0.05 - else "moderate" if abs(k1) < 0.15 - else "strong" + "minimal" if abs(k1) < 0.05 else "moderate" if abs(k1) < 0.15 else "strong" ) err = result["reprojection_error_px"] quality = "good" if err < 1.0 else "high -> retake frames" @@ -690,16 +682,17 @@ def update_gui(self) -> None: self.update_status_label_buttons() display = ( - self._last_annotated if self._last_annotated is not None - else cam_box.frame + self._last_annotated if self._last_annotated is not None else cam_box.frame ) if display is not None: try: px = _frame_to_pixmap(display) if not px.isNull(): self.preview_label.setPixmap( - px.scaled(self.preview_label.width(), - self.preview_label.height(), 1)) + px.scaled( + self.preview_label.width(), self.preview_label.height(), 1 + ) + ) except Exception: pass From c0f2d87abc46d3c268229cc7550191195a258041 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 1 Jun 2026 18:17:17 +0200 Subject: [PATCH 11/18] linting --- village/custom_classes/auto_no_mouse_base.py | 13 ++++++++++++- village/gui/camera_calibration_layout.py | 8 ++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index c4dfdad2f..0ac800922 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -5,6 +5,7 @@ from threading import Thread from village.custom_classes.task import Task +from village.devices.camera import Camera from village.scripts.time_utils import time_utils @@ -70,8 +71,10 @@ def inject_positions(self) -> None: Replaces cam_box position (x, y) lists with AutoNoMouse positions. Called when AutoNoMouse stops, just before cam_box.stop_recording(), so cam_box.save_csv() picks them up and records them.""" + if self.task is None: + return cam = self.task.cam_box - if not self._position_log or not hasattr(cam, "camera_timestamps"): + if not self._position_log or not isinstance(cam, Camera): return ts_log = [t for t, _, _ in self._position_log] @@ -94,6 +97,8 @@ def inject_positions(self) -> None: cam.y_positions = new_y def _set_overlay(self, instance: "AutoNoMouse_Base | None") -> None: + if self.task is None: + return try: itd = self.task.cam_box.items_to_draw itd["auto_no_mouse"] = instance is not None @@ -114,6 +119,8 @@ def stop_inject(self) -> None: self._inject_stop_event.set() def _run(self) -> None: + if self.task is None: + return while ( not self._stop_event.is_set() and self.task.current_trial <= self.task.maximum_number_of_trials @@ -151,12 +158,16 @@ def _run(): def poke(self, port: int, duration: float = 0.1) -> None: """Simulate a nose-poke in and out on port.""" + if self.task is None: + return self.task.bpod.manual_override_input(f"Port{port}In") self._stop_event.wait(duration) self.task.bpod.manual_override_input(f"Port{port}Out") def set_position(self, x: float, y: float) -> None: """Update the virtual animal's position and trace.""" + if self.task is None: + return self.task.current_x = x self.task.current_y = y pt = (int(x), int(y)) diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py index f78cd0cf4..e0d78caf9 100644 --- a/village/gui/camera_calibration_layout.py +++ b/village/gui/camera_calibration_layout.py @@ -15,7 +15,7 @@ ) from village.calibration.camera_calibration_grid import make_circle_grid from village.classes.enums import State -from village.devices.camera import cam_box +from village.devices.camera import Camera, cam_box from village.gui.layout import Layout from village.manager import manager from village.scripts.utils import create_pixmap @@ -524,10 +524,10 @@ def _generate_grid_clicked(self) -> None: def _capture_clicked(self) -> None: if self._calib.running: return - frame = cam_box.frame - if frame is None: + if not isinstance(cam_box, Camera): self.calib_status_label.setText("No frame available") return + frame = cam_box.frame self._process_frame(frame) def _process_frame(self, frame: np.ndarray) -> None: @@ -691,7 +691,7 @@ def update_gui(self) -> None: display = ( self._last_annotated if self._last_annotated is not None - else cam_box.frame + else cam_box.frame if isinstance(cam_box, Camera) else None ) if display is not None: try: From 00158e14ac8b14c48b0187f0269382356cc03005 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:20:04 +0000 Subject: [PATCH 12/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- village/gui/camera_calibration_layout.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/village/gui/camera_calibration_layout.py b/village/gui/camera_calibration_layout.py index 8923efcbe..b4a4fa5ff 100644 --- a/village/gui/camera_calibration_layout.py +++ b/village/gui/camera_calibration_layout.py @@ -682,7 +682,8 @@ def update_gui(self) -> None: self.update_status_label_buttons() display = ( - self._last_annotated if self._last_annotated is not None + self._last_annotated + if self._last_annotated is not None else cam_box.frame if isinstance(cam_box, Camera) else None ) if display is not None: From 13bc189e8062663ab9e27c0f66f0658cd4369288 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 1 Jun 2026 18:22:15 +0200 Subject: [PATCH 13/18] linting --- village/scripts/import_all.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/village/scripts/import_all.py b/village/scripts/import_all.py index aa7ecd8b1..65ceff732 100644 --- a/village/scripts/import_all.py +++ b/village/scripts/import_all.py @@ -56,7 +56,7 @@ def import_all(manager) -> None: for cal_cls in (BpodWaterCalibration, SoundCalibration, CameraCalibration): instance = cal_cls() - cal_cls._instance = instance + setattr(cal_cls, '_instance', instance) setattr(manager.calibrations, instance.name, instance) for root, _, files in os.walk(directory): From fa17fe567b43240c2fc5066a092cc7ae1b08f3f6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2026 16:23:38 +0000 Subject: [PATCH 14/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- village/scripts/import_all.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/village/scripts/import_all.py b/village/scripts/import_all.py index 65ceff732..b373d055a 100644 --- a/village/scripts/import_all.py +++ b/village/scripts/import_all.py @@ -56,7 +56,7 @@ def import_all(manager) -> None: for cal_cls in (BpodWaterCalibration, SoundCalibration, CameraCalibration): instance = cal_cls() - setattr(cal_cls, '_instance', instance) + setattr(cal_cls, "_instance", instance) setattr(manager.calibrations, instance.name, instance) for root, _, files in os.walk(directory): From 1d5d4b394125e0a7aea437292c98fdeacb93b198 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Tue, 2 Jun 2026 16:41:40 +0200 Subject: [PATCH 15/18] fix circular import --- village/custom_classes/auto_no_mouse_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index 94849c782..b018fa073 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -5,7 +5,6 @@ from threading import Thread from village.custom_classes.task import Task -from village.devices.camera import Camera from village.scripts.time_utils import time_utils @@ -74,7 +73,7 @@ def inject_positions(self) -> None: if self.task is None: return cam = self.task.cam_box - if not self._position_log or not isinstance(cam, Camera): + if not self._position_log or not hasattr(cam, "camera_timestamps"): return ts_log = [t for t, _, _ in self._position_log] From 5ebdff437a3c310ffea7c7550be2bdfb1ffff184 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Tue, 2 Jun 2026 16:48:32 +0200 Subject: [PATCH 16/18] hasattr checks again --- village/custom_classes/auto_no_mouse_base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/village/custom_classes/auto_no_mouse_base.py b/village/custom_classes/auto_no_mouse_base.py index b018fa073..ed2dcd47a 100644 --- a/village/custom_classes/auto_no_mouse_base.py +++ b/village/custom_classes/auto_no_mouse_base.py @@ -73,7 +73,11 @@ def inject_positions(self) -> None: if self.task is None: return cam = self.task.cam_box - if not self._position_log or not hasattr(cam, "camera_timestamps"): + if not self._position_log or not ( + hasattr(cam, "camera_timestamps") + and hasattr(cam, "x_positions") + and hasattr(cam, "y_positions") + ): return ts_log = [t for t, _, _ in self._position_log] From 01c0dcc80f84dd383ef8f58402c87ba8b238710c Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Mon, 8 Jun 2026 14:45:01 +0200 Subject: [PATCH 17/18] fix ledstrip pixel type --- village/devices/led_strip.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/village/devices/led_strip.py b/village/devices/led_strip.py index 8b4a5ea23..9b508bc84 100644 --- a/village/devices/led_strip.py +++ b/village/devices/led_strip.py @@ -1,7 +1,7 @@ import traceback from typing import Any -from pi5neo import Pi5Neo +from pi5neo import Pi5Neo, EPixelType from village.classes.null_classes import NullLEDStrip from village.scripts.log import log @@ -12,7 +12,7 @@ def get_led_strip( spi_device: str = "/dev/spidev0.0", num_leds: int = 10, spi_speed_khz: int = 800, - pixel_type: str = "GRB", + pixel_type: EPixelType = EPixelType.RGB, quiet_mode: bool = True, ) -> Any | NullLEDStrip: """Factory function to get an instance of the LED strip. @@ -21,7 +21,9 @@ def get_led_strip( spi_device (str): SPI device path num_leds (int): Number of LEDs in the strip spi_speed_khz (int): SPI speed in kHz - pixel_type (str): Type of LED pixels 'GRB' or 'GRBW' or 'RGB' or 'RGBW' + pixel_type (EPixelType): Color channel order of the pixels. One of + EPixelType.RGB / GRB / RGBW / GRBW (RGB/GRB are 3-channel, + RGBW/GRBW have a dedicated white channel). quiet_mode (bool): Whether to suppress output Returns: NullLEDStrip: An instance of the LED strip class or @@ -48,6 +50,6 @@ def get_led_strip( spi_device=settings.get("SPI_DEVICE"), num_leds=settings.get("NUMBER_OF_LEDS"), spi_speed_khz=settings.get("SPI_SPEED_KHZ"), - pixel_type=settings.get("PIXEL_TYPE").value, + pixel_type=EPixelType[settings.get("PIXEL_TYPE").name], quiet_mode=True, ) From 8de86b5f87e00b6c7084c2bbdcceeb06e712dca3 Mon Sep 17 00:00:00 2001 From: HeathenToaster Date: Tue, 9 Jun 2026 12:14:54 +0200 Subject: [PATCH 18/18] to test: changed paintEvent to drawForeground for DrawOnFrame --- village/devices/camera.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/village/devices/camera.py b/village/devices/camera.py index 8bef4c8a1..5454c20d9 100644 --- a/village/devices/camera.py +++ b/village/devices/camera.py @@ -83,14 +83,17 @@ def render_request(self, completed_request) -> None: self._frame_counter = 0 super().render_request(completed_request) - def paintEvent(self, event) -> None: - """Renders the camera frame then draws screen-only overlays.""" - super().paintEvent(event) - if self._cam is not None: - painter = QPainter(self) - self._cam.task_is_running = manager.state.task_is_running() - manager.camera_draw.draw_preview(self._cam, painter) - painter.end() + def drawForeground(self, painter: QPainter, rect) -> None: + """Draws screen-only overlays on top of the camera frame + (which is a QGraphicsView in QPicamera2!).""" + super().drawForeground(painter, rect) + if self._cam is None: + return + painter.save() + painter.resetTransform() + self._cam.task_is_running = manager.state.task_is_running() + manager.camera_draw.draw_preview(self._cam, painter) + painter.restore() # the camera class