Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 121 additions & 27 deletions api/lfs-proxy/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ paths:
$ref: "#/components/schemas/LfsEnvelope"
example:
kfs_lfs: 1
bucket: kafscale-lfs
bucket: my-bucket
key: default/video-uploads/lfs/2026/02/05/abc123
size: 10485760
sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
Expand Down Expand Up @@ -399,10 +399,29 @@ paths:
description: |
Retrieves an LFS object from S3 storage. Supports two modes:

- **presign**: Returns a presigned S3 URL for direct download (default)
- **stream**: Streams the object content through the proxy
- **stream** (default): The proxy buffers the S3 object to temporary
storage while computing SHA-256, then verifies the digest against
the client-supplied envelope checksum. **Only on a successful match
are the bytes streamed to the client** (200 OK with `Content-Length`
set to the verified size). On mismatch — or if the S3 response
exceeds the envelope-declared size — the proxy returns 502 with
`code: integrity_failure` and no payload bytes ever reach the
client. An `integrity_failure` event is emitted to the LFS ops
tracker topic for operator alerting.

For presign mode, the URL TTL is capped by server configuration.
Requires `integrity.sha256` AND `integrity.size` in the request
body; the size enables a hard cap on the S3 read so a compromised
bucket cannot exhaust proxy temp storage.
- **presign**: Returns a presigned S3 URL for direct client-to-S3
download. Disabled by default; enable with
`KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true`. When enabled, the
response body includes an `integrity` block so client SDKs can
verify downloaded bytes themselves. Fetching the presigned URL
directly (e.g. with `curl`) bypasses integrity verification.

**Both modes require the client to supply `integrity.sha256` from the
Kafka envelope.** The Kafka envelope is authoritative; S3 is treated
as untrusted storage.
operationId: lfsDownload
security:
- ApiKeyAuth: []
Expand All @@ -423,19 +442,27 @@ paths:
schema:
$ref: "#/components/schemas/DownloadRequest"
examples:
stream:
summary: Stream content with integrity check (default)
value:
bucket: my-bucket
key: default/video-uploads/lfs/2026/02/05/abc123
mode: stream
integrity:
sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
checksum_alg: sha256
size: 10485760
presign:
summary: Get presigned URL
summary: Get presigned URL (requires KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true)
value:
bucket: kafscale-lfs
bucket: my-bucket
key: default/video-uploads/lfs/2026/02/05/abc123
mode: presign
expires_seconds: 300
stream:
summary: Stream content
value:
bucket: kafscale-lfs
key: default/video-uploads/lfs/2026/02/05/abc123
mode: stream
integrity:
sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
checksum_alg: sha256
size: 10485760
responses:
"200":
description: Presigned URL or streamed object content
Expand All @@ -445,15 +472,33 @@ paths:
$ref: "#/components/schemas/DownloadResponse"
example:
mode: presign
url: https://s3.amazonaws.com/kafscale-lfs/...
url: https://s3.amazonaws.com/my-bucket/...
expires_at: "2026-02-05T10:35:00Z"
integrity:
sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
checksum_alg: sha256
size: 10485760
application/octet-stream:
schema:
type: string
format: binary
description: Streamed object content (when mode=stream)
description: |
Streamed object content (when mode=stream), returned only
after server-side SHA-256 verification has succeeded.
Response headers `Content-Length`, `X-Kafscale-LFS-Checksum`
(`sha256=<hex>`) and `X-Kafscale-LFS-Content-Length` allow
clients to verify independently. On checksum mismatch or
oversize S3 payload, the proxy returns 502
`code: integrity_failure` instead of any body bytes.
"400":
description: Invalid request
description: |
Invalid request. Possible `code` values include
`missing_integrity` (no sha256 supplied), `invalid_integrity`
(sha256 is not a 64-character hex digest or size is negative),
`missing_integrity_size` (no size supplied in stream mode),
`payload_too_large` (`integrity.size` exceeds the proxy's
`KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE` or cannot be verified safely),
`presign_disabled`, `invalid_mode`, `unsupported_checksum_alg`.
content:
application/json:
schema:
Expand All @@ -464,8 +509,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
"500":
description: |
Internal proxy failure. Possible `code` values:
`temp_storage_failed` (temporary verification storage unavailable
or full).
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
"502":
description: Upstream storage failure
description: |
Upstream storage failure or integrity verification failure.
Possible `code` values: `s3_get_failed`, `s3_presign_failed`,
`integrity_failure` (SHA-256 mismatch or S3 payload exceeds
envelope-declared size).
content:
application/json:
schema:
Expand Down Expand Up @@ -509,7 +567,7 @@ components:
bucket:
type: string
description: S3 bucket name
example: kafscale-lfs
example: my-bucket
key:
type: string
description: S3 object key
Expand Down Expand Up @@ -547,49 +605,85 @@ components:

DownloadRequest:
type: object
required: [bucket, key]
required: [bucket, key, integrity]
description: Request to download an LFS object
properties:
bucket:
type: string
description: S3 bucket name (must match proxy's configured bucket)
example: kafscale-lfs
example: my-bucket
key:
type: string
description: S3 object key from the LFS envelope
example: default/video-uploads/lfs/2026/02/05/abc123
mode:
type: string
enum: [presign, stream]
default: presign
enum: [stream, presign]
default: stream
description: |
Download mode:
- presign: Return a presigned URL for direct S3 download
- stream: Stream content through the proxy
- stream (default): Proxy buffers S3 → verifies SHA-256 against
`integrity.sha256` → only then streams to client. Mismatch or
oversize S3 payload → 502 `code: integrity_failure`. Requires
`integrity.sha256` AND `integrity.size`.
- presign: Return a presigned URL for direct S3 download. Requires
`KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true` on the proxy. Client
SDK must verify downloaded bytes against `integrity.sha256` in
the response.
expires_seconds:
type: integer
format: int32
default: 120
minimum: 1
maximum: 3600
description: Requested presign URL TTL in seconds (capped by server)
integrity:
$ref: "#/components/schemas/IntegrityClaim"

IntegrityClaim:
type: object
required: [sha256]
description: |
Authoritative integrity metadata carried from the Kafka-stored LFS
envelope. The proxy verifies S3 bytes against this claim (stream mode)
or echoes it back for client-side verification (presign mode).
properties:
sha256:
type: string
pattern: "^[0-9a-fA-F]{64}$"
description: SHA-256 hash of the object content, hex-encoded
example: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
checksum_alg:
type: string
enum: [sha256]
default: sha256
description: Checksum algorithm (only sha256 is supported)
size:
type: integer
format: int64
minimum: 1
description: Expected object size in bytes; required for stream mode
example: 10485760

DownloadResponse:
type: object
description: Response for presign download mode
required: [mode]
description: Response to a download request
properties:
mode:
type: string
enum: [presign]
enum: [stream, presign]
description: Download mode used
url:
type: string
format: uri
description: Presigned S3 URL for direct download
description: Presigned S3 URL (presign mode only)
expires_at:
type: string
format: date-time
description: URL expiration timestamp
description: URL expiration timestamp (presign mode only)
integrity:
$ref: "#/components/schemas/IntegrityClaim"

UploadInitRequest:
type: object
Expand Down
8 changes: 8 additions & 0 deletions cmd/broker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@ func (f *failingS3Client) UploadIndex(ctx context.Context, key string, body []by
return errors.New("s3 unavailable")
}

// DeleteSegment ignores its arguments and always reports that S3 is
// unavailable.
func (f *failingS3Client) DeleteSegment(ctx context.Context, key string) error {
	unavailable := errors.New("s3 unavailable")
	return unavailable
}

// DeleteIndex ignores its arguments and always reports that S3 is
// unavailable.
func (f *failingS3Client) DeleteIndex(ctx context.Context, key string) error {
	unavailable := errors.New("s3 unavailable")
	return unavailable
}

// DownloadSegment is not supported by this client: it returns no data and an
// "unsupported" error regardless of the key or byte range requested.
func (f *failingS3Client) DownloadSegment(ctx context.Context, key string, rng *storage.ByteRange) ([]byte, error) {
	var noData []byte
	return noData, errors.New("unsupported")
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/broker/s3_dual.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func (d *dualS3Client) UploadIndex(ctx context.Context, key string, body []byte)
return d.write.UploadIndex(ctx, key, body)
}

// DeleteSegment removes the segment stored under key by delegating to the
// write client; the read client is not involved.
func (d *dualS3Client) DeleteSegment(ctx context.Context, key string) error {
	writer := d.write
	return writer.DeleteSegment(ctx, key)
}

// DeleteIndex removes the index stored under key by delegating to the write
// client; the read client is not involved.
func (d *dualS3Client) DeleteIndex(ctx context.Context, key string) error {
	writer := d.write
	return writer.DeleteIndex(ctx, key)
}

func (d *dualS3Client) DownloadSegment(ctx context.Context, key string, rng *storage.ByteRange) ([]byte, error) {
data, err := d.read.DownloadSegment(ctx, key, rng)
if err == nil {
Expand Down
29 changes: 28 additions & 1 deletion cmd/kafscale-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -191,18 +192,32 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3
return err
}

restoreCommitted := false
targetCreated := false
defer func() {
if restoreCommitted || !targetCreated {
return
}
if err := store.DeleteTopic(context.Background(), cfg.TargetTopic); err != nil {
if !errors.Is(err, metadata.ErrUnknownTopic) {
fmt.Fprintf(os.Stderr, "warning: failed to roll back target topic %s: %v\n", cfg.TargetTopic, err)
}
}
}()

if _, err := store.CreateTopic(ctx, metadata.TopicSpec{
Name: cfg.TargetTopic,
NumPartitions: sourceCfg.Partitions,
ReplicationFactor: int16(sourceCfg.ReplicationFactor),
}); err != nil {
return err
}
targetCreated = true

targetCfg := cloneTopicConfig(sourceCfg)
targetCfg.Name = cfg.TargetTopic
targetCfg.CreatedAt = time.Now().UTC().Format(time.RFC3339)
if err := store.UpdateTopicConfig(ctx, targetCfg); err != nil {
if err := persistTopicConfig(ctx, store, targetCfg); err != nil {
return err
}

Expand Down Expand Up @@ -244,6 +259,7 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3
for _, partition := range result.Partitions {
_, _ = fmt.Fprintf(stdout, "partition=%d segments=%d last_offset=%d\n", partition.Partition, partition.SegmentsCopied, partition.LastOffset)
}
restoreCommitted = true
return nil
}

Expand Down Expand Up @@ -286,6 +302,17 @@ func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic
return nil
}

// persistTopicConfig encodes cfg and writes it to etcd under the key returned
// by metadata.TopicConfigKey for the topic's name. The write uses a context
// derived from ctx with a 3-second timeout.
//
// It returns an error when cfg is nil, when encoding fails, or when the etcd
// Put fails. Encode and Put errors are wrapped with the topic name so the
// caller can tell which step failed; the underlying error remains reachable
// through errors.Is / errors.As.
func persistTopicConfig(ctx context.Context, store *metadata.EtcdStore, cfg *metadatapb.TopicConfig) error {
	if cfg == nil {
		// Guard explicitly: reading cfg.Name below would panic on a nil config.
		return errors.New("persist topic config: nil config")
	}

	payload, err := metadata.EncodeTopicConfig(cfg)
	if err != nil {
		return fmt.Errorf("encoding topic config %q: %w", cfg.Name, err)
	}

	// Bound the etcd write to 3 seconds on top of whatever deadline ctx carries.
	putCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	if _, err := store.EtcdClient().Put(putCtx, metadata.TopicConfigKey(cfg.Name), string(payload)); err != nil {
		return fmt.Errorf("writing topic config %q to etcd: %w", cfg.Name, err)
	}
	return nil
}

func cloneTopicConfig(cfg *metadatapb.TopicConfig) *metadatapb.TopicConfig {
if cfg == nil {
return nil
Expand Down
Loading
Loading