Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [2.3.0] - 2026-04-15

### Added

- Message broker abstraction (`MessageBroker` port) in the domain layer for
event-driven processing, decoupling image ingestion from processing.
- Kafka adapter (`KafkaMessageBroker`) using `aiokafka` for production
consumer-group-based processing of image tasks.
- In-memory adapter (`InMemoryMessageBroker`) for testing and local development
without external services.
- `ConsumeProcessingTasksUseCase` for pulling processing tasks from the message
queue and delegating to the existing `ProcessImageUseCase`.
- `UploadImageUseCase` now optionally publishes a processing event to the
message broker after persisting, enabling fully async processing.
- Standalone Kafka consumer worker entrypoint (`python -m src.worker`) for
horizontal scaling via consumer groups.
- Configuration settings: `IMG_BROKER_ENABLED`, `IMG_KAFKA_BOOTSTRAP_SERVERS`,
`IMG_KAFKA_CONSUMER_GROUP`.
- `aiokafka` dependency added to `requirements.txt` and `pyproject.toml`.
- Tests for `InMemoryMessageBroker`, `ConsumeProcessingTasksUseCase`, and
upload-with-broker event publishing.
- Shared `mock_broker` fixture in `tests/conftest.py`.
- OpenTelemetry metrics for message broker operations: `broker_messages_published_total`,
`broker_messages_consumed_total`, `broker_publish_errors_total`,
`broker_consume_errors_total`, and `broker_consumer_processing_duration_seconds`.
- Publish and consume use cases instrumented with broker metrics (counters,
error counters, and processing-duration histogram).
- Tests for all broker metrics recording paths (publish and consume).

## [2.2.3] - 2026-04-04

