Skip to content

feat(jobs): Phase 2 Job Persistence (DB-backed + Restart Recovery + Cleanup)#94

Merged
strausmann merged 24 commits into
mainfrom
feat/phase-2-job-persistence
May 31, 2026
Merged

feat(jobs): Phase 2 Job Persistence (DB-backed + Restart Recovery + Cleanup)#94
strausmann merged 24 commits into
mainfrom
feat/phase-2-job-persistence

Conversation

@strausmann
Copy link
Copy Markdown
Owner

Summary

Hub Phase 2 — Jobs werden jetzt in der jobs-Tabelle persistiert statt nur im In-Memory-Zustand gehalten. Der Neustart des Services erholt sich automatisch: PRINTING-Jobs werden als FAILED_RESTART markiert, QUEUED-Jobs werden re-enqueued. Ein CleanupTask entfernt terminal Jobs nach konfigurierbarer Retention-Zeit.

Contributor License Agreement (CLA)

By opening this pull request you affirm that you have read and agree to the
project's Contributor License Agreement for the contribution(s)
included here.

Linked issue

Closes #93

Type of change

  • Bug fix (non-breaking)
  • New feature (non-breaking)
  • Breaking change (feat! / fix!) — describe migration path below
  • New printer model plugin
  • Documentation only
  • Refactor / chore (no behaviour change)
  • Test only
  • CI / release tooling

Änderungen im Überblick

  • JobStore Protocol mit MemoryJobStore (In-Memory, default) und SQLiteJobStore (DB-backed)
  • PrintQueue ruft store.mark_* bei jeder State-Transition synchron auf
  • PrintService legt erst die DB-Row an, dann queue.submit (atomisch mit Rollback bei queue-Fehler)
  • PrintQueue.start() Recovery: PRINTING → FAILED_RESTART, QUEUED → re-enqueued via Rerender
  • CleanupTask räumt terminal Jobs älter als PRINTER_HUB_JOB_RETENTION_DAYS (Default 30 Tage)
  • GET /api/batches/{id} Snapshot-Endpoint mit summary.all_terminal
  • recover_inflight_jobs() aus Lifespan entfernt — Phase 2 ersetzt die Funktion mit korrekter QUEUED/PRINTING-Differenzierung

Hardware tested on

  • Brother PT-Series (model: ___)
  • Brother QL-Series (model: ___)
  • Other: ___
  • No hardware impact

Test coverage

  • Added/updated unit tests
  • Added/updated integration tests
  • Hardware tests added (if applicable)
  • Existing tests still pass

~50 neue Tests (Unit + Integration), 870 Tests grün, 5 expected Skips.

Checklist

  • PR title follows Conventional Commits (feat(...): ... etc.)
  • All commits are signed off (if applicable)
  • CHANGELOG entry isn't needed (semantic-release generates it)
  • Documentation updated (if behaviour, API, or config changed)
  • No private IPs, hostnames, domains, real tokens, or PII in commits
  • CI is green (will be checked again after CI runs)

Spec & Plan

  • Spec: docs/superpowers/specs/2026-05-31-phase-2-job-persistence-design.md
  • Plan: docs/superpowers/plans/2026-05-31-phase-2-job-persistence.md

Beide gemerged auf den Branch mit Reviewer-Patches aus 2 unabhängigen Spec+Plan-Reviews.

Migration / breaking change notes

Keine Breaking Changes. Neues optionales Env-Var PRINTER_HUB_JOB_RETENTION_DAYS (Default: 30). Bestehende Deployments ohne diese Variable verhalten sich wie bisher (In-Memory-Store bleibt Default bis DATABASE_URL gesetzt ist).

Screenshots / output

870 passed, 5 skipped in 143.18s
mypy --strict app: Success: no issues found in 107 source files
ruff check app tests: All checks passed!

Entblockt strausmann/hangar#81 (Result-Page Live-Updates).

strausmann added 23 commits May 31, 2026 00:54
JobStore Protocol mit DI als saubere Boundary zwischen PrintQueue und DB.
SQLite-Backed-Implementation persistiert jede State-Transition synchron.
PrintQueue.start() ruft mark_interrupted + list_pending fuer Restart-Recovery
(PRINTING -> FAILED printer_interrupted, QUEUED bleibt FIFO erhalten).
Neuer GET /api/batches/{id} Snapshot-Endpoint liefert Initial-State fuer
Hangar Result-Page (entblockt strausmann/hangar#81).
CleanupTask raeumt terminal Jobs aelter als retention_days (Default 30).

Brainstorming-Q&A Entscheidungen:
- Scope: Alles drei (Persist + Recovery + Snapshot)
- PRINTING-Recovery: FAILED mit printer_interrupted
- Write-Granularitaet: Jede Transition synchron
- Retention: Konfigurierbar, Default 30 Tage
- SSE-Replay: Snapshot-GET + Live-SSE (kein Event-Replay)
- Approach: JobStore-Protocol mit DI

Refs #93
10 Tasks mit TDD-Schritten: jobs_repo helpers, JobStore Protocol,
SQLiteJobStore, PrintQueue Persistierung, PrintService + create_queued,
Recovery in start(), CleanupTask, GET /api/batches/{id}, Lifespan,
manuelle Verification + PR.

Erratum-Update zur Spec im selben Commit (Errata-Sektion ergaenzt):
zwei Job-Klassen, jobs_repo Wiederverwendung, FAILED_RESTART semantic.

Refs #93
C1: recover_inflight_jobs() aus Lifespan-Startup muss entfernt werden —
    kollidiert mit Phase-2-Recovery in PrintQueue.start()
C2: _jobs dict bleibt als In-Memory-Store für Dataclass-Jobs; wird nach
    terminal bereinigt; JobStore ist parallel Source of Truth für DB
C3: Skip-Check if job.state != QUEUED muss VOR mark_printing im
    Worker-Pseudocode explizit gezeigt werden
M1: BatchRead.created_by ist str (SSO-Email oder API-Key-ID), kein UUID
M2: list_by_ids akzeptiert list[str | UUID], normalisiert intern zu UUID
M3: BatchSummary.failed zählt FAILED + FAILED_RESTART zusammen
M4+Err5: _rerender_from_db_job nutzt LabelData.model_validate()
    statt rohes dict; Payload-Schema explizit dokumentiert
M5+Err6: stop() ruft nach Dataclass-Transition auch
    store.mark_failed(job.id, "shutdown") auf
M6: eigene list_pending(printer_id) in jobs_repo die QUEUED+PRINTING+
    PAUSED zurückgibt; list_active nicht erweitern
m4: Router-Prefix @router.get("/{batch_id}") mit prefix="/api/batches"
    — kein Doppel-/api in Route-Dekorator

Refs #93
…M8, m1)

C1: job_store.py Import-Klarstellung (from sqlalchemy import update)
C2+M8: async_session_factory-Fixture in Root-conftest statt Integration-conftest
C3: Worker-Insert konkret mit Zeilennummern aus print_queue.py
C4: LabelData.model_validate(payload["label_data"]) statt direktes Dict an renderer
C5: BatchRead.created_by als str | None (PrintBatch.created_by ist str)
C6: Fixture-Definitionen explizit — client statt auth_client, Fake-Auth-Pattern
M1: Schritt-Reihenfolge korrigiert — bestehende Tests zuerst anpassen, dann neue gruen
M3: hasattr-Anti-Pattern entfernt, Protocol-isinstance durch duck-typing ersetzt
M4: from sqlalchemy import update (nicht from sqlmodel)
M5: Lifespan-Snippet mit echtem async_session + PrintQueue(printers=[printer])
M6: asyncio.sleep(0.05) in CleanupTask-Tests durch stop()-Wait ersetzt
M7: evict-Test nutzt direkte get()-Asserts statt list_active+get-Verkettung
m1: Step 5a mit explizitem Edit-Schritt fuer delete/update Import ergaenzt

Refs #93
- mark_printing_as_failed_restart(printer_id) — nur PRINTING affected
- list_active(printer_id=None) — optionaler Filter
- evict_terminal_older_than(age) — Cleanup-Helper

Refs #93
- list_active: WHERE-Klauseln zuerst, danach order_by (Lesbarkeit)
- evict_terminal_older_than: dokumentiere NULL-finished_at Verhalten

Refs #93
JobStore ist die Persistierungs-Boundary die PrintQueue nutzt um
Lifecycle-Transitionen zu speichern. MemoryJobStore ist die Test-Impl
mit gleicher Semantik wie späterer SQLiteJobStore.

Refs #93
- I-1: MemoryJobStore erbt explizit von JobStore (Protocol-Konformität
  für zukünftige Methoden-Erweiterungen durch mypy prüfbar)
- I-2: _make_job mit vollständigen Typ-Annotationen; alle test_*-Funktionen
  mit -> None; assert-not-None-Guards vor union-attr-Zugriffen
- I-3: mark_printing/mark_done/mark_failed Docstrings dokumentieren
  silent-no-op-Semantik bei fehlendem job_id (Race-against-eviction)
- I-4: PRINTING- und DONE-Jobs in Tests über save_queued + mark_printing
  / mark_done eingebracht statt save_queued mit falschem State aufzurufen

mypy --strict: 0 Fehler auf beiden Dateien
Tests: 5/5 (job_store) + 706/706 (unit gesamt) grün

Refs #93
Per-operation Sessions via async_sessionmaker. Implementiert das
gleiche JobStore Protocol wie MemoryJobStore — Protocol-Conformance
durch isinstance-Check verifiziert.

async_session_factory Fixture in tests/conftest.py (root) damit
Unit- und Integration-Tests sie sehen.

Refs #93
- mark_printing/done/failed: pre-check statt try/except, logger.warning bei falschem State
- result={} Pattern in mark_done dokumentiert
- async_session_factory Fixture: FK-Disable Trade-off im Docstring begründet

Refs #93
…se 2)

