Haslab-dev/BuildYourOwn-Kafka

Kafka-Compatible Broker in Go

A lightweight Kafka-compatible broker built from scratch in Go. Implements enough of the Kafka wire protocol to handle Produce and Fetch operations with disk persistence.

Architecture

Kafka Client
     |
TCP Binary Protocol (port 9092)
     |
+----------------------+
| Go Broker            |
+----------------------+
| Frame Decoder        |  Read/write Kafka wire frames
| Request Dispatcher   |  Route by API key + version
| API Handlers         |  ApiVersions, DescribeTopicPartitions, Fetch, Produce
| Metadata Manager     |  Topic/partition registry
| Storage Engine       |  Append-only log files on disk
+----------------------+
     |
Disk (data/)

Project Structure

cmd/broker/         Entrypoint
internal/
  broker/           TCP server, connection handler, request dispatcher
  protocol/         Binary encoder/decoder (Kafka wire format)
  types/            Shared types and Kafka error codes
  metadata/         Topic and partition registry
  storage/          Append-only log storage engine
  api/              Kafka API handlers
    api_versions.go
    describe_topic_partitions.go
    fetch.go
    produce.go
client/             Bun JS test client

Quick Start

# Build the broker
go build -o bin/broker ./cmd/broker

# Start the broker (listens on :9092)
./bin/broker

# Run the test client (from another terminal)
cd client
bun run test.ts      # ApiVersions + DescribeTopicPartitions tests
bun run produce.ts   # Produce 3 test messages
bun run fetch.ts     # Fetch and display the messages

Storage Layout

data/
  test-topic/
    0.log            # Append-only log for partition 0
  multi-partition-topic/
    0.log
    1.log
    2.log

Each .log file stores records sequentially with a 4-byte length prefix:

[4-byte size][record data][4-byte size][record data]...

Record data format: [8-byte timestamp][4-byte keyLen][key bytes][4-byte valLen][value bytes]

Development Stages

Stage 0 - Project Foundation

Set up Go module and internal packages:

  • internal/types - Kafka error codes, API key constants, record/batch types
  • internal/protocol - Binary reader/writer for Kafka wire format (big-endian, compact strings, uvarints, tagged fields)
  • internal/storage - Append-only log storage with offset-based reads
  • internal/metadata - Thread-safe topic/partition registry
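The following minimal Go sketch shows roughly what internal/types might hold. The type and constant names are hypothetical. The numeric values are the ones assigned by the Kafka protocol specification, which numbers DescribeTopicPartitions as API key 75.

```go
package main

import "fmt"

// ErrorCode is a Kafka error code, encoded as INT16 on the wire.
type ErrorCode int16

// A subset of Kafka protocol error codes.
const (
	ErrNone                    ErrorCode = 0
	ErrOffsetOutOfRange        ErrorCode = 1
	ErrCorruptMessage          ErrorCode = 2
	ErrUnknownTopicOrPartition ErrorCode = 3
	ErrUnsupportedVersion      ErrorCode = 35
)

// APIKey identifies the request type, encoded as INT16 in the request header.
type APIKey int16

const (
	APIKeyProduce                 APIKey = 0
	APIKeyFetch                   APIKey = 1
	APIKeyApiVersions             APIKey = 18
	APIKeyDescribeTopicPartitions APIKey = 75
)

// String makes API keys readable in log lines.
func (k APIKey) String() string {
	switch k {
	case APIKeyProduce:
		return "Produce"
	case APIKeyFetch:
		return "Fetch"
	case APIKeyApiVersions:
		return "ApiVersions"
	case APIKeyDescribeTopicPartitions:
		return "DescribeTopicPartitions"
	default:
		return fmt.Sprintf("APIKey(%d)", int16(k))
	}
}

func main() {
	fmt.Println(APIKeyApiVersions, int16(ErrUnknownTopicOrPartition)) // prints: ApiVersions 3
}
```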

Stage 1 - Bind to Port

TCP server that:

  • Listens on configurable port (default 9092)
  • Accepts TCP connections
  • Handles client disconnects gracefully
  • Supports SIGINT/SIGTERM shutdown

Stage 2-4 - Request Header Parsing

  • Parse Kafka request frames (4-byte length prefix + payload)
  • Extract correlation ID, client ID from request headers
  • Route requests by API key (0=Produce, 1=Fetch, 18=ApiVersions, 75=DescribeTopicPartitions)
  • Handle both legacy (v0-v2) and compact (v3+) header formats

Stage 5 - ApiVersions Handler

Kafka capability negotiation:

  • Returns supported API keys with min/max version ranges
  • Supports compact format (v3+) with tagged fields
  • Supports legacy format (v0-v2)
  • Advertises: Produce(0-9), Fetch(0-13), ApiVersions(0-4), DescribeTopicPartitions(0-4)

Stage 6-8 - Concurrent Client Handling

  • Goroutine per connection for concurrent clients
  • Semaphore limiting each connection to 8 concurrent in-flight requests
  • Mutex-protected response writing, so response frames are never interleaved on the socket
  • WaitGroup-based graceful shutdown

Stage 9-14 - DescribeTopicPartitions (Metadata)

Topic and partition discovery:

  • Topic registry with configurable partition count
  • Returns partition metadata (ID, leader, next offset)
  • Handles unknown topics with UNKNOWN_TOPIC_OR_PARTITION error
  • Supports single and multiple partition responses
  • Supports multi-topic queries
  • Pre-registered topics: test-topic (1 partition), multi-partition-topic (3 partitions)
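A thread-safe registry of this kind might look like the Go sketch below. Several details are assumptions made for illustration: the type and method names, the single broker ID used as every partition's leader, and returning a Kafka error code (3 = UNKNOWN_TOPIC_OR_PARTITION) rather than a Go error.

```go
package main

import (
	"fmt"
	"sync"
)

const errUnknownTopicOrPartition int16 = 3 // Kafka UNKNOWN_TOPIC_OR_PARTITION

type PartitionInfo struct {
	ID         int32
	Leader     int32 // broker ID of the partition leader
	NextOffset int64 // offset the next appended record will receive
}

// Registry maps topic names to partitions; safe for concurrent use.
type Registry struct {
	mu     sync.RWMutex
	topics map[string][]PartitionInfo
}

func NewRegistry() *Registry {
	return &Registry{topics: make(map[string][]PartitionInfo)}
}

// Register creates (or replaces) a topic with n partitions led by brokerID.
func (r *Registry) Register(name string, n int, brokerID int32) {
	parts := make([]PartitionInfo, n)
	for i := range parts {
		parts[i] = PartitionInfo{ID: int32(i), Leader: brokerID}
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.topics[name] = parts
}

// Describe returns a copy of the topic's partitions and a Kafka error code.
func (r *Registry) Describe(name string) ([]PartitionInfo, int16) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	parts, ok := r.topics[name]
	if !ok {
		return nil, errUnknownTopicOrPartition
	}
	return append([]PartitionInfo(nil), parts...), 0
}

// Advance bumps a partition's next offset after count records are appended.
func (r *Registry) Advance(name string, partition int32, count int64) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	parts, ok := r.topics[name]
	if !ok || partition < 0 || int(partition) >= len(parts) {
		return false
	}
	parts[partition].NextOffset += count
	return true
}

func main() {
	reg := NewRegistry()
	reg.Register("test-topic", 1, 1)
	reg.Register("multi-partition-topic", 3, 1)
	parts, code := reg.Describe("multi-partition-topic")
	fmt.Println(len(parts), code) // prints: 3 0
	_, code = reg.Describe("no-such-topic")
	fmt.Println(code) // prints: 3
}
```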

Stage 15-21 - Fetch API

Message consumption from disk:

  • Parses Fetch v12+ compact format requests
  • Reads records from append-only log by offset
  • Supports batched reads with maxBytes limit
  • Returns records in Kafka RecordBatch format (v2 magic)
  • Handles unknown/empty topics with proper error codes
  • Supports offset-based replay

Stage 22-29 - Produce API

Message production to disk:

  • Parses Produce v3+ requests (RecordBatch v2 payloads), using the compact format for the flexible versions (v9+)
  • Decodes Kafka RecordBatch (v2 magic) with zigzag varint records
  • Appends records to partition log files
  • Syncs to disk after each write for durability
  • Returns base offset for each produced batch
  • Supports multi-partition and multi-topic routing
  • Auto-assigns timestamps when missing

Kafka Wire Protocol Details

The broker handles these wire format aspects:

  • Big-endian binary encoding for all fixed-width types
  • Compact strings/arrays (length encoded as a uvarint of N+1, so 0 means null) and tagged fields for newer (flexible) API versions
  • Legacy strings/arrays (int16/int32 length) for older API versions
  • Zigzag varints for record encoding (timestamp delta, offset delta, key/value lengths)
  • Uvarints for unsigned variable-length integers
  • RecordBatch v2 format for both produce and fetch
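The compact encodings above are easy to get off by one, so here is a small Go sketch of them. The length is written as an unsigned varint of N+1, which leaves 0 to mean null. Go's `binary.AppendUvarint` and `binary.Uvarint` use the same 7-bit little-endian grouping as Kafka's unsigned varints. The helper names are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errShort = errors.New("buffer too short")

// appendCompactString encodes COMPACT_STRING: uvarint(N+1) then N bytes.
func appendCompactString(b []byte, s string) []byte {
	b = binary.AppendUvarint(b, uint64(len(s))+1)
	return append(b, s...)
}

// appendCompactNullableString encodes a nil pointer as the single byte 0.
func appendCompactNullableString(b []byte, s *string) []byte {
	if s == nil {
		return append(b, 0)
	}
	return appendCompactString(b, *s)
}

// appendCompactArrayLen writes the COMPACT_ARRAY length prefix for n elements.
func appendCompactArrayLen(b []byte, n int) []byte {
	return binary.AppendUvarint(b, uint64(n)+1)
}

// readCompactNullableString decodes uvarint(N+1) + N bytes; 0 means null.
func readCompactNullableString(b []byte) (*string, []byte, error) {
	n, k := binary.Uvarint(b)
	if k <= 0 {
		return nil, nil, errShort
	}
	b = b[k:]
	if n == 0 {
		return nil, b, nil // null
	}
	if n-1 > uint64(len(b)) {
		return nil, nil, errShort
	}
	s := string(b[:n-1])
	return &s, b[n-1:], nil
}

func main() {
	b := appendCompactArrayLen(nil, 1) // a one-element array of topic names
	b = appendCompactString(b, "test-topic")
	fmt.Printf("% x\n", b)
	s, _, err := readCompactNullableString(b[1:])
	if err == nil {
		fmt.Println(*s)
	}
}
```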

Bun JS Test Client

The client/ directory contains a TypeScript test client for Bun:

File              Description
kafka-client.ts   Binary protocol encoder/decoder, connection utilities
test.ts           ApiVersions, DescribeTopicPartitions, unknown topic tests
produce.ts        Produces 3 test messages to test-topic
fetch.ts          Fetches and displays all messages from test-topic
examples.ts       Full-scale Enterprise Demonstration Suite (Saga, Event Sourcing, IoT)

Advanced Production Demonstration Suite

To show how applications could be built on top of this DIY Kafka broker, client/examples.ts implements a suite of enterprise architecture patterns and streaming workflows.

🛡️ Core Enterprise Design Patterns

  1. Defensive Parsing & "Poison Pill" Mitigation

    • Problem: When multiple schemas share a topic, or historical messages drift, a consumer can encounter a malformed record (a "poison pill") and crash.
    • Solution: We implemented safeDeserialize(), a defensive utility that intercepts malformed JSON or payload schema mismatches, logs a warning, and continues consuming the stream rather than crashing.
  2. Topic High-Water Mark Isolation

    • Problem: The DIY broker only supports pre-registered topics (test-topic and multi-partition-topic). All examples must share the same topic, which can cause cross-scenario pollution.
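    • Solution: Before starting a scenario, examples.ts calls getTopicHighWatermark() to obtain the topic's high-water mark, i.e. the offset the next produced record will receive. Each consumer then polls starting at that offset, so it sees only records produced during its own scenario. This isolates scenarios that run one after another.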
  3. Event Envelope Standard

    • All events are wrapped in a standard enterprise structure with metadata:
      • eventId (Unique tracking identifier)
      • eventType (Used for routing and filtering by the consumer)
      • timestamp (Epoch time)
      • correlationId (To trace distributed workflows across systems)
      • sender (Source service identifier)
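The defensive-parsing idea can be transliterated to Go as a minimal sketch. The real `safeDeserialize()` lives in client/examples.ts and is written in TypeScript. Everything below is an assumption: the Go version itself, the JSON field names (taken from the envelope list above), the extra `payload` field, and the rule that `eventId` and `eventType` are required.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Envelope carries the standard metadata fields; Payload is an assumed field.
type Envelope struct {
	EventID       string          `json:"eventId"`
	EventType     string          `json:"eventType"`
	Timestamp     int64           `json:"timestamp"`
	CorrelationID string          `json:"correlationId"`
	Sender        string          `json:"sender"`
	Payload       json.RawMessage `json:"payload"`
}

// safeDeserialize reports ok=false (and logs a warning) instead of failing
// when a record is not valid JSON or lacks required envelope fields.
func safeDeserialize(offset int64, value []byte) (Envelope, bool) {
	var e Envelope
	if err := json.Unmarshal(value, &e); err != nil {
		log.Printf("warn: skipping poison pill at offset %d: %v", offset, err)
		return Envelope{}, false
	}
	if e.EventID == "" || e.EventType == "" {
		log.Printf("warn: skipping record at offset %d: missing eventId/eventType", offset)
		return Envelope{}, false
	}
	return e, true
}

func main() {
	records := [][]byte{
		[]byte(`{"eventId":"e1","eventType":"OrderPlaced","sender":"order-service","payload":{"orderId":"ORD-101"}}`),
		[]byte(`{not json`), // poison pill: logged and skipped, consumption continues
	}
	for i, v := range records {
		if e, ok := safeDeserialize(int64(i), v); ok {
			fmt.Println("processing", e.EventType, string(e.Payload))
		}
	}
}
```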

🚀 Demonstration Scenarios

  1. Scenario 1: E-Commerce Microservices Event Saga

    • Choreography: order-service publishes OrderPlaced events ➡️ payment-service consumes them, bills the customer, and produces PaymentProcessed ➡️ notification-service reads payment confirmations and triggers user alerts.
    • Resilience: An invalid JSON "poison pill" is explicitly written to the topic. The payment consumer detects the malformed record, skips it instead of crashing, and successfully processes the subsequent items.
  2. Scenario 2: Distributed Trace Log Streamer

    • Auth and DB microservices ship logs to test-topic. A Log Concentrator filters and reports high-priority ERROR entries with correlation details, while skipping lower-priority telemetry logs.
  3. Scenario 3: Audit Ledger and Event Sourcing

    • Financial ledger recording account opening, deposit, withdrawal, and transfer transactions. The ledger replays these events chronologically to reconstruct exact, up-to-date bank account balances.
  4. Scenario 4: High-Throughput IoT Telemetry Processing

    • Sensors emit temperature readings. A streaming analytics consumer tracks temperature statistics, calculates rolling averages, and fires warnings when readings exceed a safety threshold.
  5. Scenario 5: Broker Topology & Protocols Audit

    • Queries the broker directly for its supported API versions and discovers leader/replica metadata for every partition of multi-partition-topic (3 partitions).
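Scenario 3's replay step is a left fold over the event log. The Go sketch below assumes hypothetical event type names (`Open`, `Deposit`, `Withdraw`, `Transfer`) and integer amounts in cents, and the actual event shapes in examples.ts may differ. In this sketch the log is treated as the source of truth, so replay applies events as recorded rather than re-validating them. Replaying the same prefix therefore always yields the same balances.

```go
package main

import "fmt"

// LedgerEvent is a hypothetical event shape; Amount is in cents.
type LedgerEvent struct {
	Type    string // "Open", "Deposit", "Withdraw" or "Transfer" (assumed names)
	Account string
	To      string // destination account, used only by "Transfer"
	Amount  int64
}

// replay folds events, in log (offset) order, into per-account balances.
func replay(events []LedgerEvent) (map[string]int64, error) {
	bal := make(map[string]int64)
	for i, ev := range events {
		switch ev.Type {
		case "Open":
			bal[ev.Account] = ev.Amount // opening balance
		case "Deposit":
			bal[ev.Account] += ev.Amount
		case "Withdraw":
			bal[ev.Account] -= ev.Amount
		case "Transfer":
			bal[ev.Account] -= ev.Amount
			bal[ev.To] += ev.Amount
		default:
			return nil, fmt.Errorf("event %d: unknown event type %q", i, ev.Type)
		}
	}
	return bal, nil
}

func main() {
	events := []LedgerEvent{
		{Type: "Open", Account: "ACC-1", Amount: 100_00},
		{Type: "Open", Account: "ACC-2"},
		{Type: "Deposit", Account: "ACC-1", Amount: 50_00},
		{Type: "Withdraw", Account: "ACC-1", Amount: 20_00},
		{Type: "Transfer", Account: "ACC-1", To: "ACC-2", Amount: 30_00},
	}
	bal, err := replay(events)
	fmt.Println(bal["ACC-1"], bal["ACC-2"], err) // prints: 10000 3000 <nil>
}
```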

📊 Network Execution Breakdown (Client vs. Broker Logs)

The binary protocol exchanges between the Bun TypeScript client and the Go broker line up request for request. Below, the client's console output is shown alongside the network frames logged by the broker:

Client Side Output:

$ bun run examples.ts

🏁 Starting Kafka Client Advanced Scenarios Demonstration Suite...

================================================================================
 🚀 Scenario 1: E-Commerce Microservices Event Saga
================================================================================
  [Setup] Topic starting high-water mark offset is: 62

👉 Step 1: Order Service placing customer orders
  ✅ Order Placed -> ORD-101 (Customer: Alice) | Offset: 62
  ✅ Order Placed -> ORD-102 (Customer: Bob) | Offset: 63
  ✅ Order Placed -> ORD-103 (Customer: Charlie) | Offset: 64
  [Demo] Injecting a corrupted poison-pill event to demonstrate system resilience...

👉 Step 2: Payment Service reacting to orders & processing credit cards
  💳 Processing payment for order ORD-101 ($129.98)
     ↳ Transact success: TXN-719276 | Produced to 'test-topic' Offset: 66
  💳 Processing payment for order ORD-102 ($449.99)
     ↳ Transact success: TXN-343540 | Produced to 'test-topic' Offset: 67
  💳 Processing payment for order ORD-103 ($39.5)
     ↳ Transact success: TXN-410569 | Produced to 'test-topic' Offset: 68

👉 Step 3: Notification Service compiling records for checkout status updates
  ✉️  Email Alert Sent: "Hey customer, payment for order ORD-101 was processed successfully (Txn: TXN-719276)!"
  ✉️  Email Alert Sent: "Hey customer, payment for order ORD-102 was processed successfully (Txn: TXN-343540)!"
  ✉️  Email Alert Sent: "Hey customer, payment for order ORD-103 was processed successfully (Txn: TXN-410569)!"

... [All scenarios completed successfully without errors]

Broker Side Network Trace:

2026/05/28 17:09:02 client connected: 127.0.0.1:57540 (active: 1)
2026/05/28 17:09:02 client connected: 127.0.0.1:57541 (active: 2)
2026/05/28 17:09:02 client connected: 127.0.0.1:57542 (active: 3)
2026/05/28 17:09:02 request: api=1 version=13 correlation=1 client=order-service
2026/05/28 17:09:02 request: api=0 version=9 correlation=1 client=order-service  (Produce x4)
2026/05/28 17:09:02 request: api=1 version=13 correlation=1 client=payment-service (Fetch orders)
2026/05/28 17:09:02 request: api=0 version=9 correlation=1 client=payment-service  (Produce Txn events)
2026/05/28 17:09:02 request: api=1 version=13 correlation=1 client=notification-service (Fetch updates)
...
  • API Key 0 (Produce): Used to write events durably to disk.
  • API Key 1 (Fetch): Used to read/poll new records starting at the computed watermark.
  • API Key 18 (ApiVersions) & API Key 75 (DescribeTopicPartitions): Used in topology discovery to audit the broker's supported API versions and partition leader metadata.

Limitations (V1)

  • No replication or Raft consensus
  • No consumer groups
  • No transactions or exactly-once semantics
  • No TLS/SASL authentication
  • No compression support
  • No log compaction
  • Single-segment per partition (no rolling segments)
  • Topics must be pre-registered at startup