### Added
Expand Down
16 changes: 12 additions & 4 deletions PROJECT_DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ The system is built according to strict **Clean Architecture** principles with f
└─────────────────────────────────────────────────────────────────┘
```

- **Domain Layer** defines pure business entities (`Image`, `RetentionPolicy`) and abstract ports (`ImageProcessor`, `ImageRepository`, `ImageStorage`) with zero external dependencies.
- **Application Layer** contains use cases (`UploadImage`, `ProcessImage`, `GetImage`, `ListImages`, `ApplyRetention`) that orchestrate domain logic through ports, using frozen dataclass DTOs for data transfer.
- **Domain Layer** defines pure business entities (`Image`, `RetentionPolicy`) and abstract ports (`ImageProcessor`, `ImageRepository`, `ImageStorage`, `MessageBroker`) with zero external dependencies.
- **Application Layer** contains use cases (`UploadImage`, `ProcessImage`, `GetImage`, `ListImages`, `ApplyRetention`, `ConsumeProcessingTasks`) that orchestrate domain logic through ports, using frozen dataclass DTOs for data transfer.
- **Infrastructure Layer** provides concrete adapters: PostgreSQL repository via async SQLAlchemy, Pillow-based image processor with `ProcessPoolExecutor` parallelism, and a local file storage backend.
- **Presentation Layer** exposes FastAPI routes with Pydantic validation, request logging middleware, and a dedicated dependency injection module that wires infrastructure into use cases.

Expand All @@ -40,6 +40,7 @@ Each layer depends only inward — infrastructure and presentation never leak in
The service implements **high-performance pipelines** for processing large amounts of image data:

- **CPU-bound parallelism**: Thumbnail generation and metadata extraction are offloaded to a `ProcessPoolExecutor`, keeping the async event loop fully responsive under heavy load. Pure synchronous Pillow functions run in worker processes while the main thread continues handling requests.
- **Event-driven processing via message queue**: Image ingestion is decoupled from processing through a `MessageBroker` domain port. When enabled, uploads publish processing tasks to **Apache Kafka**. Standalone consumer workers (`python -m src.worker`) pull tasks from the queue and delegate to the existing `ProcessImageUseCase`. Multiple worker replicas share a Kafka consumer group for automatic partition assignment and horizontal scaling — the production pattern for high-throughput pipelines. An in-memory broker adapter is included for testing and local development without external services.
- **Bounded batch concurrency**: A batch processing pipeline uses `asyncio.Semaphore` combined with `asyncio.gather` to process multiple images concurrently with a configurable concurrency limit (1–32), preventing resource exhaustion while maximizing throughput.
- **Non-blocking file I/O**: All storage operations (`store`, `retrieve`, `delete`) use `asyncio.to_thread()` so that disk reads/writes never block the event loop.
- **Content-addressed storage**: Uploaded files are stored with a SHA256-based filename prefix, enabling deduplication and safe concurrent writes.
Expand Down Expand Up @@ -139,7 +140,8 @@ The service is fully instrumented with **OpenTelemetry** for distributed tracing
- **Duration**: `http_request_duration_seconds` — request latency histogram.
- **Saturation**: `http_active_requests` — in-flight request gauge.
- **Image processing**: `image_processing_duration_seconds`, `image_uploads_total`, `images_currently_processing`.
- Metrics exposed via a Prometheus `/metrics` endpoint, scraped by **Prometheus**.
- **Message broker**: `broker_messages_published_total`, `broker_messages_consumed_total` (publish/consume throughput by topic), `broker_publish_errors_total`, `broker_consume_errors_total` (error counts by topic and reason), `broker_consumer_processing_duration_seconds` (end-to-end consume-to-process latency histogram).
- Metrics exposed via a Prometheus `/metrics` endpoint on the API service (port 8000) and the Kafka worker (port 9090), scraped by **Prometheus**.

### Logging

Expand All @@ -153,7 +155,7 @@ Three dashboards are provisioned automatically:

| Dashboard | Description |
|-----------|-------------|
| **RED Metrics** | Request rate, error rate, latency percentiles, saturation gauges |
| **RED Metrics** | Request rate, error rate, latency percentiles, saturation gauges, message broker publish/consume rates, broker errors, consumer processing latency |
| **Traces** | Service map, recent traces with clickable trace IDs, duration distribution |
| **Logs** | Application logs, log volume by level, error log filter |

Expand Down Expand Up @@ -188,6 +190,10 @@ All settings are provided via environment variables (prefix `IMG_`) using **pyda
| `IMG_CORS_ORIGINS` | `[]` | Allowed CORS origins (e.g. `["http://localhost:3000"]`; empty = disabled) |
| `IMG_CORS_ALLOW_METHODS` | `["GET","POST","PUT","DELETE","OPTIONS"]` | Allowed HTTP methods for CORS |
| `IMG_CORS_ALLOW_HEADERS` | `["*"]` | Allowed headers for CORS |
| `IMG_BROKER_ENABLED` | `false` | Enable Kafka message broker for async processing |
| `IMG_KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka bootstrap servers |
| `IMG_KAFKA_CONSUMER_GROUP` | `image-processing` | Kafka consumer group ID |
| `IMG_WORKER_METRICS_PORT` | `9090` | Prometheus metrics port for the Kafka worker |
| `IMG_DEBUG` | `false` | Enable debug logging |
| `IMG_OTEL_ENABLED` | `false` | Enable OpenTelemetry instrumentation |
| `IMG_OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP gRPC endpoint for trace export |
Expand Down Expand Up @@ -230,6 +236,7 @@ Test tooling: pytest, pytest-asyncio (auto mode), httpx, aiosqlite (in-memory SQ
| Async/Await | Entire codebase is asynchronous — non-blocking I/O throughout |
| ProcessPoolExecutor | CPU-bound image work dispatched to worker processes |
| Bounded Concurrency | `asyncio.Semaphore` + `asyncio.gather` for batch processing |
| Event-Driven Processing | Message broker port with Kafka adapter; consumer groups for scaling |
| Connection Pooling | Async SQLAlchemy pool with configurable size and overflow |
| Partial Indexes | Database index on `expires_at` for non-null values only |
| Multi-Stage Docker | Builder/runtime separation for minimal production images |
Expand All @@ -249,6 +256,7 @@ Test tooling: pytest, pytest-asyncio (auto mode), httpx, aiosqlite (in-memory SQ
| **Web Framework** | FastAPI, Uvicorn (ASGI) |
| **Database** | PostgreSQL 16, SQLAlchemy 2.0 (async), asyncpg |
| **Image Processing** | Pillow, pybind11 (C++ bridge) |
| **Messaging** | Apache Kafka (aiokafka), in-memory queue (dev/test) |
| **Validation** | Pydantic v2, pydantic-settings |
| **Containerization** | Docker (multi-stage), Docker Compose |
| **Orchestration** | Kubernetes, Minikube (local demo) |
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ docker compose up --build
# Swagger docs at http://localhost:8000/docs
```