QUEUED -> PRINTING -> DONE/FAILED wird synchron in der DB persistiert.
Konstruktor bekommt store: JobStore als optionalen Parameter (Default:
MemoryJobStore für Rückwärtskompatibilität). Worker bridget dataclass.id
(str) via UUID(job.id) an den Store. Skip-Check bleibt vor mark_printing.
_jobs dict bleibt für laufende Drucke mit image_payload erhalten.

Refs #93
Schließt Spec-Errata C2 und Code-Quality-Finding C-1.
Plus Header-Docstring Phase-2-aktualisiert und __init__ Docstring
zur store-Optional-Entscheidung ergänzt.

Refs #93
…se 2)

Neue submit_with_id() und submit_paused_with_id() in PrintQueue erlauben
extern-generierte job_id. PrintService legt erst die DB-Row an
(store.save_queued) und reicht die UUID durch — Hand-off ist atomisch
und persistiert. submit_print_job gibt jetzt UUID statt str zurück;
Route konvertiert zu str für Response-Schema-Kompatibilität.

Refs #93
Adressiert C-1 (PAUSED als QUEUED in DB) und I-1 (kein Rollback).

C-1: on_tape_mismatch=queue Pfad ruft save_queued nicht mehr auf —
PAUSED-Jobs bleiben in-memory-only (Trade-off: Hub-Restart löscht sie,
Phase 3 wird PAUSED in JobState enum aufnehmen).

I-1: happy-path try/except um queue.submit_with_id ergänzt — bei
failure wird DB-Job via mark_failed auf FAILED gesetzt damit keine
stale QUEUED-Rows ohne Worker bleiben.

Refs #93
mark_interrupted(printer_id) markiert verloren-gegangene PRINTING-Jobs
als FAILED_RESTART. QUEUED-Jobs werden aus template_key+payload neu
gerendert und in FIFO-Reihenfolge re-enqueued.

PrintQueue.__init__ erhält optionale Parameter renderer + loader für
Recovery. _rerender_from_db_job() wirft RuntimeError wenn nicht
verdrahtet (sichere Fehlerbehandlung ohne renderer/loader).

Bestehende Persistence-Tests konfigurieren mark_interrupted.return_value=0
und list_pending.return_value=[] damit start() mit AsyncMock-Stores
sauber durchläuft.

Refs #93
… und Race absichern

C-1: KeyError in _rerender_from_db_job (fehlendes label_data) wird nun per
try/except in der Recovery-Loop gefangen — betroffener Job wird als FAILED
markiert, alle anderen QUEUED-Jobs laufen weiter.

I-2: TemplateNotFoundError (gelöschtes Template) erhält die gleiche Behandlung
im selben try/except-Block.

I-1: self._running wird jetzt als erstes nach dem Guard gesetzt (vor dem ersten
await), sodass ein gleichzeitiger zweiter start()-Aufruf sicher abgewiesen wird.
try/finally setzt _running bei Recovery-Fehler zurück.

S-1: db_job.state-Vergleich nutzt jetzt DbJobState.QUEUED.value statt
String-Literal "queued" — konsistente StrEnum-Verwendung.

Neue Tests:
- test_recovery_skips_jobs_with_missing_label_data
- test_recovery_skips_jobs_with_deleted_template

Refs #93
…Phase 2)

Background asyncio task läuft initial bei start() und dann alle 24h,
ruft store.evict_terminal_older_than(retention). Fail-soft bei
DB-Errors — Loop survives.

Refs #93
- asyncio.TimeoutError → TimeoutError (built-in, py312) in stop() und _loop()
- logger.warning-Zeile auf 4 Zeilen aufgeteilt (> 100 Zeichen)
- unbenutztes `import asyncio` aus test_cleanup_task.py entfernt
- -> None Return-Type auf alle async def test_*-Funktionen ergänzt
- Single-Use-Hinweis in CleanupTask-Docstring ergänzt

Refs #93
Liefert Batch-Metadaten + alle Job-States. summary.all_terminal sagt
Hangar ob noch ein SSE-Stream geöffnet werden muss. Job-Reihenfolge
entspricht batch.job_ids; cleanup-geister werden übersprungen.

- jobs_repo.list_by_ids() Bulk-Fetch via SQL IN
- BatchRead + BatchSummary Schemas (model_validator für all_terminal)
- GET /api/batches/{batch_id} Route mit require_read Auth
- Router in main.py registriert
- Integration-conftest patcht app.db.session.async_session (Task-8-Fix)
- ruff I001/E501/RUF100 in print_queue.py + print_service.py behoben

Entblockt strausmann/hangar#81.

Refs #93
- BatchSummary.cancelled: int als neues Pflichtfeld (ISSUE-1)
- Endpoint zählt cancelled Jobs korrekt (CANCELLED-State)
- Ghost-ID-Test-Assertion präzisiert: total == 1 statt < len() (ISSUE-2)
- Neuer Test: failed-Counter zählt FAILED + FAILED_RESTART zusammen

