Skip to content

Add message queue / event-driven processing with Kafka#18

Merged
vladiant merged 6 commits into
mainfrom
add_message_queue
Apr 15, 2026
Merged

Add message queue / event-driven processing with Kafka#18
vladiant merged 6 commits into
mainfrom
add_message_queue

Conversation

@vladiant
Copy link
Copy Markdown
Collaborator

Summary

Introduces a full message broker layer to decouple image ingestion from processing, following the existing Clean Architecture patterns. Uploads optionally publish processing tasks to Apache Kafka; standalone consumer workers pull tasks and process images asynchronously. The feature includes end-to-end OpenTelemetry metrics for broker operations and a Grafana dashboard section for monitoring the pipeline.

Version

2.2.32.3.0 (MINOR — new feature, backward-compatible)

What changed

Domain Layer

  • New MessageBroker abstract port with publish(), consume(), close() methods and a Message dataclass

Application Layer

  • UploadImageUseCase extended with optional broker — publishes a processing event after persisting
  • New ConsumeProcessingTasksUseCase — pulls messages from the queue and delegates to ProcessImageUseCase
  • Both use cases instrumented with broker metrics (counters, error counters, processing-duration histogram)

Infrastructure Layer

  • KafkaMessageBroker adapter using aiokafka (lazy producer/consumer initialization, consumer groups)
  • InMemoryMessageBroker adapter using asyncio.Queue for testing/local dev
  • 5 new OTel metric instruments: broker_messages_published_total, broker_messages_consumed_total, broker_publish_errors_total, broker_consume_errors_total, broker_consumer_processing_duration_seconds

Presentation Layer

  • DI wiring updated: _broker() factory, get_consume_use_case(), upload wired with broker
  • Lifespan shutdown calls broker.close()

Worker

  • New worker.py — standalone Kafka consumer entrypoint with OTel metrics export on port 9090

Deployment

  • docker-compose.yml: Kafka (KRaft) service + worker service added
  • Minikube: 02a-kafka.yaml, 04a-worker-deployment.yaml, 04b-worker-metrics-service.yaml
  • Prometheus: scrape job for worker metrics endpoint
  • Grafana: 5 new panels in RED Metrics dashboard (publish/consume rates, broker errors, consumer latency, totals)
  • setup.sh / demo.sh updated for Kafka + worker

Configuration

New Variable Default Purpose
IMG_BROKER_ENABLED false Enable Kafka message broker
IMG_KAFKA_BOOTSTRAP_SERVERS localhost:9092 Kafka bootstrap servers
IMG_KAFKA_CONSUMER_GROUP image-processing Consumer group ID
IMG_WORKER_METRICS_PORT 9090 Worker Prometheus metrics port

Files changed

33 files changed, +1312 −15 lines

Status File Purpose
A message_broker.py MessageBroker port + Message dataclass
A consume_processing_tasks.py Consumer use case with metrics
M upload_image.py Optional broker publish + metrics
A kafka_message_broker.py Kafka adapter (aiokafka)
A in_memory_message_broker.py In-memory adapter (test/dev)
M metrics.py 5 broker metric instruments
A worker.py Standalone consumer worker with OTel
M config.py Broker/Kafka settings
M main.py Lifespan broker shutdown
M dependencies.py DI wiring for broker + consumer UC
A 02a-kafka.yaml Kafka KRaft StatelessSet
A 04a-worker-deployment.yaml Worker Deployment
A 04b-worker-metrics-service.yaml Worker metrics headless Service
M 01-prometheus.yaml Worker scrape target
M 06-grafana-dashboards.yaml Broker dashboard panels
M red-metrics.json Broker dashboard panels (file)
A test_consume_processing_tasks.py 5 consumer UC tests
A test_upload_with_broker.py 2 upload+broker tests
A test_broker_metrics.py 10 broker metrics tests
A test_in_memory_broker.py 5 in-memory broker tests
M conftest.py mock_broker fixture
M test_lifespan.py Broker patch in lifespan tests

Testing

  • 282 tests passing — all green, no external services required
  • New tests cover: consumer use case (5), upload+broker (2), broker metrics (10), in-memory broker (5)
  • Validated end-to-end in minikube: upload → Kafka → worker → completed, metrics visible in Prometheus and Grafana

Checklist

  • ruff check — all clean
  • mypy — no issues
  • pytest — 282 tests passing
  • CHANGELOG.md updated under [2.3.0]
  • Version bumped in pyproject.toml (2.2.3 → 2.3.0)
  • README.md and PROJECT_DESCRIPTION.md updated
  • Minikube end-to-end tested (Kafka + worker + metrics + Grafana dashboard)

@vladiant vladiant merged commit 0e63e92 into main Apr 15, 2026
4 checks passed
@vladiant vladiant deleted the add_message_queue branch April 15, 2026 21:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant