feat(jobs): Phase 2 Job Persistence (DB-backed + Restart Recovery + Cleanup)#94
Conversation
JobStore Protocol mit DI als saubere Boundary zwischen PrintQueue und DB.
SQLite-Backed-Implementation persistiert jede State-Transition synchron.
PrintQueue.start() ruft mark_interrupted + list_pending fuer Restart-Recovery
(PRINTING -> FAILED printer_interrupted, QUEUED bleibt FIFO erhalten).
Neuer GET /api/batches/{id} Snapshot-Endpoint liefert Initial-State fuer
Hangar Result-Page (entblockt strausmann/hangar#81).
CleanupTask raeumt terminal Jobs aelter als retention_days (Default 30).
Brainstorming-Q&A Entscheidungen:
- Scope: Alles drei (Persist + Recovery + Snapshot)
- PRINTING-Recovery: FAILED mit printer_interrupted
- Write-Granularitaet: Jede Transition synchron
- Retention: Konfigurierbar, Default 30 Tage
- SSE-Replay: Snapshot-GET + Live-SSE (kein Event-Replay)
- Approach: JobStore-Protocol mit DI
Refs #93
10 Tasks mit TDD-Schritten: jobs_repo helpers, JobStore Protocol,
SQLiteJobStore, PrintQueue Persistierung, PrintService + create_queued,
Recovery in start(), CleanupTask, GET /api/batches/{id}, Lifespan,
manuelle Verification + PR.
Erratum-Update zur Spec im selben Commit (Errata-Sektion ergaenzt):
zwei Job-Klassen, jobs_repo Wiederverwendung, FAILED_RESTART semantic.
Refs #93
C1: recover_inflight_jobs() aus Lifespan-Startup muss entfernt werden —
kollidiert mit Phase-2-Recovery in PrintQueue.start()
C2: _jobs dict bleibt als In-Memory-Store für Dataclass-Jobs; wird nach
terminal bereinigt; JobStore ist parallel Source of Truth für DB
C3: Skip-Check if job.state != QUEUED muss VOR mark_printing im
Worker-Pseudocode explizit gezeigt werden
M1: BatchRead.created_by ist str (SSO-Email oder API-Key-ID), kein UUID
M2: list_by_ids akzeptiert list[str | UUID], normalisiert intern zu UUID
M3: BatchSummary.failed zählt FAILED + FAILED_RESTART zusammen
M4+Err5: _rerender_from_db_job nutzt LabelData.model_validate()
statt rohes dict; Payload-Schema explizit dokumentiert
M5+Err6: stop() ruft nach Dataclass-Transition auch
store.mark_failed(job.id, "shutdown") auf
M6: eigene list_pending(printer_id) in jobs_repo die QUEUED+PRINTING+
PAUSED zurückgibt; list_active nicht erweitern
m4: Router-Prefix @router.get("/{batch_id}") mit prefix="/api/batches"
— kein Doppel-/api in Route-Dekorator
Refs #93
…M8, m1) C1: job_store.py Import-Klarstellung (from sqlalchemy import update) C2+M8: async_session_factory-Fixture in Root-conftest statt Integration-conftest C3: Worker-Insert konkret mit Zeilennummern aus print_queue.py C4: LabelData.model_validate(payload["label_data"]) statt direktes Dict an renderer C5: BatchRead.created_by als str | None (PrintBatch.created_by ist str) C6: Fixture-Definitionen explizit — client statt auth_client, Fake-Auth-Pattern M1: Schritt-Reihenfolge korrigiert — bestehende Tests zuerst anpassen, dann neue gruen M3: hasattr-Anti-Pattern entfernt, Protocol-isinstance durch duck-typing ersetzt M4: from sqlalchemy import update (nicht from sqlmodel) M5: Lifespan-Snippet mit echtem async_session + PrintQueue(printers=[printer]) M6: asyncio.sleep(0.05) in CleanupTask-Tests durch stop()-Wait ersetzt M7: evict-Test nutzt direkte get()-Asserts statt list_active+get-Verkettung m1: Step 5a mit explizitem Edit-Schritt fuer delete/update Import ergaenzt Refs #93
- mark_printing_as_failed_restart(printer_id) — nur PRINTING affected - list_active(printer_id=None) — optionaler Filter - evict_terminal_older_than(age) — Cleanup-Helper Refs #93
- list_active: WHERE-Klauseln zuerst, danach order_by (Lesbarkeit) - evict_terminal_older_than: dokumentiere NULL-finished_at Verhalten Refs #93
JobStore ist die Persistierungs-Boundary die PrintQueue nutzt um Lifecycle-Transitionen zu speichern. MemoryJobStore ist die Test-Impl mit gleicher Semantik wie späterer SQLiteJobStore. Refs #93
- I-1: MemoryJobStore erbt explizit von JobStore (Protocol-Konformität für zukünftige Methoden-Erweiterungen durch mypy prüfbar) - I-2: _make_job mit vollständigen Typ-Annotationen; alle test_*-Funktionen mit -> None; assert-not-None-Guards vor union-attr-Zugriffen - I-3: mark_printing/mark_done/mark_failed Docstrings dokumentieren silent-no-op-Semantik bei fehlendem job_id (Race-against-eviction) - I-4: PRINTING- und DONE-Jobs in Tests über save_queued + mark_printing / mark_done eingebracht statt save_queued mit falschem State aufzurufen mypy --strict: 0 Fehler auf beiden Dateien Tests: 5/5 (job_store) + 706/706 (unit gesamt) grün Refs #93
Per-operation Sessions via async_sessionmaker. Implementiert das gleiche JobStore Protocol wie MemoryJobStore — Protocol-Conformance durch isinstance-Check verifiziert. async_session_factory Fixture in tests/conftest.py (root) damit Unit- und Integration-Tests sie sehen. Refs #93
- mark_printing/done/failed: pre-check statt try/except, logger.warning bei falschem State
- result={} Pattern in mark_done dokumentiert
- async_session_factory Fixture: FK-Disable Trade-off im Docstring begründet
Refs #93
…se 2) QUEUED -> PRINTING -> DONE/FAILED wird synchron in der DB persistiert. Konstruktor bekommt store: JobStore als optionalen Parameter (Default: MemoryJobStore für Rückwärtskompatibilität). Worker bridget dataclass.id (str) via UUID(job.id) an den Store. Skip-Check bleibt vor mark_printing. _jobs dict bleibt für laufende Drucke mit image_payload erhalten. Refs #93
Schließt Spec-Errata C2 und Code-Quality-Finding C-1. Plus Header-Docstring Phase-2-aktualisiert und __init__ Docstring zur store-Optional-Entscheidung ergänzt. Refs #93
…se 2) Neue submit_with_id() und submit_paused_with_id() in PrintQueue erlauben extern-generierte job_id. PrintService legt erst die DB-Row an (store.save_queued) und reicht die UUID durch — Hand-off ist atomisch und persistiert. submit_print_job gibt jetzt UUID statt str zurück; Route konvertiert zu str für Response-Schema-Kompatibilität. Refs #93
Adressiert C-1 (PAUSED als QUEUED in DB) und I-1 (kein Rollback). C-1: on_tape_mismatch=queue Pfad ruft save_queued nicht mehr auf — PAUSED-Jobs bleiben in-memory-only (Trade-off: Hub-Restart löscht sie, Phase 3 wird PAUSED in JobState enum aufnehmen). I-1: happy-path try/except um queue.submit_with_id ergänzt — bei failure wird DB-Job via mark_failed auf FAILED gesetzt damit keine stale QUEUED-Rows ohne Worker bleiben. Refs #93
mark_interrupted(printer_id) markiert verloren-gegangene PRINTING-Jobs als FAILED_RESTART. QUEUED-Jobs werden aus template_key+payload neu gerendert und in FIFO-Reihenfolge re-enqueued. PrintQueue.__init__ erhält optionale Parameter renderer + loader für Recovery. _rerender_from_db_job() wirft RuntimeError wenn nicht verdrahtet (sichere Fehlerbehandlung ohne renderer/loader). Bestehende Persistence-Tests konfigurieren mark_interrupted.return_value=0 und list_pending.return_value=[] damit start() mit AsyncMock-Stores sauber durchläuft. Refs #93
… und Race absichern C-1: KeyError in _rerender_from_db_job (fehlendes label_data) wird nun per try/except in der Recovery-Loop gefangen — betroffener Job wird als FAILED markiert, alle anderen QUEUED-Jobs laufen weiter. I-2: TemplateNotFoundError (gelöschtes Template) erhält die gleiche Behandlung im selben try/except-Block. I-1: self._running wird jetzt als erstes nach dem Guard gesetzt (vor dem ersten await), sodass ein gleichzeitiger zweiter start()-Aufruf sicher abgewiesen wird. try/finally setzt _running bei Recovery-Fehler zurück. S-1: db_job.state-Vergleich nutzt jetzt DbJobState.QUEUED.value statt String-Literal "queued" — konsistente StrEnum-Verwendung. Neue Tests: - test_recovery_skips_jobs_with_missing_label_data - test_recovery_skips_jobs_with_deleted_template Refs #93
…Phase 2) Background asyncio task läuft initial bei start() und dann alle 24h, ruft store.evict_terminal_older_than(retention). Fail-soft bei DB-Errors — Loop survives. Refs #93
- asyncio.TimeoutError → TimeoutError (built-in, py312) in stop() und _loop() - logger.warning-Zeile auf 4 Zeilen aufgeteilt (> 100 Zeichen) - unbenutztes `import asyncio` aus test_cleanup_task.py entfernt - -> None Return-Type auf alle async def test_*-Funktionen ergänzt - Single-Use-Hinweis in CleanupTask-Docstring ergänzt Refs #93
Liefert Batch-Metadaten + alle Job-States. summary.all_terminal sagt
Hangar ob noch ein SSE-Stream geöffnet werden muss. Job-Reihenfolge
entspricht batch.job_ids; cleanup-geister werden übersprungen.
- jobs_repo.list_by_ids() Bulk-Fetch via SQL IN
- BatchRead + BatchSummary Schemas (model_validator für all_terminal)
- GET /api/batches/{batch_id} Route mit require_read Auth
- Router in main.py registriert
- Integration-conftest patcht app.db.session.async_session (Task-8-Fix)
- ruff I001/E501/RUF100 in print_queue.py + print_service.py behoben
Entblockt strausmann/hangar#81.
Refs #93
- BatchSummary.cancelled: int als neues Pflichtfeld (ISSUE-1) - Endpoint zählt cancelled Jobs korrekt (CANCELLED-State) - Ghost-ID-Test-Assertion präzisiert: total == 1 statt < len() (ISSUE-2) - Neuer Test: failed-Counter zählt FAILED + FAILED_RESTART zusammen Refs #93
SQLiteJobStore aus async_session; CleanupTask läuft beim Start sofort und danach alle 24h. PrintQueue und PrintService erhalten store via DI für Recovery und Persistierung. recover_inflight_jobs() entfernt — PrintQueue.start() übernimmt Recovery mit korrekter QUEUED/PRINTING-Differenzierung (Spec R1-C1). Stub-Printer-Row für Mock-Backend / CI: wenn upsert_runtime_printer keinen Host-konfigurierten Drucker anlegt, wird eine Stub-Row mit eindeutigem slug=str(uuid) eingefügt damit jobs.printer_id (FK) nicht verletzt wird. Test test_phase6b_sse_with_batch angepasst (upsert statt blindes create, weil Stub-Row bereits existiert). Refs #93
M-1: Doppelten Kommentar-Block vor dem Stub-Row-if entfernt;
vollständiger innerer Block (mit slug-UNIQUE-Detail) bleibt.
N-1: Defensiven Idempotency-Kommentar am existing-Check ergänzt,
damit Mock-Path-Logik nicht als tote Logik erscheint.
N-2: Cleanup-Reihenfolge korrigiert: queue.stop() vor
cleanup_task.stop(), sodass kein SQLAlchemy-Warning entsteht
wenn CleanupTask während evict_terminal_older_than gecancelt wird.
Refs #93
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces robust job persistence to the printing service, transitioning from purely in-memory job tracking to a database-backed system. By integrating a Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #94 +/- ##
==========================================
- Coverage 90.20% 89.92% -0.29%
==========================================
Files 84 89 +5
Lines 3684 4008 +324
Branches 314 343 +29
==========================================
+ Hits 3323 3604 +281
- Misses 279 315 +36
- Partials 82 89 +7
... and 1 file with indirect coverage changes
Flags with carried forward coverage won't be shown. Click here to find out more. Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Code Review
This pull request implements Phase 2 of job persistence for the label printer hub, introducing a JobStore protocol with SQLite and in-memory implementations to persist print job state transitions. It refactors PrintQueue and PrintService to save jobs before submission, adds a restart-recovery mechanism to re-enqueue queued jobs upon startup, introduces a periodic CleanupTask to evict old terminal jobs, and adds a new GET /api/batches/{batch_id} snapshot endpoint. The reviewer's feedback focuses on improving robustness and efficiency, suggesting that database state transitions in the worker loop be wrapped in try-except blocks to prevent queue blockage, that potential TypeError and AttributeError exceptions be handled defensively during recovery, and that redundant database queries in SQLiteJobStore be eliminated. Additionally, the reviewer recommends explicitly awaiting canceled tasks in CleanupTask to avoid resource leaks and wrapping rollback database calls in PrintService to prevent exception masking.
| # Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str) | ||
| await self._store.mark_printing(UUID(job.id)) | ||
|
|
||
| try: | ||
| _from = job.state | ||
| JobStateMachine.transition(job, JobState.PRINTING) |
There was a problem hiding this comment.
Wenn self._store.mark_printing eine Exception auslöst (z. B. aufgrund einer temporären Datenbanksperre bei SQLite), wird diese außerhalb des try-Blocks nicht abgefangen. Dies führt dazu, dass der gesamte Worker-Task (_worker) unerwartet beendet wird und die Druckerwarteschlange dauerhaft blockiert. Es wird empfohlen, diesen Aufruf in den try-Block zu verschieben, damit Fehler abgefangen und protokolliert werden können, ohne den Worker-Loop zu beenden.
| # Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str) | |
| await self._store.mark_printing(UUID(job.id)) | |
| try: | |
| _from = job.state | |
| JobStateMachine.transition(job, JobState.PRINTING) | |
| try: | |
| # Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str) | |
| await self._store.mark_printing(UUID(job.id)) | |
| _from = job.state | |
| JobStateMachine.transition(job, JobState.PRINTING) |
| try: | ||
| image = await self._rerender_from_db_job(db_job) | ||
| except (KeyError, ValidationError, TemplateNotFoundError) as exc: | ||
| logger.warning( | ||
| "Recovery: Job %s rerender fehlgeschlagen (%s), FAILED", | ||
| db_job.id, | ||
| exc.__class__.__name__, | ||
| ) | ||
| await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}") | ||
| continue |
There was a problem hiding this comment.
Falls db_job.payload in der Datenbank beschädigt oder None ist, kann _rerender_from_db_job eine TypeError auslösen (da None nicht indizierbar ist). Da TypeError nicht in der Liste der abgefangenen Ausnahmen enthalten ist, würde dies den gesamten Recovery-Prozess beim Start abbrechen. Es wird empfohlen, TypeError ebenfalls abzufangen.
| try: | |
| image = await self._rerender_from_db_job(db_job) | |
| except (KeyError, ValidationError, TemplateNotFoundError) as exc: | |
| logger.warning( | |
| "Recovery: Job %s rerender fehlgeschlagen (%s), FAILED", | |
| db_job.id, | |
| exc.__class__.__name__, | |
| ) | |
| await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}") | |
| continue | |
| try: | |
| image = await self._rerender_from_db_job(db_job) | |
| except (KeyError, ValidationError, TemplateNotFoundError, TypeError) as exc: | |
| logger.warning( | |
| "Recovery: Job %s rerender fehlgeschlagen (%s), FAILED", | |
| db_job.id, | |
| exc.__class__.__name__, | |
| ) | |
| await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}") | |
| continue |
| wrapper = Job( | ||
| id=str(db_job.id), | ||
| printer_id=db_job.printer_id, | ||
| image_payload=payload_bytes, | ||
| tape_mm=db_job.payload.get("tape_mm"), | ||
| options=db_job.payload.get("options", {}), | ||
| ) |
There was a problem hiding this comment.
Falls db_job.payload in der Datenbank aus irgendeinem Grund None ist (z. B. bei fehlerhaften oder unvollständigen Altdaten), führt der direkte Aufruf von db_job.payload.get(...) außerhalb des try-Blocks zu einem AttributeError und bringt den gesamten Recovery-Prozess zum Absturz. Es wird empfohlen, den Payload defensiv zu behandeln.
| wrapper = Job( | |
| id=str(db_job.id), | |
| printer_id=db_job.printer_id, | |
| image_payload=payload_bytes, | |
| tape_mm=db_job.payload.get("tape_mm"), | |
| options=db_job.payload.get("options", {}), | |
| ) | |
| payload = db_job.payload or {} | |
| wrapper = Job( | |
| id=str(db_job.id), | |
| printer_id=db_job.printer_id, | |
| image_payload=payload_bytes, | |
| tape_mm=payload.get("tape_mm"), | |
| options=payload.get("options", {}), | |
| ) |
| async def mark_printing(self, job_id: UUID) -> None: | ||
| """Transition QUEUED -> PRINTING. Silently no-op if job not found.""" | ||
| async with self._session_factory() as session: | ||
| job = await jobs_repo.get(session, job_id) | ||
| if job is None: | ||
| return # silent no-op (Protocol contract) | ||
| if job.state != JobState.QUEUED.value: | ||
| logger.warning( | ||
| "mark_printing called on job %s in state %s (expected queued)", | ||
| job_id, | ||
| job.state, | ||
| ) | ||
| return | ||
| await jobs_repo.mark_printing(session, job_id) |
There was a problem hiding this comment.
In SQLiteJobStore.mark_printing wird zuerst jobs_repo.get aufgerufen, um den Job-Status zu prüfen, und anschließend jobs_repo.mark_printing, welches intern den Job erneut aus der Datenbank lädt. Dies führt zu zwei redundanten Datenbankabfragen pro Zustandsübergang. Da jobs_repo.mark_printing bei Fehlern (Job nicht gefunden oder ungültiger Zustand) eine ValueError auslöst, kann dies effizienter mit einem einzigen try-except-Block gelöst werden. Das gleiche Muster lässt sich auch auf mark_done und mark_failed anwenden.
async def mark_printing(self, job_id: UUID) -> None:
"""Transition QUEUED -> PRINTING. Silently no-op if job not found."""
async with self._session_factory() as session:
try:
await jobs_repo.mark_printing(session, job_id)
except ValueError as exc:
if "not found" in str(exc):
return
logger.warning(str(exc))| async def mark_done(self, job_id: UUID) -> None: | ||
| """Transition PRINTING -> DONE. | ||
|
|
||
| Delegiert an jobs_repo.mark_done mit result={} — JobStore-Protocol | ||
| speichert kein structured result (Phase-2 YAGNI; ergänzbar via | ||
| `set_result` Methode wenn Hangar das später braucht). | ||
| """ | ||
| async with self._session_factory() as session: | ||
| job = await jobs_repo.get(session, job_id) | ||
| if job is None: | ||
| return # silent no-op (Protocol contract) | ||
| if job.state != JobState.PRINTING.value: | ||
| logger.warning( | ||
| "mark_done called on job %s in state %s (expected printing)", | ||
| job_id, | ||
| job.state, | ||
| ) | ||
| return | ||
| await jobs_repo.mark_done(session, job_id, result={}) |
There was a problem hiding this comment.
Analog zu mark_printing führt auch mark_done zwei redundante Datenbankabfragen durch. Durch die direkte Nutzung von jobs_repo.mark_done innerhalb eines try-except-Blocks lässt sich die Datenbanklast halbieren.
async def mark_done(self, job_id: UUID) -> None:
"""Transition PRINTING -> DONE.
Delegiert an jobs_repo.mark_done mit result={} — JobStore-Protocol
speichert kein structured result (Phase-2 YAGNI; ergänzbar via
`set_result` Methode wenn Hangar das später braucht).
"""
async with self._session_factory() as session:
try:
await jobs_repo.mark_done(session, job_id, result={})
except ValueError as exc:
if "not found" in str(exc):
return
logger.warning(str(exc))| async def mark_failed(self, job_id: UUID, error: str) -> None: | ||
| """Transition any non-terminal -> FAILED. Silently no-op if job not found.""" | ||
| _terminal = { | ||
| JobState.DONE.value, | ||
| JobState.FAILED.value, | ||
| JobState.CANCELLED.value, | ||
| JobState.FAILED_RESTART.value, | ||
| } | ||
| async with self._session_factory() as session: | ||
| job = await jobs_repo.get(session, job_id) | ||
| if job is None: | ||
| return # silent no-op (Protocol contract) | ||
| if job.state in _terminal: | ||
| logger.warning( | ||
| "mark_failed called on job %s in terminal state %s", | ||
| job_id, | ||
| job.state, | ||
| ) | ||
| return | ||
| await jobs_repo.mark_failed(session, job_id, error) |
There was a problem hiding this comment.
Auch in mark_failed lässt sich die doppelte Datenbankabfrage durch direktes Aufrufen von jobs_repo.mark_failed mit entsprechender Ausnahmebehandlung vermeiden.
| async def mark_failed(self, job_id: UUID, error: str) -> None: | |
| """Transition any non-terminal -> FAILED. Silently no-op if job not found.""" | |
| _terminal = { | |
| JobState.DONE.value, | |
| JobState.FAILED.value, | |
| JobState.CANCELLED.value, | |
| JobState.FAILED_RESTART.value, | |
| } | |
| async with self._session_factory() as session: | |
| job = await jobs_repo.get(session, job_id) | |
| if job is None: | |
| return # silent no-op (Protocol contract) | |
| if job.state in _terminal: | |
| logger.warning( | |
| "mark_failed called on job %s in terminal state %s", | |
| job_id, | |
| job.state, | |
| ) | |
| return | |
| await jobs_repo.mark_failed(session, job_id, error) | |
| async def mark_failed(self, job_id: UUID, error: str) -> None: | |
| """Transition any non-terminal -> FAILED. Silently no-op if job not found.""" | |
| async with self._session_factory() as session: | |
| try: | |
| await jobs_repo.mark_failed(session, job_id, error) | |
| except ValueError as exc: | |
| if "not found" in str(exc): | |
| return | |
| logger.warning(str(exc)) |
| async def stop(self, timeout_s: float = 5.0) -> None: | ||
| """Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden. | ||
|
|
||
| Nach Ablauf des Timeouts wird der Task gecancelled. | ||
| """ | ||
| self._stopping.set() | ||
| if self._task is not None: | ||
| try: | ||
| await asyncio.wait_for(self._task, timeout=timeout_s) | ||
| except TimeoutError: | ||
| self._task.cancel() | ||
| logger.warning( | ||
| "CleanupTask hat sich nicht in %ss beendet, Task gecancelled", | ||
| timeout_s, | ||
| ) | ||
| self._task = None |
There was a problem hiding this comment.
Nach dem Aufruf von self._task.cancel() im Fehlerfall wird der Task nicht mit await abgewartet. Dies kann dazu führen, dass der Task beim Beenden der Anwendung noch kurzzeitig im Hintergrund läuft oder Ressourcen nicht rechtzeitig freigegeben werden. Es wird empfohlen, den Task nach dem Abbruch explizit abzuwarten und eine eventuelle asyncio.CancelledError abzufangen.
| async def stop(self, timeout_s: float = 5.0) -> None: | |
| """Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden. | |
| Nach Ablauf des Timeouts wird der Task gecancelled. | |
| """ | |
| self._stopping.set() | |
| if self._task is not None: | |
| try: | |
| await asyncio.wait_for(self._task, timeout=timeout_s) | |
| except TimeoutError: | |
| self._task.cancel() | |
| logger.warning( | |
| "CleanupTask hat sich nicht in %ss beendet, Task gecancelled", | |
| timeout_s, | |
| ) | |
| self._task = None | |
| async def stop(self, timeout_s: float = 5.0) -> None: | |
| """Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden. | |
| Nach Ablauf des Timeouts wird der Task gecancelled. | |
| """ | |
| self._stopping.set() | |
| if self._task is not None: | |
| try: | |
| await asyncio.wait_for(self._task, timeout=timeout_s) | |
| except TimeoutError: | |
| self._task.cancel() | |
| try: | |
| await self._task | |
| except asyncio.CancelledError: | |
| pass | |
| logger.warning( | |
| "CleanupTask hat sich nicht in %ss beendet, Task gecancelled", | |
| timeout_s, | |
| ) | |
| self._task = None |
| except Exception as exc: | ||
| # I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback. | ||
| # Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne | ||
| # Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued | ||
| # würde. mark_failed markiert die Row als FAILED und verhindert das. | ||
| await self._store.mark_failed( | ||
| db_job.id, | ||
| f"submit_failed: {exc.__class__.__name__}: {exc}", | ||
| ) | ||
| raise |
There was a problem hiding this comment.
Wenn self._store.mark_failed im except-Block fehlschlägt (z. B. wegen Verbindungsproblemen zur DB), wird die ursprüngliche Exception von submit_with_id maskiert. Es wird empfohlen, den Rollback-Datenbankaufruf in ein eigenes try-except zu hüllen, um sicherzustellen, dass die primäre Fehlerursache immer korrekt an den Aufrufer propagiert wird.
| except Exception as exc: | |
| # I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback. | |
| # Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne | |
| # Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued | |
| # würde. mark_failed markiert die Row als FAILED und verhindert das. | |
| await self._store.mark_failed( | |
| db_job.id, | |
| f"submit_failed: {exc.__class__.__name__}: {exc}", | |
| ) | |
| raise | |
| except Exception as exc: | |
| # I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback. | |
| # Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne | |
| # Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued | |
| # würde. mark_failed markiert die Row als FAILED und verhindert das. | |
| try: | |
| await self._store.mark_failed( | |
| db_job.id, | |
| f"submit_failed: {exc.__class__.__name__}: {exc}", | |
| ) | |
| except Exception as db_exc: | |
| logger.error("Rollback-mark_failed fehlgeschlagen für Job %s: %s", db_job.id, db_exc) | |
| raise |
There was a problem hiding this comment.
Pull request overview
This PR adds Phase 2 DB-backed job persistence so submitted jobs survive restarts, batch/job snapshots can be served from SQLite, and terminal jobs are cleaned up by retention.
Changes:
- Adds
JobStoreabstractions, SQLite-backed persistence, and print-queue recovery/transition writes. - Adds
GET /api/batches/{id}snapshot support with batch summary state. - Wires retention cleanup and expands unit/integration coverage for persistence, recovery, and snapshots.
Reviewed changes
Copilot reviewed 26 out of 27 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/superpowers/specs/2026-05-31-phase-2-job-persistence-design.md | Adds Phase 2 persistence design/spec. |
| docs/superpowers/plans/2026-05-31-phase-2-job-persistence.md | Adds implementation plan and task breakdown. |
| backend/app/services/job_store.py | Introduces JobStore protocol and memory implementation. |
| backend/app/services/job_store_sqlite.py | Adds SQLite-backed JobStore. |
| backend/app/services/cleanup_task.py | Adds retention cleanup background task. |
| backend/app/services/print_queue.py | Persists queue state transitions and performs restart recovery. |
| backend/app/services/print_service.py | Persists jobs before queue handoff. |
| backend/app/repositories/jobs.py | Adds repository helpers for recovery, lookup, and cleanup. |
| backend/app/api/routes/batches.py | Adds batch snapshot endpoint. |
| backend/app/api/routes/print.py | Preserves string job IDs in API response. |
| backend/app/schemas/batch_read.py | Adds batch snapshot response schemas. |
| backend/app/main.py | Wires store, cleanup task, recovery renderer, and new route. |
| backend/app/config.py | Adds job retention setting. |
| backend/tests/conftest.py | Adds shared async session factory fixture. |
| backend/tests/unit/repositories/conftest.py | Adds repository test DB fixtures. |
| backend/tests/unit/repositories/init.py | Adds repository test package marker. |
| backend/tests/unit/repositories/test_jobs_phase2.py | Tests new jobs repository helpers. |
| backend/tests/unit/services/test_job_store_memory.py | Tests memory store behavior. |
| backend/tests/unit/services/test_job_store_sqlite.py | Tests SQLite store behavior. |
| backend/tests/unit/services/test_cleanup_task.py | Tests cleanup task lifecycle/fail-soft behavior. |
| backend/tests/unit/services/test_print_queue_persistence.py | Tests queue persistence callbacks. |
| backend/tests/unit/services/test_print_service.py | Updates print service tests for UUID/persistence flow. |
| backend/tests/integration/conftest.py | Patches integration DB session binding. |
| backend/tests/integration/test_print_service_persistence.py | Tests service-level DB persistence before queueing. |
| backend/tests/integration/test_print_queue_recovery.py | Tests restart recovery behavior. |
| backend/tests/integration/test_batch_snapshot_endpoint.py | Tests batch snapshot endpoint summaries/order. |
| backend/tests/integration/test_phase6b_sse_with_batch.py | Adjusts printer setup for Phase 2 stub row behavior. |
| # Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str) | ||
| await self._store.mark_printing(UUID(job.id)) |
| api_key_id=None, # TODO: aus AuthContext wenn Endpoint-Layer angepasst | ||
| source_ip=None, # TODO: aus AuthContext wenn Endpoint-Layer angepasst |
| if db_printer_id is None: | ||
| # Wenn kein Host konfiguriert ist (Mock-Backend / CI), liefert | ||
| # upsert_runtime_printer None zurück und fügt keine Printer-Row ein. | ||
| # make_queue_printer erzeugt dann eine neue uuid4. Damit | ||
| # jobs.printer_id (FK → printers.id) bei save_queued nicht verletzt | ||
| # wird, legen wir hier eine Stub-Row an. slug wird auf str(id) gesetzt | ||
| # (eindeutig durch UUID), damit der UNIQUE-Constraint nicht verletzt wird. | ||
| _stub_slug = str(printer.id) |
Private git.strausmann.de Links durch Text-Referenzen ersetzt. Refs #93
## 0.8.0 (2026-06-01) * Merge pull request #94 from strausmann/feat/phase-2-job-persistence ([ef8fefd](ef8fefd)), closes [#94](#94) * fix(batch): cancelled-Feld in BatchSummary ergänzt + Tests verschärft ([ddbe0d4](ddbe0d4)), closes [#93](#93) * fix(cleanup-task): ruff/mypy strict-Blocker beheben ([71b2b28](71b2b28)), closes [#93](#93) * fix(docs): private Domain-URLs aus Spec entfernt (Privacy-Scan CI) ([77e4450](77e4450)), closes [#93](#93) * fix(job-store): Quality-Review-Findings I-1 bis I-4 behoben ([f2e2a4e](f2e2a4e)), closes [#93](#93) * fix(plan): Phase-2-Plan adressiert alle Reviewer-Findings (C1–C6, M1–M8, m1) ([261fc85](261fc85)), closes [#93](#93) * fix(print_queue): Recovery-Loop gegen KeyError, TemplateNotFoundError und Race absichern ([735a184](735a184)), closes [#93](#93) * fix(queue): stop() persistiert PRINTING-Jobs als FAILED in DB ([002ef54](002ef54)), closes [#93](#93) * fix(service): submit_print_job rollback + paused-jobs nicht persistieren ([ca88c52](ca88c52)), closes [#93](#93) * style(tests): ruff-Befunde Phase 2 behoben (E501, I001, F401, C408, N806) ([16676fd](16676fd)), closes [#93](#93) * refactor(main): Fix-Round Phase-2 Review (M-1, N-1, N-2) ([34f7e79](34f7e79)), closes [#93](#93) * refactor(repo): jobs_repo Code-Quality-Findings Task 1 ([1b1cc1e](1b1cc1e)), closes [#93](#93) * refactor(services): SQLiteJobStore Code-Quality-Findings Task 3 ([223a0cb](223a0cb)), closes [#93](#93) * feat(api): GET /api/batches/{id} Snapshot-Endpoint (Phase 2) ([1344873](1344873)), closes [#93](#93) * feat(lifespan): wire JobStore + CleanupTask in App-Startup (Phase 2) ([d027884](d027884)), closes [#93](#93) * feat(queue): PrintQueue ruft JobStore bei jeder State-Transition (Phase 2) ([7aa9027](7aa9027)), closes [#93](#93) * feat(queue): PrintQueue.start() Recovery (Phase 2) ([935a79a](935a79a)), closes [#93](#93) * feat(repo): jobs_repo Helper für Phase 2 JobStore ([c80bda5](c80bda5)), closes [#93](#93) * feat(service): PrintService persistiert Job-Row vor queue.submit (Phase 2) ([af20365](af20365)), closes [#93](#93) * feat(services): CleanupTask + PRINTER_HUB_JOB_RETENTION_DAYS config (Phase 2) ([3c7a27d](3c7a27d)), closes [#93](#93) * feat(services): JobStore Protocol + MemoryJobStore (Phase 2) ([d39c795](d39c795)), closes [#93](#93) * feat(services): SQLiteJobStore delegiert an jobs_repo (Phase 2) ([d524859](d524859)), closes [#93](#93) * docs(plan): Phase 2 Job Persistence Implementation Plan ([a8289c8](a8289c8)), closes [#93](#93) * docs(spec): adressiere R1 Critical + Major Findings (Review 2026-05-31) ([6a94f11](6a94f11)), closes [#93](#93) * docs(spec): Phase 2 — Job Persistence Design ([a59b187](a59b187)), closes [strausmann/hangar#81](https://github.com/strausmann/hangar/issues/81) [#93](#93) [skip ci]
Summary
Hub Phase 2 — Jobs werden jetzt in der
jobs-Tabelle persistiert statt nur im In-Memory-Zustand gehalten. Der Neustart des Services erholt sich automatisch: PRINTING-Jobs werden als FAILED_RESTART markiert, QUEUED-Jobs werden re-enqueued. EinCleanupTaskentfernt terminal Jobs nach konfigurierbarer Retention-Zeit.Contributor License Agreement (CLA)
By opening this pull request you affirm that you have read and agree to the
project's Contributor License Agreement for the contribution(s)
included here.
Linked issue
Closes #93
Type of change
feat!/fix!) — describe migration path belowÄnderungen im Überblick
JobStoreProtocol mitMemoryJobStore(In-Memory, default) undSQLiteJobStore(DB-backed)PrintQueueruftstore.mark_*bei jeder State-Transition synchron aufPrintServicelegt erst die DB-Row an, dannqueue.submit(atomisch mit Rollback bei queue-Fehler)PrintQueue.start()Recovery: PRINTING → FAILED_RESTART, QUEUED → re-enqueued via RerenderCleanupTaskräumt terminal Jobs älter alsPRINTER_HUB_JOB_RETENTION_DAYS(Default 30 Tage)GET /api/batches/{id}Snapshot-Endpoint mitsummary.all_terminalrecover_inflight_jobs()aus Lifespan entfernt — Phase 2 ersetzt die Funktion mit korrekter QUEUED/PRINTING-DifferenzierungHardware tested on
Test coverage
~50 neue Tests (Unit + Integration), 870 Tests grün, 5 expected Skips.
Checklist
feat(...): ...etc.)Spec & Plan
docs/superpowers/specs/2026-05-31-phase-2-job-persistence-design.mddocs/superpowers/plans/2026-05-31-phase-2-job-persistence.mdBeide gemerged auf den Branch mit Reviewer-Patches aus 2 unabhängigen Spec+Plan-Reviews.
Migration / breaking change notes
Keine Breaking Changes. Neues optionales Env-Var
PRINTER_HUB_JOB_RETENTION_DAYS(Default: 30). Bestehende Deployments ohne diese Variable verhalten sich wie bisher (In-Memory-Store bleibt Default bisDATABASE_URLgesetzt ist).Screenshots / output
Entblockt strausmann/hangar#81 (Result-Page Live-Updates).