Refs #93
SQLiteJobStore aus async_session; CleanupTask läuft beim Start sofort
und danach alle 24h. PrintQueue und PrintService erhalten store via DI
für Recovery und Persistierung. recover_inflight_jobs() entfernt —
PrintQueue.start() übernimmt Recovery mit korrekter
QUEUED/PRINTING-Differenzierung (Spec R1-C1).

Stub-Printer-Row für Mock-Backend / CI: wenn upsert_runtime_printer
keinen Host-konfigurierten Drucker anlegt, wird eine Stub-Row mit
eindeutigem slug=str(uuid) eingefügt damit jobs.printer_id (FK) nicht
verletzt wird. Test test_phase6b_sse_with_batch angepasst (upsert statt
blindes create, weil Stub-Row bereits existiert).

Refs #93
M-1: Doppelten Kommentar-Block vor dem Stub-Row-if entfernt;
     vollständiger innerer Block (mit slug-UNIQUE-Detail) bleibt.
N-1: Defensiven Idempotency-Kommentar am existing-Check ergänzt,
     damit Mock-Path-Logik nicht als tote Logik erscheint.
N-2: Cleanup-Reihenfolge korrigiert: queue.stop() vor
     cleanup_task.stop(), sodass kein SQLAlchemy-Warning entsteht
     wenn CleanupTask während evict_terminal_older_than gecancelt wird.

Refs #93
Copilot AI review requested due to automatic review settings May 31, 2026 04:39
@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces robust job persistence to the printing service, transitioning from purely in-memory job tracking to a database-backed system. By integrating a JobStore protocol, the service now ensures that job states are durable, supports automatic recovery after service restarts, and provides a new API endpoint for efficient batch status monitoring. A background cleanup task has also been added to manage database growth by pruning old terminal jobs.

Highlights

  • Job Persistence: Implemented a new JobStore protocol with a SQLiteJobStore implementation to persist job state transitions in the database, ensuring durability across service restarts.
  • Restart Recovery: Added automatic recovery in PrintQueue.start(): PRINTING jobs are marked as FAILED_RESTART and QUEUED jobs are re-enqueued using a rerender process.
  • Cleanup Task: Introduced a background CleanupTask that periodically removes terminal jobs (DONE, FAILED, etc.) older than a configurable retention period (default 30 days).
  • New API Endpoint: Added GET /api/batches/{id} to provide a snapshot of batch status and job states, including an all_terminal summary for Hangar's frontend to optimize SSE connections.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 31, 2026

Codecov Report

❌ Patch coverage is 88.98551% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.92%. Comparing base (280f46d) to head (77e4450).
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
backend/app/services/print_queue.py 81.81% 8 Missing and 4 partials ⚠️
backend/app/api/routes/batches.py 77.77% 6 Missing ⚠️
backend/app/services/cleanup_task.py 86.04% 4 Missing and 2 partials ⚠️
backend/app/services/job_store_sqlite.py 89.47% 3 Missing and 3 partials ⚠️
backend/app/services/job_store.py 93.44% 3 Missing and 1 partial ⚠️
backend/app/main.py 89.47% 2 Missing ⚠️
backend/app/repositories/jobs.py 92.00% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #94      +/-   ##
==========================================
- Coverage   90.20%   89.92%   -0.29%     
==========================================
  Files          84       89       +5     
  Lines        3684     4008     +324     
  Branches      314      343      +29     
==========================================
+ Hits         3323     3604     +281     
- Misses        279      315      +36     
- Partials       82       89       +7     
Components Coverage Δ
Printer Backends (transport) 87.50% <ø> (ø)
Printer Models (drivers) 91.42% <ø> (ø)
Services 91.38% <88.61%> (-0.65%) ⬇️
REST API 85.08% <78.57%> (-0.39%) ⬇️
Pydantic Schemas 100.00% <100.00%> (ø)
Integration Plugins 100.00% <ø> (ø)
Files with missing lines Coverage Δ
backend/app/api/routes/print.py 97.50% <100.00%> (-1.25%) ⬇️
backend/app/config.py 100.00% <100.00%> (ø)
backend/app/schemas/batch_read.py 100.00% <100.00%> (ø)
backend/app/services/print_service.py 100.00% <100.00%> (ø)
backend/app/main.py 85.19% <89.47%> (-1.40%) ⬇️
backend/app/repositories/jobs.py 89.52% <92.00%> (-0.96%) ⬇️
backend/app/services/job_store.py 93.44% <93.44%> (ø)
backend/app/api/routes/batches.py 77.77% <77.77%> (ø)
backend/app/services/cleanup_task.py 86.04% <86.04%> (ø)
backend/app/services/job_store_sqlite.py 89.47% <89.47%> (ø)
... and 1 more

... and 1 file with indirect coverage changes