### Kafka Consumer Worker

When `IMG_BROKER_ENABLED=true`, uploads publish processing tasks to Kafka.
Run the standalone consumer worker to process them:

```bash
export IMG_BROKER_ENABLED=true
export IMG_KAFKA_BOOTSTRAP_SERVERS=localhost:9092
python -m src.worker
```

Scale horizontally by running multiple worker replicas — they share a Kafka
consumer group for automatic partition assignment.

The Docker entrypoint runs `alembic upgrade head` before starting Uvicorn,
so database migrations are applied automatically on every deployment.

Expand Down Expand Up @@ -69,6 +83,10 @@ minikube service grafana --namespace=observability # open Grafana

See [minikube/observability/README.md](minikube/observability/README.md) for dashboards and configuration.

The RED Metrics dashboard includes a **Message Broker** section with panels for
publish/consume rates, broker errors, consumer processing latency percentiles,
and running totals. The worker exposes its own Prometheus endpoint on port 9090.

### Kubernetes (Production)

```bash
Expand Down Expand Up @@ -127,6 +145,10 @@ All settings via environment variables (prefix `IMG_`), validated by [pydantic-s
| `IMG_CORS_ORIGINS` | `[]` | Allowed CORS origins (e.g. `["http://localhost:3000"]`; empty = disabled) |
| `IMG_CORS_ALLOW_METHODS` | `["GET","POST","PUT","DELETE","OPTIONS"]` | Allowed HTTP methods for CORS |
| `IMG_CORS_ALLOW_HEADERS` | `["*"]` | Allowed headers for CORS |
| `IMG_BROKER_ENABLED` | `false` | Enable Kafka message broker for async processing |
| `IMG_KAFKA_BOOTSTRAP_SERVERS` | `localhost:9092` | Kafka bootstrap servers |
| `IMG_KAFKA_CONSUMER_GROUP` | `image-processing` | Kafka consumer group ID |
| `IMG_WORKER_METRICS_PORT` | `9090` | Prometheus metrics port for the Kafka worker |
| `IMG_DEBUG` | `false` | Enable debug logging |
| `IMG_OTEL_ENABLED` | `false` | Enable OpenTelemetry instrumentation |
| `IMG_OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP gRPC endpoint for trace export |
Expand All @@ -138,9 +160,11 @@ All settings via environment variables (prefix `IMG_`), validated by [pydantic-s
src/
├── config.py # 12-factor configuration
├── main.py # FastAPI app factory + lifespan
├── worker.py # Standalone Kafka consumer worker
├── domain/ # Entities & ports (zero external deps)
├── application/ # Use cases & DTOs
├── infrastructure/ # Adapters (PostgreSQL, Pillow, filesystem)
│ ├── messaging/ # Message broker adapters (Kafka, in-memory)
│ └── observability/ # OpenTelemetry setup, metrics, middleware
└── presentation/ # FastAPI routes, schemas, middleware

Expand Down
49 changes: 49 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,40 @@ services:
IMG_DB_NAME: "${POSTGRES_DB:-images}"
IMG_STORAGE_BASE_DIR: "/data/images"
IMG_PROCESSING_MAX_WORKERS: "4"
IMG_BROKER_ENABLED: "true"
IMG_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
IMG_KAFKA_CONSUMER_GROUP: "image-processing"
IMG_DEBUG: "true"
volumes:
- image_data:/data/images
depends_on:
db:
condition: service_healthy
kafka:
condition: service_healthy

worker:
build: .
command: ["python", "-m", "src.worker"]
environment:
IMG_DB_USER: "${POSTGRES_USER:-postgres}"
IMG_DB_PASSWORD: "${POSTGRES_PASSWORD:?Set POSTGRES_PASSWORD}"
IMG_DB_HOST: "db"
IMG_DB_PORT: "5432"
IMG_DB_NAME: "${POSTGRES_DB:-images}"
IMG_STORAGE_BASE_DIR: "/data/images"
IMG_PROCESSING_MAX_WORKERS: "4"
IMG_BROKER_ENABLED: "true"
IMG_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
IMG_KAFKA_CONSUMER_GROUP: "image-processing"
IMG_DEBUG: "true"
volumes:
- image_data:/data/images
depends_on:
db:
condition: service_healthy
kafka:
condition: service_healthy

db:
image: postgres:16-alpine
Expand All @@ -34,6 +62,27 @@ services:
timeout: 5s
retries: 5

kafka:
image: apache/kafka:3.9.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@localhost:9093"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
healthcheck:
test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 > /dev/null 2>&1"]
interval: 10s
timeout: 10s
retries: 10
start_period: 30s

volumes:
pg_data:
image_data:
3 changes: 3 additions & 0 deletions minikube/01-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ data:
IMG_DB_POOL_SIZE: "5"
IMG_DB_MAX_OVERFLOW: "10"
IMG_RETENTION_BATCH_SIZE: "50"
IMG_BROKER_ENABLED: "true"
IMG_KAFKA_BOOTSTRAP_SERVERS: "kafka-svc:9092"
IMG_KAFKA_CONSUMER_GROUP: "image-processing"
IMG_DEBUG: "true"
IMG_OTEL_ENABLED: "true"
IMG_OTEL_EXPORTER_OTLP_ENDPOINT: "http://tempo.observability.svc.cluster.local:4317"
Expand Down
89 changes: 89 additions & 0 deletions minikube/02a-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Single-node Kafka (KRaft mode — no Zookeeper) for minikube demo
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: kafka-pvc
namespace: cv-platform
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka
namespace: cv-platform
labels:
app: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: apache/kafka:3.9.0
ports:
- containerPort: 9092
env:
- name: KAFKA_NODE_ID
value: "1"
- name: KAFKA_PROCESS_ROLES
value: "broker,controller"
- name: KAFKA_LISTENERS
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
- name: KAFKA_ADVERTISED_LISTENERS
value: "PLAINTEXT://kafka-svc:9092"
- name: KAFKA_CONTROLLER_LISTENER_NAMES
value: "CONTROLLER"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
value: "1@localhost:9093"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_LOG_DIRS
value: "/var/lib/kafka/data"
- name: CLUSTER_ID
value: "MkU3OEVBNTcwNTJENDM2Qk"
resources:
requests:
cpu: "100m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "768Mi"
readinessProbe:
tcpSocket:
port: 9092
initialDelaySeconds: 20
periodSeconds: 10
volumeMounts:
- name: kafka-data
mountPath: /var/lib/kafka/data
volumes:
- name: kafka-data
persistentVolumeClaim:
claimName: kafka-pvc
---
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
namespace: cv-platform
spec:
selector:
app: kafka
ports:
- port: 9092
targetPort: 9092
type: ClusterIP
60 changes: 60 additions & 0 deletions minikube/04a-worker-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Kafka consumer worker — processes images from the message queue
apiVersion: apps/v1
kind: Deployment
metadata:
name: image-worker
namespace: cv-platform
labels:
app: image-worker
spec:
replicas: 1
selector:
matchLabels:
app: image-worker
template:
metadata:
labels:
app: image-worker
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9090"
prometheus.io/path: "/metrics"
spec:
securityContext:
runAsNonRoot: true
runAsUser: 999
runAsGroup: 999
containers:
- name: image-worker
image: image-service:latest
imagePullPolicy: Never
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
command: ["python", "-m", "src.worker"]
ports:
- containerPort: 9090
name: metrics
envFrom:
- configMapRef:
name: image-service-config
- secretRef:
name: image-service-db-credentials
resources:
requests:
cpu: "100m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "1Gi"
volumeMounts:
- name: image-data
mountPath: /data/images
- name: tmp
mountPath: /tmp
volumes:
- name: image-data
persistentVolumeClaim:
claimName: image-data-pvc
- name: tmp
emptyDir: {}
Loading
Loading