Flag Coverage Δ
backend 89.92% <88.98%> (-0.29%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 280f46d...77e4450. Read the comment docs.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements Phase 2 of job persistence for the label printer hub, introducing a JobStore protocol with SQLite and in-memory implementations to persist print job state transitions. It refactors PrintQueue and PrintService to save jobs before submission, adds a restart-recovery mechanism to re-enqueue queued jobs upon startup, introduces a periodic CleanupTask to evict old terminal jobs, and adds a new GET /api/batches/{batch_id} snapshot endpoint. The reviewer's feedback focuses on improving robustness and efficiency, suggesting that database state transitions in the worker loop be wrapped in try-except blocks to prevent queue blockage, that potential TypeError and AttributeError exceptions be handled defensively during recovery, and that redundant database queries in SQLiteJobStore be eliminated. Additionally, the reviewer recommends explicitly awaiting canceled tasks in CleanupTask to avoid resource leaks and wrapping rollback database calls in PrintService to prevent exception masking.

Comment on lines +689 to 694
# Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str)
await self._store.mark_printing(UUID(job.id))

try:
_from = job.state
JobStateMachine.transition(job, JobState.PRINTING)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Wenn self._store.mark_printing eine Exception auslöst (z. B. aufgrund einer temporären Datenbanksperre bei SQLite), wird diese außerhalb des try-Blocks nicht abgefangen. Dies führt dazu, dass der gesamte Worker-Task (_worker) unerwartet beendet wird und die Druckerwarteschlange dauerhaft blockiert. Es wird empfohlen, diesen Aufruf in den try-Block zu verschieben, damit Fehler abgefangen und protokolliert werden können, ohne den Worker-Loop zu beenden.

Suggested change
# Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str)
await self._store.mark_printing(UUID(job.id))
try:
_from = job.state
JobStateMachine.transition(job, JobState.PRINTING)
try:
# Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str)
await self._store.mark_printing(UUID(job.id))
_from = job.state
JobStateMachine.transition(job, JobState.PRINTING)

Comment on lines +230 to +239
try:
image = await self._rerender_from_db_job(db_job)
except (KeyError, ValidationError, TemplateNotFoundError) as exc:
logger.warning(
"Recovery: Job %s rerender fehlgeschlagen (%s), FAILED",
db_job.id,
exc.__class__.__name__,
)
await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}")
continue
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Falls db_job.payload in der Datenbank beschädigt oder None ist, kann _rerender_from_db_job eine TypeError auslösen (da None nicht indizierbar ist). Da TypeError nicht in der Liste der abgefangenen Ausnahmen enthalten ist, würde dies den gesamten Recovery-Prozess beim Start abbrechen. Es wird empfohlen, TypeError ebenfalls abzufangen.

Suggested change
try:
image = await self._rerender_from_db_job(db_job)
except (KeyError, ValidationError, TemplateNotFoundError) as exc:
logger.warning(
"Recovery: Job %s rerender fehlgeschlagen (%s), FAILED",
db_job.id,
exc.__class__.__name__,
)
await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}")
continue
try:
image = await self._rerender_from_db_job(db_job)
except (KeyError, ValidationError, TemplateNotFoundError, TypeError) as exc:
logger.warning(
"Recovery: Job %s rerender fehlgeschlagen (%s), FAILED",
db_job.id,
exc.__class__.__name__,
)
await self._store.mark_failed(db_job.id, f"recovery_rerender_failed: {exc}")
continue

Comment on lines +241 to +247
wrapper = Job(
id=str(db_job.id),
printer_id=db_job.printer_id,
image_payload=payload_bytes,
tape_mm=db_job.payload.get("tape_mm"),
options=db_job.payload.get("options", {}),
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Falls db_job.payload in der Datenbank aus irgendeinem Grund None ist (z. B. bei fehlerhaften oder unvollständigen Altdaten), führt der direkte Aufruf von db_job.payload.get(...) außerhalb des try-Blocks zu einem AttributeError und bringt den gesamten Recovery-Prozess zum Absturz. Es wird empfohlen, den Payload defensiv zu behandeln.

Suggested change
wrapper = Job(
id=str(db_job.id),
printer_id=db_job.printer_id,
image_payload=payload_bytes,
tape_mm=db_job.payload.get("tape_mm"),
options=db_job.payload.get("options", {}),
)
payload = db_job.payload or {}
wrapper = Job(
id=str(db_job.id),
printer_id=db_job.printer_id,
image_payload=payload_bytes,
tape_mm=payload.get("tape_mm"),
options=payload.get("options", {}),
)

Comment on lines +45 to +58
async def mark_printing(self, job_id: UUID) -> None:
"""Transition QUEUED -> PRINTING. Silently no-op if job not found."""
async with self._session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
return # silent no-op (Protocol contract)
if job.state != JobState.QUEUED.value:
logger.warning(
"mark_printing called on job %s in state %s (expected queued)",
job_id,
job.state,
)
return
await jobs_repo.mark_printing(session, job_id)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In SQLiteJobStore.mark_printing wird zuerst jobs_repo.get aufgerufen, um den Job-Status zu prüfen, und anschließend jobs_repo.mark_printing, welches intern den Job erneut aus der Datenbank lädt. Dies führt zu zwei redundanten Datenbankabfragen pro Zustandsübergang. Da jobs_repo.mark_printing bei Fehlern (Job nicht gefunden oder ungültiger Zustand) eine ValueError auslöst, kann dies effizienter mit einem einzigen try-except-Block gelöst werden. Das gleiche Muster lässt sich auch auf mark_done und mark_failed anwenden.

    async def mark_printing(self, job_id: UUID) -> None:
        """Transition QUEUED -> PRINTING. Silently no-op if job not found."""
        async with self._session_factory() as session:
            try:
                await jobs_repo.mark_printing(session, job_id)
            except ValueError as exc:
                if "not found" in str(exc):
                    return
                logger.warning(str(exc))

Comment on lines +60 to +78
async def mark_done(self, job_id: UUID) -> None:
"""Transition PRINTING -> DONE.

Delegiert an jobs_repo.mark_done mit result={} — JobStore-Protocol
speichert kein structured result (Phase-2 YAGNI; ergänzbar via
`set_result` Methode wenn Hangar das später braucht).
"""
async with self._session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
return # silent no-op (Protocol contract)
if job.state != JobState.PRINTING.value:
logger.warning(
"mark_done called on job %s in state %s (expected printing)",
job_id,
job.state,
)
return
await jobs_repo.mark_done(session, job_id, result={})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Analog zu mark_printing führt auch mark_done zwei redundante Datenbankabfragen durch. Durch die direkte Nutzung von jobs_repo.mark_done innerhalb eines try-except-Blocks lässt sich die Datenbanklast halbieren.

    async def mark_done(self, job_id: UUID) -> None:
        """Transition PRINTING -> DONE.

        Delegiert an jobs_repo.mark_done mit result={} — JobStore-Protocol
        speichert kein structured result (Phase-2 YAGNI; ergänzbar via
        `set_result` Methode wenn Hangar das später braucht).
        """
        async with self._session_factory() as session:
            try:
                await jobs_repo.mark_done(session, job_id, result={})
            except ValueError as exc:
                if "not found" in str(exc):
                    return
                logger.warning(str(exc))

Comment on lines +80 to +99
async def mark_failed(self, job_id: UUID, error: str) -> None:
"""Transition any non-terminal -> FAILED. Silently no-op if job not found."""
_terminal = {
JobState.DONE.value,
JobState.FAILED.value,
JobState.CANCELLED.value,
JobState.FAILED_RESTART.value,
}
async with self._session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
return # silent no-op (Protocol contract)
if job.state in _terminal:
logger.warning(
"mark_failed called on job %s in terminal state %s",
job_id,
job.state,
)
return
await jobs_repo.mark_failed(session, job_id, error)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Auch in mark_failed lässt sich die doppelte Datenbankabfrage durch direktes Aufrufen von jobs_repo.mark_failed mit entsprechender Ausnahmebehandlung vermeiden.

Suggested change
async def mark_failed(self, job_id: UUID, error: str) -> None:
"""Transition any non-terminal -> FAILED. Silently no-op if job not found."""
_terminal = {
JobState.DONE.value,
JobState.FAILED.value,
JobState.CANCELLED.value,
JobState.FAILED_RESTART.value,
}
async with self._session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
return # silent no-op (Protocol contract)
if job.state in _terminal:
logger.warning(
"mark_failed called on job %s in terminal state %s",
job_id,
job.state,
)
return
await jobs_repo.mark_failed(session, job_id, error)
async def mark_failed(self, job_id: UUID, error: str) -> None:
"""Transition any non-terminal -> FAILED. Silently no-op if job not found."""
async with self._session_factory() as session:
try:
await jobs_repo.mark_failed(session, job_id, error)
except ValueError as exc:
if "not found" in str(exc):
return
logger.warning(str(exc))

Comment on lines +53 to +68
async def stop(self, timeout_s: float = 5.0) -> None:
"""Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden.

Nach Ablauf des Timeouts wird der Task gecancelled.
"""
self._stopping.set()
if self._task is not None:
try:
await asyncio.wait_for(self._task, timeout=timeout_s)
except TimeoutError:
self._task.cancel()
logger.warning(
"CleanupTask hat sich nicht in %ss beendet, Task gecancelled",
timeout_s,
)
self._task = None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Nach dem Aufruf von self._task.cancel() im Fehlerfall wird der Task nicht mit await abgewartet. Dies kann dazu führen, dass der Task beim Beenden der Anwendung noch kurzzeitig im Hintergrund läuft oder Ressourcen nicht rechtzeitig freigegeben werden. Es wird empfohlen, den Task nach dem Abbruch explizit abzuwarten und eine eventuelle asyncio.CancelledError abzufangen.

Suggested change
async def stop(self, timeout_s: float = 5.0) -> None:
"""Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden.
Nach Ablauf des Timeouts wird der Task gecancelled.
"""
self._stopping.set()
if self._task is not None:
try:
await asyncio.wait_for(self._task, timeout=timeout_s)
except TimeoutError:
self._task.cancel()
logger.warning(
"CleanupTask hat sich nicht in %ss beendet, Task gecancelled",
timeout_s,
)
self._task = None
async def stop(self, timeout_s: float = 5.0) -> None:
"""Signalisiert dem Loop zu stoppen und wartet bis zu timeout_s Sekunden.
Nach Ablauf des Timeouts wird der Task gecancelled.
"""
self._stopping.set()
if self._task is not None:
try:
await asyncio.wait_for(self._task, timeout=timeout_s)
except TimeoutError:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.warning(
"CleanupTask hat sich nicht in %ss beendet, Task gecancelled",
timeout_s,
)
self._task = None

Comment on lines +174 to +183
except Exception as exc:
# I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback.
# Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne
# Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued
# würde. mark_failed markiert die Row als FAILED und verhindert das.
await self._store.mark_failed(
db_job.id,
f"submit_failed: {exc.__class__.__name__}: {exc}",
)
raise
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wenn self._store.mark_failed im except-Block fehlschlägt (z. B. wegen Verbindungsproblemen zur DB), wird die ursprüngliche Exception von submit_with_id maskiert. Es wird empfohlen, den Rollback-Datenbankaufruf in ein eigenes try-except zu hüllen, um sicherzustellen, dass die primäre Fehlerursache immer korrekt an den Aufrufer propagiert wird.

Suggested change
except Exception as exc:
# I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback.
# Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne
# Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued
# würde. mark_failed markiert die Row als FAILED und verhindert das.
await self._store.mark_failed(
db_job.id,
f"submit_failed: {exc.__class__.__name__}: {exc}",
)
raise
except Exception as exc:
# I-1-Fix: in-memory Submit fehlgeschlagen nach DB-Persist — Rollback.
# Ohne diesen Rollback bliebe eine stale QUEUED-Row in der DB ohne
# Worker-Gegenstück, die nach Hub-Restart fälschlicherweise re-enqueued
# würde. mark_failed markiert die Row als FAILED und verhindert das.
try:
await self._store.mark_failed(
db_job.id,
f"submit_failed: {exc.__class__.__name__}: {exc}",
)
except Exception as db_exc:
logger.error("Rollback-mark_failed fehlgeschlagen für Job %s: %s", db_job.id, db_exc)
raise

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds Phase 2 DB-backed job persistence so submitted jobs survive restarts, batch/job snapshots can be served from SQLite, and terminal jobs are cleaned up by retention.

Changes:

  • Adds JobStore abstractions, SQLite-backed persistence, and print-queue recovery/transition writes.
  • Adds GET /api/batches/{id} snapshot support with batch summary state.
  • Wires retention cleanup and expands unit/integration coverage for persistence, recovery, and snapshots.

Reviewed changes

Copilot reviewed 26 out of 27 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
docs/superpowers/specs/2026-05-31-phase-2-job-persistence-design.md Adds Phase 2 persistence design/spec.
docs/superpowers/plans/2026-05-31-phase-2-job-persistence.md Adds implementation plan and task breakdown.
backend/app/services/job_store.py Introduces JobStore protocol and memory implementation.
backend/app/services/job_store_sqlite.py Adds SQLite-backed JobStore.
backend/app/services/cleanup_task.py Adds retention cleanup background task.
backend/app/services/print_queue.py Persists queue state transitions and performs restart recovery.
backend/app/services/print_service.py Persists jobs before queue handoff.
backend/app/repositories/jobs.py Adds repository helpers for recovery, lookup, and cleanup.
backend/app/api/routes/batches.py Adds batch snapshot endpoint.
backend/app/api/routes/print.py Preserves string job IDs in API response.
backend/app/schemas/batch_read.py Adds batch snapshot response schemas.
backend/app/main.py Wires store, cleanup task, recovery renderer, and new route.
backend/app/config.py Adds job retention setting.
backend/tests/conftest.py Adds shared async session factory fixture.
backend/tests/unit/repositories/conftest.py Adds repository test DB fixtures.
backend/tests/unit/repositories/init.py Adds repository test package marker.
backend/tests/unit/repositories/test_jobs_phase2.py Tests new jobs repository helpers.
backend/tests/unit/services/test_job_store_memory.py Tests memory store behavior.
backend/tests/unit/services/test_job_store_sqlite.py Tests SQLite store behavior.
backend/tests/unit/services/test_cleanup_task.py Tests cleanup task lifecycle/fail-soft behavior.
backend/tests/unit/services/test_print_queue_persistence.py Tests queue persistence callbacks.
backend/tests/unit/services/test_print_service.py Updates print service tests for UUID/persistence flow.
backend/tests/integration/conftest.py Patches integration DB session binding.
backend/tests/integration/test_print_service_persistence.py Tests service-level DB persistence before queueing.
backend/tests/integration/test_print_queue_recovery.py Tests restart recovery behavior.
backend/tests/integration/test_batch_snapshot_endpoint.py Tests batch snapshot endpoint summaries/order.
backend/tests/integration/test_phase6b_sse_with_batch.py Adjusts printer setup for Phase 2 stub row behavior.

Comment on lines +689 to +690
# Phase 2: DB-State QUEUED->PRINTING persistieren (bridge: dataclass.id ist str)
await self._store.mark_printing(UUID(job.id))
Comment on lines +161 to +162
api_key_id=None, # TODO: aus AuthContext wenn Endpoint-Layer angepasst
source_ip=None, # TODO: aus AuthContext wenn Endpoint-Layer angepasst
Comment thread backend/app/main.py
Comment on lines +314 to +321
if db_printer_id is None:
# Wenn kein Host konfiguriert ist (Mock-Backend / CI), liefert
# upsert_runtime_printer None zurück und fügt keine Printer-Row ein.
# make_queue_printer erzeugt dann eine neue uuid4. Damit
# jobs.printer_id (FK → printers.id) bei save_queued nicht verletzt
# wird, legen wir hier eine Stub-Row an. slug wird auf str(id) gesetzt
# (eindeutig durch UUID), damit der UNIQUE-Constraint nicht verletzt wird.
_stub_slug = str(printer.id)
Private git.strausmann.de Links durch Text-Referenzen ersetzt.

Refs #93
@strausmann strausmann merged commit ef8fefd into main May 31, 2026
13 checks passed
@strausmann strausmann deleted the feat/phase-2-job-persistence branch May 31, 2026 17:21
github-actions Bot pushed a commit that referenced this pull request Jun 1, 2026
## 0.8.0 (2026-06-01)

* Merge pull request #94 from strausmann/feat/phase-2-job-persistence ([ef8fefd](ef8fefd)), closes [#94](#94)
* fix(batch): cancelled-Feld in BatchSummary ergänzt + Tests verschärft ([ddbe0d4](ddbe0d4)), closes [#93](#93)
* fix(cleanup-task): ruff/mypy strict-Blocker beheben ([71b2b28](71b2b28)), closes [#93](#93)
* fix(docs): private Domain-URLs aus Spec entfernt (Privacy-Scan CI) ([77e4450](77e4450)), closes [#93](#93)
* fix(job-store): Quality-Review-Findings I-1 bis I-4 behoben ([f2e2a4e](f2e2a4e)), closes [#93](#93)
* fix(plan): Phase-2-Plan adressiert alle Reviewer-Findings (C1–C6, M1–M8, m1) ([261fc85](261fc85)), closes [#93](#93)
* fix(print_queue): Recovery-Loop gegen KeyError, TemplateNotFoundError und Race absichern ([735a184](735a184)), closes [#93](#93)
* fix(queue): stop() persistiert PRINTING-Jobs als FAILED in DB ([002ef54](002ef54)), closes [#93](#93)
* fix(service): submit_print_job rollback + paused-jobs nicht persistieren ([ca88c52](ca88c52)), closes [#93](#93)
* style(tests): ruff-Befunde Phase 2 behoben (E501, I001, F401, C408, N806) ([16676fd](16676fd)), closes [#93](#93)
* refactor(main): Fix-Round Phase-2 Review (M-1, N-1, N-2) ([34f7e79](34f7e79)), closes [#93](#93)
* refactor(repo): jobs_repo Code-Quality-Findings Task 1 ([1b1cc1e](1b1cc1e)), closes [#93](#93)
* refactor(services): SQLiteJobStore Code-Quality-Findings Task 3 ([223a0cb](223a0cb)), closes [#93](#93)
* feat(api): GET /api/batches/{id} Snapshot-Endpoint (Phase 2) ([1344873](1344873)), closes [#93](#93)
* feat(lifespan): wire JobStore + CleanupTask in App-Startup (Phase 2) ([d027884](d027884)), closes [#93](#93)
* feat(queue): PrintQueue ruft JobStore bei jeder State-Transition (Phase 2) ([7aa9027](7aa9027)), closes [#93](#93)
* feat(queue): PrintQueue.start() Recovery (Phase 2) ([935a79a](935a79a)), closes [#93](#93)
* feat(repo): jobs_repo Helper für Phase 2 JobStore ([c80bda5](c80bda5)), closes [#93](#93)
* feat(service): PrintService persistiert Job-Row vor queue.submit (Phase 2) ([af20365](af20365)), closes [#93](#93)
* feat(services): CleanupTask + PRINTER_HUB_JOB_RETENTION_DAYS config (Phase 2) ([3c7a27d](3c7a27d)), closes [#93](#93)
* feat(services): JobStore Protocol + MemoryJobStore (Phase 2) ([d39c795](d39c795)), closes [#93](#93)
* feat(services): SQLiteJobStore delegiert an jobs_repo (Phase 2) ([d524859](d524859)), closes [#93](#93)
* docs(plan): Phase 2 Job Persistence Implementation Plan ([a8289c8](a8289c8)), closes [#93](#93)
* docs(spec): adressiere R1 Critical + Major Findings (Review 2026-05-31) ([6a94f11](6a94f11)), closes [#93](#93)
* docs(spec): Phase 2 — Job Persistence Design ([a59b187](a59b187)), closes [strausmann/hangar#81](https://github.com/strausmann/hangar/issues/81) [#93](#93)

[skip ci]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Phase 2] Jobs nicht in DB persistiert — Result-Page bleibt auf "ausstehend" obwohl Druck erfolgreich

2 participants