diff --git a/api/lfs-proxy/openapi.yaml b/api/lfs-proxy/openapi.yaml index d330dec4..ebb6c53a 100644 --- a/api/lfs-proxy/openapi.yaml +++ b/api/lfs-proxy/openapi.yaml @@ -152,7 +152,7 @@ paths: $ref: "#/components/schemas/LfsEnvelope" example: kfs_lfs: 1 - bucket: kafscale-lfs + bucket: my-bucket key: default/video-uploads/lfs/2026/02/05/abc123 size: 10485760 sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 @@ -399,10 +399,29 @@ paths: description: | Retrieves an LFS object from S3 storage. Supports two modes: - - **presign**: Returns a presigned S3 URL for direct download (default) - - **stream**: Streams the object content through the proxy + - **stream** (default): The proxy buffers the S3 object to temporary + storage while computing SHA-256, then verifies the digest against + the client-supplied envelope checksum. **Only on a successful match + are the bytes streamed to the client** (200 OK with `Content-Length` + set to the verified size). On mismatch — or if the S3 response + exceeds the envelope-declared size — the proxy returns 502 with + `code: integrity_failure` and no payload bytes ever reach the + client. An `integrity_failure` event is emitted to the LFS ops + tracker topic for operator alerting. - For presign mode, the URL TTL is capped by server configuration. + Requires `integrity.sha256` AND `integrity.size` in the request + body; the size enables a hard cap on the S3 read so a compromised + bucket cannot exhaust proxy temp storage. + - **presign**: Returns a presigned S3 URL for direct client-to-S3 + download. Disabled by default; enable with + `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true`. When enabled, the + response body includes an `integrity` block so client SDKs can + verify downloaded bytes themselves. Bare consumption of the + presigned URL (e.g. `curl`) opts out of integrity verification. + + **Both modes require the client to supply `integrity.sha256` from the + Kafka envelope.** The Kafka envelope is authoritative; S3 is treated + as untrusted storage. operationId: lfsDownload security: - ApiKeyAuth: [] @@ -423,19 +442,27 @@ paths: schema: $ref: "#/components/schemas/DownloadRequest" examples: + stream: + summary: Stream content with integrity check (default) + value: + bucket: my-bucket + key: default/video-uploads/lfs/2026/02/05/abc123 + mode: stream + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 presign: - summary: Get presigned URL + summary: Get presigned URL (requires KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true) value: - bucket: kafscale-lfs + bucket: my-bucket key: default/video-uploads/lfs/2026/02/05/abc123 mode: presign expires_seconds: 300 - stream: - summary: Stream content - value: - bucket: kafscale-lfs - key: default/video-uploads/lfs/2026/02/05/abc123 - mode: stream + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 responses: "200": description: Presigned URL or streamed object content @@ -445,15 +472,33 @@ paths: $ref: "#/components/schemas/DownloadResponse" example: mode: presign - url: https://s3.amazonaws.com/kafscale-lfs/... + url: https://s3.amazonaws.com/my-bucket/... expires_at: "2026-02-05T10:35:00Z" + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 application/octet-stream: schema: type: string format: binary - description: Streamed object content (when mode=stream) + description: | + Streamed object content (when mode=stream), returned only + after server-side SHA-256 verification has succeeded. + Response headers `Content-Length`, `X-Kafscale-LFS-Checksum` + (`sha256=`) and `X-Kafscale-LFS-Content-Length` allow + clients to verify independently. On checksum mismatch or + oversize S3 payload, the proxy returns 502 + `code: integrity_failure` instead of any body bytes. "400": - description: Invalid request + description: | + Invalid request. Possible `code` values include + `missing_integrity` (no sha256 supplied), `invalid_integrity` + (sha256 is not a 64-character hex digest or size is negative), + `missing_integrity_size` (no size on stream mode), + `payload_too_large` (Integrity.Size exceeds proxy's + KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE or cannot be verified safely), + `presign_disabled`, `invalid_mode`, `unsupported_checksum_alg`. content: application/json: schema: @@ -464,8 +509,21 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + "500": + description: | + Internal proxy failure. Possible `code` values: + `temp_storage_failed` (temporary verification storage unavailable + or full). + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" "502": - description: Upstream storage failure + description: | + Upstream storage failure or integrity verification failure. + Possible `code` values: `s3_get_failed`, `s3_presign_failed`, + `integrity_failure` (SHA-256 mismatch or S3 payload exceeds + envelope-declared size). content: application/json: schema: @@ -509,7 +567,7 @@ components: bucket: type: string description: S3 bucket name - example: kafscale-lfs + example: my-bucket key: type: string description: S3 object key @@ -547,25 +605,31 @@ components: DownloadRequest: type: object - required: [bucket, key] + required: [bucket, key, integrity] description: Request to download an LFS object properties: bucket: type: string description: S3 bucket name (must match proxy's configured bucket) - example: kafscale-lfs + example: my-bucket key: type: string description: S3 object key from the LFS envelope example: default/video-uploads/lfs/2026/02/05/abc123 mode: type: string - enum: [presign, stream] - default: presign + enum: [stream, presign] + default: stream description: | Download mode: - - presign: Return a presigned URL for direct S3 download - - stream: Stream content through the proxy + - stream (default): Proxy buffers S3 → verifies SHA-256 against + `integrity.sha256` → only then streams to client. Mismatch or + oversize S3 payload → 502 `code: integrity_failure`. Requires + `integrity.sha256` AND `integrity.size`. + - presign: Return a presigned URL for direct S3 download. Requires + `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true` on the proxy. Client + SDK must verify downloaded bytes against `integrity.sha256` in + the response. expires_seconds: type: integer format: int32 @@ -573,23 +637,53 @@ components: minimum: 1 maximum: 3600 description: Requested presign URL TTL in seconds (capped by server) + integrity: + $ref: "#/components/schemas/IntegrityClaim" + + IntegrityClaim: + type: object + required: [sha256] + description: | + Authoritative integrity metadata carried from the Kafka-stored LFS + envelope. The proxy verifies S3 bytes against this claim (stream mode) + or echoes it back for client-side verification (presign mode). + properties: + sha256: + type: string + pattern: "^[0-9a-fA-F]{64}$" + description: SHA-256 hash of the object content, hex-encoded + example: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: + type: string + enum: [sha256] + default: sha256 + description: Checksum algorithm (only sha256 is supported) + size: + type: integer + format: int64 + minimum: 1 + description: Expected object size in bytes; required for stream mode + example: 10485760 DownloadResponse: type: object - description: Response for presign download mode + required: [mode] + description: Response for download request properties: mode: type: string - enum: [presign] + enum: [stream, presign] description: Download mode used url: type: string format: uri - description: Presigned S3 URL for direct download + description: Presigned S3 URL (presign mode only) expires_at: type: string format: date-time - description: URL expiration timestamp + description: URL expiration timestamp (presign mode only) + integrity: + $ref: "#/components/schemas/IntegrityClaim" UploadInitRequest: type: object diff --git a/cmd/broker/main_test.go b/cmd/broker/main_test.go index fd87ee84..4de709a7 100644 --- a/cmd/broker/main_test.go +++ b/cmd/broker/main_test.go @@ -928,6 +928,14 @@ func (f *failingS3Client) UploadIndex(ctx context.Context, key string, body []by return errors.New("s3 unavailable") } +func (f *failingS3Client) DeleteSegment(ctx context.Context, key string) error { + return errors.New("s3 unavailable") +} + +func (f *failingS3Client) DeleteIndex(ctx context.Context, key string) error { + return errors.New("s3 unavailable") +} + func (f *failingS3Client) DownloadSegment(ctx context.Context, key string, rng *storage.ByteRange) ([]byte, error) { return nil, errors.New("unsupported") } diff --git a/cmd/broker/s3_dual.go b/cmd/broker/s3_dual.go index 72771e8d..00ddfc7f 100644 --- a/cmd/broker/s3_dual.go +++ b/cmd/broker/s3_dual.go @@ -41,6 +41,14 @@ func (d *dualS3Client) UploadIndex(ctx context.Context, key string, body []byte) return d.write.UploadIndex(ctx, key, body) } +func (d *dualS3Client) DeleteSegment(ctx context.Context, key string) error { + return d.write.DeleteSegment(ctx, key) +} + +func (d *dualS3Client) DeleteIndex(ctx context.Context, key string) error { + return d.write.DeleteIndex(ctx, key) +} + func (d *dualS3Client) DownloadSegment(ctx context.Context, key string, rng *storage.ByteRange) ([]byte, error) { data, err := d.read.DownloadSegment(ctx, key, rng) if err == nil { diff --git a/cmd/kafscale-cli/main.go b/cmd/kafscale-cli/main.go index d8c99215..134c8800 100644 --- a/cmd/kafscale-cli/main.go +++ b/cmd/kafscale-cli/main.go @@ -17,6 +17,7 @@ package main import ( "context" + "errors" "flag" "fmt" "io" @@ -191,6 +192,19 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 return err } + restoreCommitted := false + targetCreated := false + defer func() { + if restoreCommitted || !targetCreated { + return + } + if err := store.DeleteTopic(context.Background(), cfg.TargetTopic); err != nil { + if !errors.Is(err, metadata.ErrUnknownTopic) { + fmt.Fprintf(os.Stderr, "warning: failed to roll back target topic %s: %v\n", cfg.TargetTopic, err) + } + } + }() + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ Name: cfg.TargetTopic, NumPartitions: sourceCfg.Partitions, @@ -198,11 +212,12 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 }); err != nil { return err } + targetCreated = true targetCfg := cloneTopicConfig(sourceCfg) targetCfg.Name = cfg.TargetTopic targetCfg.CreatedAt = time.Now().UTC().Format(time.RFC3339) - if err := store.UpdateTopicConfig(ctx, targetCfg); err != nil { + if err := persistTopicConfig(ctx, store, targetCfg); err != nil { return err } @@ -244,6 +259,7 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 for _, partition := range result.Partitions { _, _ = fmt.Fprintf(stdout, "partition=%d segments=%d last_offset=%d\n", partition.Partition, partition.SegmentsCopied, partition.LastOffset) } + restoreCommitted = true return nil } @@ -286,6 +302,17 @@ func writePartitionStates(ctx context.Context, store *metadata.EtcdStore, topic return nil } +func persistTopicConfig(ctx context.Context, store *metadata.EtcdStore, cfg *metadatapb.TopicConfig) error { + payload, err := metadata.EncodeTopicConfig(cfg) + if err != nil { + return err + } + putCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + _, err = store.EtcdClient().Put(putCtx, metadata.TopicConfigKey(cfg.Name), string(payload)) + return err +} + func cloneTopicConfig(cfg *metadatapb.TopicConfig) *metadatapb.TopicConfig { if cfg == nil { return nil diff --git a/cmd/kafscale-cli/main_test.go b/cmd/kafscale-cli/main_test.go index 0a547a99..099e6e56 100644 --- a/cmd/kafscale-cli/main_test.go +++ b/cmd/kafscale-cli/main_test.go @@ -33,6 +33,28 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" ) +type failingListS3 struct { + *storage.MemoryS3Client + err error +} + +func (f *failingListS3) ListSegments(context.Context, string) ([]storage.S3Object, error) { + return nil, f.err +} + +type failingUploadIndexS3 struct { + *storage.MemoryS3Client + targetPrefix string + err error +} + +func (f *failingUploadIndexS3) UploadIndex(ctx context.Context, key string, body []byte) error { + if strings.HasPrefix(key, f.targetPrefix) { + return f.err + } + return f.MemoryS3Client.UploadIndex(ctx, key, body) +} + func TestRunHelpAndUnknownCommand(t *testing.T) { var stdout bytes.Buffer if err := run(context.Background(), []string{"help"}, &stdout, &bytes.Buffer{}); err != nil { @@ -365,3 +387,124 @@ func TestExecuteRestoreRejectsUnknownPartition(t *testing.T) { t.Fatalf("expected unknown partition error, got %v", err) } } + +func TestExecuteRestoreRollsBackTargetTopicOnRecoveryFailure(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + store, err := metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + }, metadata.EtcdStoreConfig{Endpoints: endpoints}) + if err != nil { + t.Fatalf("NewEtcdStore: %v", err) + } + defer func() { _ = store.EtcdClient().Close() }() + + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ + Name: "orders", + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + t.Fatalf("CreateTopic: %v", err) + } + + s3 := &failingListS3{ + MemoryS3Client: storage.NewMemoryS3Client(), + err: errors.New("list failed"), + } + err = executeRestore(ctx, &bytes.Buffer{}, restoreConfig{ + SourceTopic: "orders", + SourceNamespace: "default", + TargetTopic: "orders-restored", + TargetNamespace: "default", + RestoreTo: time.Now().UTC(), + }, s3, store) + if err == nil || !strings.Contains(err.Error(), "list failed") { + t.Fatalf("expected list failure, got %v", err) + } + + meta, err := store.Metadata(ctx, []string{"orders-restored"}) + if err != nil { + t.Fatalf("Metadata: %v", err) + } + if len(meta.Topics) != 1 || meta.Topics[0].ErrorCode == 0 { + t.Fatalf("expected rolled back target topic to be absent, got %+v", meta.Topics) + } +} + +func TestExecuteRestoreRollsBackCopiedS3ObjectsOnPartialFailure(t *testing.T) { + endpoints := testutil.StartEmbeddedEtcd(t) + ctx := context.Background() + + store, err := metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + }, metadata.EtcdStoreConfig{Endpoints: endpoints}) + if err != nil { + t.Fatalf("NewEtcdStore: %v", err) + } + defer func() { _ = store.EtcdClient().Close() }() + + if _, err := store.CreateTopic(ctx, metadata.TopicSpec{ + Name: "orders", + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + t.Fatalf("CreateTopic: %v", err) + } + + mem := storage.NewMemoryS3Client() + artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ + { + BaseOffset: 0, + LastOffsetDelta: 0, + MessageCount: 1, + Bytes: make([]byte, 70), + }, + }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) + if err != nil { + t.Fatalf("BuildSegment: %v", err) + } + if err := mem.UploadSegment(ctx, "default/orders/0/segment-00000000000000000000.kfs", artifact.SegmentBytes); err != nil { + t.Fatalf("UploadSegment: %v", err) + } + if err := mem.UploadIndex(ctx, "default/orders/0/segment-00000000000000000000.index", artifact.IndexBytes); err != nil { + t.Fatalf("UploadIndex: %v", err) + } + + s3 := &failingUploadIndexS3{ + MemoryS3Client: mem, + targetPrefix: "default/orders-restored/", + err: errors.New("upload index failed"), + } + err = executeRestore(ctx, &bytes.Buffer{}, restoreConfig{ + SourceTopic: "orders", + SourceNamespace: "default", + TargetTopic: "orders-restored", + TargetNamespace: "default", + RestoreTo: time.Now().UTC(), + }, s3, store) + if err == nil || !strings.Contains(err.Error(), "upload index failed") { + t.Fatalf("expected upload index failure, got %v", err) + } + + if _, err := mem.DownloadSegment(ctx, "default/orders-restored/0/segment-00000000000000000000.kfs", nil); err == nil { + t.Fatal("expected restored segment to be cleaned up after failure") + } + if _, err := mem.DownloadIndex(ctx, "default/orders-restored/0/segment-00000000000000000000.index"); err == nil { + t.Fatal("expected restored index to be cleaned up after failure") + } + + meta, err := store.Metadata(ctx, []string{"orders-restored"}) + if err != nil { + t.Fatalf("Metadata: %v", err) + } + if len(meta.Topics) != 1 || meta.Topics[0].ErrorCode == 0 { + t.Fatalf("expected rolled back target topic to be absent, got %+v", meta.Topics) + } +} diff --git a/cmd/proxy/lfs.go b/cmd/proxy/lfs.go index 91034d71..7252095d 100644 --- a/cmd/proxy/lfs.go +++ b/cmd/proxy/lfs.go @@ -76,6 +76,7 @@ type lfsModule struct { httpShutdownTimeout time.Duration topicMaxLength int downloadTTLMax time.Duration + presignEnabled bool dialTimeout time.Duration backendTLSConfig *tls.Config backendSASLMechanism string @@ -98,9 +99,27 @@ type lfsModule struct { rr uint32 } +var blockedBucketNames = []string{ + "kafscale-lfs", + "kafscale-lfs-dev", + "kafscale-lfs-staging", + "kafscale-example", +} + func initLFSModule(ctx context.Context, logger *slog.Logger) (*lfsModule, error) { s3Bucket := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_S3_BUCKET")) + if s3Bucket == "" { + return nil, fmt.Errorf("KAFSCALE_LFS_PROXY_S3_BUCKET is required when LFS is enabled") + } + for _, blocked := range blockedBucketNames { + if s3Bucket == blocked { + return nil, fmt.Errorf("KAFSCALE_LFS_PROXY_S3_BUCKET=%q is a known unsafe example name and must not be used — configure your own bucket", blocked) + } + } s3Region := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_S3_REGION")) + if s3Region == "" { + return nil, fmt.Errorf("KAFSCALE_LFS_PROXY_S3_REGION is required when LFS is enabled") + } s3Endpoint := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_S3_ENDPOINT")) s3PublicURL := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_S3_PUBLIC_ENDPOINT")) s3AccessKey := strings.TrimSpace(os.Getenv("KAFSCALE_LFS_PROXY_S3_ACCESS_KEY")) @@ -217,6 +236,7 @@ func initLFSModule(ctx context.Context, logger *slog.Logger) (*lfsModule, error) httpShutdownTimeout: httpShutdownTimeout, topicMaxLength: topicMaxLength, downloadTTLMax: time.Duration(downloadTTLSec) * time.Second, + presignEnabled: lfsEnvBoolDefault("KAFSCALE_LFS_PROXY_PRESIGN_ENABLED", false), dialTimeout: dialTimeout, backendRetries: backendRetries, backendBackoff: backendBackoff, @@ -237,9 +257,32 @@ func initLFSModule(ctx context.Context, logger *slog.Logger) (*lfsModule, error) s3HealthInterval := time.Duration(lfsEnvInt("KAFSCALE_LFS_PROXY_S3_HEALTH_INTERVAL_SEC", defaultLFSS3HealthIntervalSec)) * time.Second m.startS3HealthCheck(ctx, s3HealthInterval) + if m.presignEnabled { + m.logger.Warn("presign download mode is ENABLED — LFS downloads in presign mode bypass server-side integrity verification. Clients must verify payloads against the envelope checksum. Bare consumption of presigned URLs (e.g. curl) opts out of tamper detection.", + "flag", "KAFSCALE_LFS_PROXY_PRESIGN_ENABLED") + go m.runPresignWarningLoop(ctx) + } + return m, nil } +// runPresignWarningLoop logs a recurring warning every 5 minutes while presign +// mode is enabled. This makes the security trade-off visible in long-running +// logs, so operators cannot silently forget they opted out of integrity checks. +func (m *lfsModule) runPresignWarningLoop(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.logger.Warn("presign download mode is ENABLED — clients must verify payloads against envelope checksum", + "flag", "KAFSCALE_LFS_PROXY_PRESIGN_ENABLED") + } + } +} + // rewriteProduceRequest is the integration point called from handleProduceRouting. // It scans produce records for LFS_BLOB headers, uploads blobs to S3, and // replaces record values with LFS envelope JSON — all in-place on the parsed diff --git a/cmd/proxy/lfs_http.go b/cmd/proxy/lfs_http.go index c4ab824e..8cc231c5 100644 --- a/cmd/proxy/lfs_http.go +++ b/cmd/proxy/lfs_http.go @@ -26,6 +26,7 @@ import ( "io" "math" "net/http" + "os" "regexp" "strconv" "strings" @@ -58,16 +59,35 @@ type lfsErrorResponse struct { } type lfsDownloadRequest struct { - Bucket string `json:"bucket"` - Key string `json:"key"` - Mode string `json:"mode"` - ExpiresSeconds int `json:"expires_seconds"` + Bucket string `json:"bucket"` + Key string `json:"key"` + Mode string `json:"mode"` + ExpiresSeconds int `json:"expires_seconds"` + Integrity *lfsIntegrityRequest `json:"integrity,omitempty"` +} + +// lfsIntegrityRequest carries the Kafka-authoritative checksum from the +// consumer's envelope into the proxy. The proxy verifies S3 bytes against this +// value on stream-mode downloads and echoes it back on presign-mode responses. +type lfsIntegrityRequest struct { + SHA256 string `json:"sha256"` + ChecksumAlg string `json:"checksum_alg,omitempty"` + Size int64 `json:"size,omitempty"` } type lfsDownloadResponse struct { - Mode string `json:"mode"` - URL string `json:"url"` - ExpiresAt string `json:"expires_at"` + Mode string `json:"mode"` + URL string `json:"url,omitempty"` + ExpiresAt string `json:"expires_at,omitempty"` + Integrity *lfsIntegrityResponse `json:"integrity,omitempty"` +} + +// lfsIntegrityResponse is the echoed checksum returned in presign-mode +// responses so client SDKs can verify the downloaded bytes themselves. +type lfsIntegrityResponse struct { + SHA256 string `json:"sha256"` + ChecksumAlg string `json:"checksum_alg"` + Size int64 `json:"size,omitempty"` } type lfsUploadInitRequest struct { @@ -200,7 +220,7 @@ func (m *lfsModule) lfsCORSMiddleware(next http.HandlerFunc) http.HandlerFunc { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Range, X-Kafka-Topic, X-Kafka-Key, X-Kafka-Partition, X-LFS-Checksum, X-LFS-Checksum-Alg, X-LFS-Size, X-LFS-Mode, X-Request-ID, X-API-Key, Authorization") - w.Header().Set("Access-Control-Expose-Headers", "X-Request-ID") + w.Header().Set("Access-Control-Expose-Headers", "X-Request-ID, X-Kafscale-LFS-Checksum, X-Kafscale-LFS-Content-Length") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return @@ -416,13 +436,78 @@ func (m *lfsModule) handleHTTPDownload(w http.ResponseWriter, r *http.Request) { mode := strings.ToLower(strings.TrimSpace(req.Mode)) if mode == "" { - mode = "presign" + mode = "stream" } if mode != "presign" && mode != "stream" { m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "invalid_mode", "mode must be presign or stream") return } + if mode == "presign" && !m.presignEnabled { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "presign_disabled", + "presign mode is disabled on this proxy; use mode=stream or set KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true") + return + } + + // The consumer's Kafka envelope is the authoritative source of the + // checksum. The proxy refuses to serve an object without one — S3 is + // treated as untrusted storage. + if req.Integrity == nil || strings.TrimSpace(req.Integrity.SHA256) == "" { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "missing_integrity", + "integrity.sha256 is required; pass the checksum from the LFS envelope") + return + } + expectedSHA := strings.ToLower(strings.TrimSpace(req.Integrity.SHA256)) + if len(expectedSHA) != sha256.Size*2 { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "invalid_integrity", + "integrity.sha256 must be a 64-character hex-encoded SHA-256 digest") + return + } + if _, err := hex.DecodeString(expectedSHA); err != nil { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "invalid_integrity", + "integrity.sha256 must be hex-encoded") + return + } + expectedAlg := strings.ToLower(strings.TrimSpace(req.Integrity.ChecksumAlg)) + if expectedAlg == "" { + expectedAlg = "sha256" + } + if expectedAlg != "sha256" { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "unsupported_checksum_alg", + "only checksum_alg=sha256 is supported") + return + } + if req.Integrity.Size < 0 { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "invalid_integrity", + "integrity.size must be non-negative") + return + } + // Size is required for stream mode because the proxy enforces a hard byte + // cap on the S3 read (defense against attacker-extended payloads from a + // compromised bucket). Presign mode can do without — the URL it returns + // carries no proxy-side byte transfer. + if mode == "stream" && req.Integrity.Size <= 0 { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "missing_integrity_size", + "integrity.size is required for stream mode; pass the size from the LFS envelope") + return + } + // Upper-bound the client-claimed size against the proxy's configured + // max-blob ceiling. Without this cap a caller (or a compromised producer + // upstream) can claim Integrity.Size = TB-scale and force the proxy to + // allocate that much temp storage per request. Also avoids the + // expectedSize+1 integer overflow when computing the io.LimitReader cap. + if mode == "stream" && m.maxBlob > 0 && req.Integrity.Size > m.maxBlob { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "payload_too_large", + "integrity.size exceeds proxy maximum blob size (KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE)") + return + } + if mode == "stream" && req.Integrity.Size == math.MaxInt64 { + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadRequest, "payload_too_large", + "integrity.size is too large to verify safely") + return + } + expectedSize := req.Integrity.Size + clientIP := lfsGetClientIP(r) start := time.Now() ttlSeconds := 0 @@ -455,33 +540,139 @@ func (m *lfsModule) handleHTTPDownload(w http.ResponseWriter, r *http.Request) { Mode: "presign", URL: url, ExpiresAt: time.Now().UTC().Add(ttl).Format(time.RFC3339), + Integrity: &lfsIntegrityResponse{ + SHA256: expectedSHA, + ChecksumAlg: "sha256", + Size: expectedSize, + }, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) _ = json.NewEncoder(w).Encode(resp) case "stream": - obj, err := m.s3Uploader.GetObject(r.Context(), req.Key) - if err != nil { - m.metrics.IncS3Errors() - m.lfsWriteHTTPError(w, requestID, "", http.StatusBadGateway, "s3_get_failed", err.Error()) - return - } - defer func() { _ = obj.Body.Close() }() - contentType := "application/octet-stream" - if obj.ContentType != nil && *obj.ContentType != "" { - contentType = *obj.ContentType + m.streamDownloadWithVerify(r, w, requestID, req.Bucket, req.Key, expectedSHA, expectedSize, start) + } +} + +// streamDownloadWithVerify fetches the S3 object, buffers it to a temporary +// file while computing SHA-256, and only begins streaming to the client if +// the hash matches the envelope checksum. On mismatch (or oversize) the +// handler returns 502 with a structured error JSON — no tampered bytes ever +// reach the client. +// +// This is intentionally NOT a stream-and-verify-at-end design. An earlier +// attempt (PR #139, commit c4b230f) tried to stream + verify + truncate on +// mismatch via chunked-transfer framing tricks. Adversarial review showed +// the design was unsound: for any object larger than Go's internal chunk +// buffer the tampered bytes were already on the wire before the abort; +// HTTP intermediaries (nginx-ingress, ALB, CDNs) buffer responses and +// erase the truncation signal; trailer-based signaling is invisible to +// most HTTP client libraries (Python requests, JS fetch, curl --output). +// +// Buffer-then-verify trades latency and temporary disk for an actual +// security guarantee that holds across transports and intermediaries. +// The S3 read is capped at expectedSize+1 via io.LimitReader so a +// compromised bucket cannot exhaust proxy disk by returning a larger +// payload than the envelope declares. +func (m *lfsModule) streamDownloadWithVerify(r *http.Request, w http.ResponseWriter, requestID, bucket, key, expectedSHA string, expectedSize int64, start time.Time) { + obj, err := m.s3Uploader.GetObject(r.Context(), key) + if err != nil { + m.metrics.IncS3Errors() + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadGateway, "s3_get_failed", err.Error()) + return + } + defer func() { _ = obj.Body.Close() }() + + tmpFile, err := os.CreateTemp("", "kafscale-lfs-verify-*") + if err != nil { + m.lfsWriteHTTPError(w, requestID, "", http.StatusInternalServerError, "temp_storage_failed", err.Error()) + return + } + tmpPath := tmpFile.Name() + defer func() { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + }() + + // Cap S3 read at expectedSize+1. The +1 lets us detect over-size payloads + // (a compromised bucket returning more bytes than the envelope declares) + // without trusting the S3-reported Content-Length. + sizeLimit := expectedSize + 1 + limited := io.LimitReader(obj.Body, sizeLimit) + + hasher := sha256.New() + buf := make([]byte, 32*1024) + var written int64 + for { + n, readErr := limited.Read(buf) + if n > 0 { + chunk := buf[:n] + nw, writeErr := tmpFile.Write(chunk) + if writeErr != nil { + m.logger.Warn("temporary file write failed during verification buffer", "error", writeErr, "bytes", written) + m.lfsWriteHTTPError(w, requestID, "", http.StatusInternalServerError, "temp_storage_failed", writeErr.Error()) + return + } + if nw != len(chunk) { + m.logger.Warn("temporary file short write during verification buffer", "written", nw, "expected", len(chunk), "bytes", written) + m.lfsWriteHTTPError(w, requestID, "", http.StatusInternalServerError, "temp_storage_failed", io.ErrShortWrite.Error()) + return + } + _, _ = hasher.Write(chunk) + written += int64(n) } - w.Header().Set("Content-Type", contentType) - var size int64 - if obj.ContentLength != nil { - size = *obj.ContentLength - w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + if readErr == io.EOF { + break } - if _, err := io.Copy(w, obj.Body); err != nil { - m.logger.Warn("download stream failed", "error", err) + if readErr != nil { + m.metrics.IncS3Errors() + m.logger.Warn("S3 read failed during verification buffer", "error", readErr, "bytes", written) + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadGateway, "s3_get_failed", readErr.Error()) + return } - m.tracker.EmitDownloadCompleted(requestID, req.Key, mode, time.Since(start), size) } + + if written > expectedSize { + m.logger.Error("LFS download size exceeded envelope — possible bucket compromise", + "bucket", bucket, "key", key, "expected_size", expectedSize, "read_at_least", written) + m.tracker.EmitDownloadIntegrityFailed(requestID, bucket, key, "stream", "sha256", expectedSHA, "", written, expectedSize) + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadGateway, "integrity_failure", + "S3 object exceeds envelope-declared size; refusing to serve") + return + } + + actualSHA := hex.EncodeToString(hasher.Sum(nil)) + if actualSHA != expectedSHA { + m.logger.Error("LFS download integrity check FAILED — S3 bytes do not match Kafka envelope checksum", + "bucket", bucket, "key", key, + "expected_sha256", expectedSHA, "actual_sha256", actualSHA, + "bytes_read", written) + m.tracker.EmitDownloadIntegrityFailed(requestID, bucket, key, "stream", "sha256", expectedSHA, actualSHA, written, expectedSize) + m.lfsWriteHTTPError(w, requestID, "", http.StatusBadGateway, "integrity_failure", + "S3 bytes do not match Kafka envelope SHA-256; refusing to serve") + return + } + + // Verified. Stream the temp file to the client with Content-Length set — + // the bytes are now provably authentic, so a complete Content-Length-N + // response is correct. + if _, err := tmpFile.Seek(0, io.SeekStart); err != nil { + m.lfsWriteHTTPError(w, requestID, "", http.StatusInternalServerError, "temp_storage_failed", err.Error()) + return + } + contentType := "application/octet-stream" + if obj.ContentType != nil && *obj.ContentType != "" { + contentType = *obj.ContentType + } + w.Header().Set("Content-Type", contentType) + w.Header().Set("Content-Length", strconv.FormatInt(written, 10)) + w.Header().Set("X-Kafscale-LFS-Checksum", "sha256="+actualSHA) + w.Header().Set("X-Kafscale-LFS-Content-Length", strconv.FormatInt(written, 10)) + w.WriteHeader(http.StatusOK) + if _, err := io.Copy(w, tmpFile); err != nil { + m.logger.Warn("download stream to client failed after verification", "error", err) + } + m.tracker.EmitDownloadCompleted(requestID, key, "stream", time.Since(start), written) } func (m *lfsModule) handleHTTPUploadInit(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/proxy/lfs_http_test.go b/cmd/proxy/lfs_http_test.go index 20ba891e..64f589a8 100644 --- a/cmd/proxy/lfs_http_test.go +++ b/cmd/proxy/lfs_http_test.go @@ -17,12 +17,16 @@ package main import ( "bytes" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "io" "log/slog" + "math" "net/http" "net/http/httptest" + "strconv" "strings" "sync/atomic" "testing" @@ -346,8 +350,11 @@ func TestLfsCORSMiddleware_Preflight(t *testing.T) { if rr.Header().Get("Access-Control-Allow-Headers") == "" { t.Fatal("expected Access-Control-Allow-Headers header") } - if rr.Header().Get("Access-Control-Expose-Headers") != "X-Request-ID" { - t.Fatal("expected Access-Control-Expose-Headers: X-Request-ID") + exposed := rr.Header().Get("Access-Control-Expose-Headers") + for _, header := range []string{"X-Request-ID", "X-Kafscale-LFS-Checksum", "X-Kafscale-LFS-Content-Length"} { + if !strings.Contains(exposed, header) { + t.Fatalf("expected Access-Control-Expose-Headers to include %s, got %q", header, exposed) + } } } @@ -572,11 +579,18 @@ func TestHandleHTTPDownload_WrongBucket(t *testing.T) { func TestHandleHTTPDownload_PresignMode(t *testing.T) { m := testHTTPModule(t) m.httpAPIKey = "" + m.presignEnabled = true + const sha = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" body, _ := json.Marshal(lfsDownloadRequest{ Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-123", Mode: "presign", + Integrity: &lfsIntegrityRequest{ + SHA256: sha, + ChecksumAlg: "sha256", + Size: 1024, + }, }) req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) rr := httptest.NewRecorder() @@ -598,16 +612,115 @@ func TestHandleHTTPDownload_PresignMode(t *testing.T) { if resp.ExpiresAt == "" { t.Fatal("expected non-empty expires_at") } + if resp.Integrity == nil { + t.Fatal("expected integrity block in presign response") + } + if resp.Integrity.SHA256 != sha { + t.Fatalf("expected integrity.sha256=%s, got %s", sha, resp.Integrity.SHA256) + } } -func TestHandleHTTPDownload_DefaultModeIsPresign(t *testing.T) { +func TestHandleHTTPDownload_PresignDisabledByDefault(t *testing.T) { m := testHTTPModule(t) m.httpAPIKey = "" + // presignEnabled defaults to false — do not override body, _ := json.Marshal(lfsDownloadRequest{ Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-123", - // Mode intentionally omitted (empty string) + Mode: "presign", + Integrity: &lfsIntegrityRequest{ + SHA256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d; body: %s", rr.Code, rr.Body.String()) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if errResp.Code != "presign_disabled" { + t.Fatalf("expected code=presign_disabled, got %s", errResp.Code) + } +} + +func TestHandleHTTPDownload_MissingIntegrity(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + m.presignEnabled = true + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", + Key: "test-ns/topic/lfs/2025/01/01/obj-123", + Mode: "presign", + // Integrity intentionally omitted + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d; body: %s", rr.Code, rr.Body.String()) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if errResp.Code != "missing_integrity" { + t.Fatalf("expected code=missing_integrity, got %s", errResp.Code) + } +} + +func TestHandleHTTPDownload_InvalidIntegritySHA(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", + Key: "test-ns/topic/lfs/2025/01/01/obj-invalid-sha", + Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: "not-a-sha256", + Size: 1024, + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400 for invalid integrity sha, got %d; body: %s", rr.Code, rr.Body.String()) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if errResp.Code != "invalid_integrity" { + t.Fatalf("expected code=invalid_integrity, got %s", errResp.Code) + } +} + +func TestHandleHTTPDownload_StreamVerifyMatch(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + payload := []byte("hello integrity world") + key := "test-ns/topic/lfs/2025/01/01/obj-verify-ok" + m.s3Uploader.api.(*fakeS3).objects[key] = payload + h := sha256.New() + h.Write(payload) + expectedSHA := hex.EncodeToString(h.Sum(nil)) + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: key, Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: expectedSHA, ChecksumAlg: "sha256", Size: int64(len(payload)), + }, }) req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) rr := httptest.NewRecorder() @@ -616,12 +729,181 @@ func TestHandleHTTPDownload_DefaultModeIsPresign(t *testing.T) { if rr.Code != http.StatusOK { t.Fatalf("expected 200, got %d; body: %s", rr.Code, rr.Body.String()) } - var resp lfsDownloadResponse - if err := json.NewDecoder(rr.Body).Decode(&resp); err != nil { - t.Fatalf("failed to decode response: %v", err) + if got := rr.Header().Get("X-Kafscale-LFS-Checksum"); got != "sha256="+expectedSHA { + t.Errorf("expected X-Kafscale-LFS-Checksum=sha256=%s, got %q", expectedSHA, got) } - if resp.Mode != "presign" { - t.Fatalf("expected default mode=presign, got %s", resp.Mode) + if got := rr.Header().Get("Content-Length"); got != "21" { + t.Errorf("expected Content-Length=21 on verified response, got %q", got) + } + if !bytes.Equal(rr.Body.Bytes(), payload) { + t.Errorf("response body mismatch") + } +} + +func TestHandleHTTPDownload_StreamVerifyMismatch_ReturnsErrorJSON(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + tampered := []byte("legitimate content") + key := "test-ns/topic/lfs/2025/01/01/obj-tampered" + m.s3Uploader.api.(*fakeS3).objects[key] = tampered + + wrongSHA := "0000000000000000000000000000000000000000000000000000000000000000" + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: key, Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: wrongSHA, ChecksumAlg: "sha256", Size: int64(len(tampered)), + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadGateway { + t.Fatalf("expected 502 on integrity mismatch, got %d; body: %s", rr.Code, rr.Body.String()) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("expected JSON error response, got: %s", rr.Body.String()) + } + if errResp.Code != "integrity_failure" { + t.Fatalf("expected error code=integrity_failure, got %q", errResp.Code) + } + // Tampered bytes MUST NOT appear in the response body. + if bytes.Contains(rr.Body.Bytes(), tampered) { + t.Fatalf("SECURITY: tampered bytes leaked into mismatch response body") + } +} + +func TestHandleHTTPDownload_DefaultModeIsStream(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + // presignEnabled stays false — default mode must be stream, not presign, + // otherwise requests without explicit mode would hit presign_disabled. + + // Provide a real envelope (sha + size). Stream mode now requires Size, + // so omitting it would short-circuit to missing_integrity_size before + // reaching the mode-routing logic this test cares about. + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-123", + // Mode intentionally omitted (empty string → default) + Integrity: &lfsIntegrityRequest{ + SHA256: "0000000000000000000000000000000000000000000000000000000000000000", + Size: 42, + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + // With default mode = stream, the request should NOT short-circuit to + // presign_disabled (which would happen if default were presign). Instead + // it should reach the verify path: fakeS3 returns no object -> 502 + // s3_get_failed. + if rr.Code == http.StatusBadRequest { + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err == nil { + if errResp.Code == "presign_disabled" { + t.Fatalf("default mode must not be presign; got presign_disabled") + } + if errResp.Code == "missing_integrity_size" { + t.Fatalf("test setup bug: Integrity.Size must be set") + } + } + } + if rr.Code != http.StatusBadGateway { + t.Fatalf("expected 502 from stream-mode verify path, got %d; body: %s", rr.Code, rr.Body.String()) + } +} + +func TestHandleHTTPDownload_StreamRejectsOversizedIntegritySize(t *testing.T) { + // Defense against tmpfs exhaustion / integer overflow on expectedSize+1. + // Without this cap, a caller can claim Integrity.Size = TB-scale and the + // proxy would attempt to buffer that much per request (the LimitReader + // would also overflow to a negative N, yielding misleading errors). + m := testHTTPModule(t) + m.httpAPIKey = "" + m.maxBlob = 10 * 1024 * 1024 // 10 MiB cap for the test + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-oversize-claim", + Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: "0000000000000000000000000000000000000000000000000000000000000000", + Size: m.maxBlob + 1, // 1 byte over the cap + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400 when Integrity.Size exceeds maxBlob, got %d", rr.Code) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("expected JSON error: %v", err) + } + if errResp.Code != "payload_too_large" { + t.Fatalf("expected code=payload_too_large, got %q", errResp.Code) + } +} + +func TestHandleHTTPDownload_StreamRejectsMaxInt64IntegritySizeWhenUncapped(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + m.maxBlob = 0 // unlimited cap must still not overflow expectedSize+1 + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-maxint-claim", + Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: "0000000000000000000000000000000000000000000000000000000000000000", + Size: math.MaxInt64, + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400 when Integrity.Size is MaxInt64, got %d", rr.Code) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("expected JSON error: %v", err) + } + if errResp.Code != "payload_too_large" { + t.Fatalf("expected code=payload_too_large, got %q", errResp.Code) + } +} + +func TestHandleHTTPDownload_StreamRequiresIntegritySize(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: "test-ns/topic/lfs/2025/01/01/obj-no-size", + Mode: "stream", + Integrity: &lfsIntegrityRequest{ + SHA256: "0000000000000000000000000000000000000000000000000000000000000000", + // Size intentionally omitted + }, + }) + req := httptest.NewRequest(http.MethodPost, "/lfs/download", bytes.NewReader(body)) + rr := httptest.NewRecorder() + m.handleHTTPDownload(rr, req) + + if rr.Code != http.StatusBadRequest { + t.Fatalf("expected 400 when stream mode is missing Integrity.Size, got %d", rr.Code) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(rr.Body).Decode(&errResp); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + if errResp.Code != "missing_integrity_size" { + t.Fatalf("expected code=missing_integrity_size, got %s", errResp.Code) } } @@ -1347,3 +1629,226 @@ func TestHandleHTTPUploadSession_S3Unhealthy(t *testing.T) { t.Fatalf("expected 503, got %d", rr.Code) } } + +// --------------------------------------------------------------------------- +// Stream-mode integrity wire-format tests (PR #139). +// +// Real httptest.NewServer + http.Client. The earlier recorder-based and +// chunked-trailer designs both proved unsound; this suite asserts the +// buffer-then-verify guarantee directly: tampered bytes are NEVER written +// to the response body. Object sizes span 256 B → 1 MiB to defeat the +// "small payloads stay in Go's chunk-writer buffer" test artifact that +// hid the same-size-tampering bug in the previous iteration. +// --------------------------------------------------------------------------- + +func streamVerifyPostBody(t *testing.T, srv *httptest.Server, key, sha string, size int64) *http.Response { + t.Helper() + body, _ := json.Marshal(lfsDownloadRequest{ + Bucket: "test-bucket", Key: key, Mode: "stream", + Integrity: &lfsIntegrityRequest{SHA256: sha, ChecksumAlg: "sha256", Size: size}, + }) + resp, err := http.Post(srv.URL, "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatalf("POST failed: %v", err) + } + return resp +} + +func TestStreamVerify_HonestObject_AtMultipleSizes(t *testing.T) { + sizes := []int{256, 4 * 1024, 64 * 1024, 1024 * 1024} + for _, n := range sizes { + n := n + t.Run("size="+strconv.Itoa(n), func(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + payload := bytes.Repeat([]byte("A"), n) + key := "test-ns/topic/lfs/2025/01/01/obj-honest-" + strconv.Itoa(n) + m.s3Uploader.api.(*fakeS3).objects[key] = payload + h := sha256.New() + h.Write(payload) + sha := hex.EncodeToString(h.Sum(nil)) + + srv := httptest.NewServer(http.HandlerFunc(m.handleHTTPDownload)) + defer srv.Close() + + resp := streamVerifyPostBody(t, srv, key, sha, int64(n)) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %d", resp.StatusCode) + } + if cl := resp.Header.Get("Content-Length"); cl != strconv.Itoa(n) { + t.Errorf("expected Content-Length=%d on verified response, got %q", n, cl) + } + got, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("body read failed: %v", err) + } + if !bytes.Equal(got, payload) { + t.Errorf("body mismatch at size=%d", n) + } + }) + } +} + +// TestStreamVerify_SameSizeTamper_NoTamperedBytesInResponse is the headline +// regression test for the issue @novatechflow flagged on PR #139. It uses +// payload sizes well above Go's internal chunk-writer buffer to catch the +// failure mode that hid behind small-payload tests in the earlier design. +func TestStreamVerify_SameSizeTamper_NoTamperedBytesInResponse(t *testing.T) { + sizes := []int{256, 4 * 1024, 64 * 1024, 1024 * 1024} + for _, n := range sizes { + n := n + t.Run("size="+strconv.Itoa(n), func(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + honest := bytes.Repeat([]byte("A"), n) + tampered := bytes.Repeat([]byte("X"), n) // same length, different content + key := "test-ns/topic/lfs/2025/01/01/obj-tamper-" + strconv.Itoa(n) + m.s3Uploader.api.(*fakeS3).objects[key] = tampered + + // Envelope SHA is for the HONEST bytes. + h := sha256.New() + h.Write(honest) + envelopeSHA := hex.EncodeToString(h.Sum(nil)) + + srv := httptest.NewServer(http.HandlerFunc(m.handleHTTPDownload)) + defer srv.Close() + + resp := streamVerifyPostBody(t, srv, key, envelopeSHA, int64(n)) + defer resp.Body.Close() + + // Mismatch must produce a structured error response, never 200. + if resp.StatusCode != http.StatusBadGateway { + t.Fatalf("expected 502 on integrity mismatch, got %d at size=%d", resp.StatusCode, n) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("error reading 502 body: %v", err) + } + // THE security assertion: tampered bytes MUST NOT appear in the + // response under any circumstance. A previous design returned + // the tampered bytes and signalled failure via framing tricks + // that most HTTP clients ignored. + if bytes.Contains(body, tampered) { + t.Fatalf("SECURITY: tampered payload (%d bytes of 'X') leaked into mismatch response at size=%d. "+ + "This is the @novatechflow PR #139 failure mode.", n, n) + } + // And the error code is the expected one. + var errResp lfsErrorResponse + if err := json.Unmarshal(body, &errResp); err != nil { + t.Fatalf("expected JSON error body, got: %q", body) + } + if errResp.Code != "integrity_failure" { + t.Errorf("expected code=integrity_failure, got %q", errResp.Code) + } + }) + } +} + +// TestStreamVerify_OversizedS3Response_RefusesToServe defends against an +// attacker who replaces an S3 object with one larger than the envelope- +// declared size. Without the LimitReader cap, the proxy could buffer +// arbitrary attacker-controlled bytes (disk exhaustion) before realising +// the hash mismatch. +func TestStreamVerify_OversizedS3Response_RefusesToServe(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + declaredSize := 1024 + honest := bytes.Repeat([]byte("A"), declaredSize) + oversized := bytes.Repeat([]byte("X"), declaredSize*10) // 10x larger + key := "test-ns/topic/lfs/2025/01/01/obj-oversize" + m.s3Uploader.api.(*fakeS3).objects[key] = oversized + + h := sha256.New() + h.Write(honest) + envelopeSHA := hex.EncodeToString(h.Sum(nil)) + + srv := httptest.NewServer(http.HandlerFunc(m.handleHTTPDownload)) + defer srv.Close() + + resp := streamVerifyPostBody(t, srv, key, envelopeSHA, int64(declaredSize)) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadGateway { + t.Fatalf("expected 502 for oversized S3 object, got %d", resp.StatusCode) + } + body, _ := io.ReadAll(resp.Body) + if bytes.Contains(body, oversized[:100]) { + t.Fatalf("SECURITY: oversized attacker bytes leaked into response") + } +} + +// TestStreamVerify_EmptyObjectSubstitution_RefusesToServe defends against +// the simplest bucket-takeover attack: replace the S3 object with a 0-byte +// file. The previous chunked-truncation design was racey on this case +// (Go might or might not have flushed status bytes before the abort, +// so client outcomes were transport-dependent). Buffer-then-verify +// produces a deterministic 502. +func TestStreamVerify_EmptyObjectSubstitution_RefusesToServe(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + declaredSize := 1024 + honest := bytes.Repeat([]byte("A"), declaredSize) + key := "test-ns/topic/lfs/2025/01/01/obj-empty-sub" + m.s3Uploader.api.(*fakeS3).objects[key] = []byte{} // 0 bytes + + h := sha256.New() + h.Write(honest) + envelopeSHA := hex.EncodeToString(h.Sum(nil)) + + srv := httptest.NewServer(http.HandlerFunc(m.handleHTTPDownload)) + defer srv.Close() + + resp := streamVerifyPostBody(t, srv, key, envelopeSHA, int64(declaredSize)) + defer resp.Body.Close() + + if resp.StatusCode != http.StatusBadGateway { + t.Fatalf("expected 502 for empty-object substitution, got %d", resp.StatusCode) + } + var errResp lfsErrorResponse + if err := json.NewDecoder(resp.Body).Decode(&errResp); err != nil { + t.Fatalf("expected JSON error body: %v", err) + } + if errResp.Code != "integrity_failure" { + t.Errorf("expected code=integrity_failure, got %q", errResp.Code) + } +} + +// TestStreamVerify_ContentLengthSetOnlyAfterVerification guards against +// the previous broken design. If Content-Length appears on a non-2xx +// response, or before the verify step, the framing assumption breaks. +func TestStreamVerify_ContentLengthSetOnlyAfterVerification(t *testing.T) { + m := testHTTPModule(t) + m.httpAPIKey = "" + + declaredSize := 4096 + honest := bytes.Repeat([]byte("A"), declaredSize) + tampered := bytes.Repeat([]byte("X"), declaredSize) + key := "test-ns/topic/lfs/2025/01/01/obj-cl-check" + m.s3Uploader.api.(*fakeS3).objects[key] = tampered + + h := sha256.New() + h.Write(honest) + envelopeSHA := hex.EncodeToString(h.Sum(nil)) + + srv := httptest.NewServer(http.HandlerFunc(m.handleHTTPDownload)) + defer srv.Close() + + resp := streamVerifyPostBody(t, srv, key, envelopeSHA, int64(declaredSize)) + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + t.Fatal("regression: 200 OK on integrity mismatch") + } + // On the error response the body is the JSON error envelope — its size + // is small and NOT declaredSize. If Content-Length somehow equals + // declaredSize a future regression has reintroduced the leak. + if cl := resp.Header.Get("Content-Length"); cl == strconv.Itoa(declaredSize) { + t.Fatalf("regression: error response Content-Length=%s matches S3 size; bytes may be leaking", cl) + } +} diff --git a/cmd/proxy/lfs_test.go b/cmd/proxy/lfs_test.go index 75002472..41466609 100644 --- a/cmd/proxy/lfs_test.go +++ b/cmd/proxy/lfs_test.go @@ -69,7 +69,15 @@ func (f *fakeS3) PutObject(ctx context.Context, params *s3.PutObjectInput, optFn return &s3.PutObjectOutput{}, nil } func (f *fakeS3) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - return &s3.GetObjectOutput{}, nil + data, ok := f.objects[*params.Key] + if !ok { + return nil, errors.New("NoSuchKey: object not found") + } + length := int64(len(data)) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader(data)), + ContentLength: &length, + }, nil } func (f *fakeS3) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { f.deleted = append(f.deleted, *params.Key) diff --git a/cmd/proxy/lfs_tracker.go b/cmd/proxy/lfs_tracker.go index dd7c4e4d..6b951512 100644 --- a/cmd/proxy/lfs_tracker.go +++ b/cmd/proxy/lfs_tracker.go @@ -362,6 +362,17 @@ func (t *LfsOpsTracker) EmitDownloadCompleted(requestID, s3Key, mode string, dur t.Emit(event) } +// EmitDownloadIntegrityFailed emits a download-integrity-failure event. This +// signals that bytes fetched from S3 did not match the client-supplied envelope +// checksum — i.e. tampering, a compromised bucket, or a stale envelope. +func (t *LfsOpsTracker) EmitDownloadIntegrityFailed(requestID, s3Bucket, s3Key, mode, checksumAlg, expectedSHA256, actualSHA256 string, bytesRead, expectedSize int64) { + if !t.IsEnabled() { + return + } + event := NewDownloadIntegrityFailedEvent(t.config.ProxyID, requestID, s3Bucket, s3Key, mode, checksumAlg, expectedSHA256, actualSHA256, bytesRead, expectedSize) + t.Emit(event) +} + // EmitOrphanDetected emits an orphan detected event. func (t *LfsOpsTracker) EmitOrphanDetected(requestID, detectionSource, topic, s3Bucket, s3Key, originalRequestID, reason string, size int64) { if !t.IsEnabled() { diff --git a/cmd/proxy/lfs_tracker_types.go b/cmd/proxy/lfs_tracker_types.go index 853d8650..3df0b200 100644 --- a/cmd/proxy/lfs_tracker_types.go +++ b/cmd/proxy/lfs_tracker_types.go @@ -22,12 +22,13 @@ import ( // Event types for LFS operations tracking. const ( - EventTypeUploadStarted = "upload_started" - EventTypeUploadCompleted = "upload_completed" - EventTypeUploadFailed = "upload_failed" - EventTypeDownloadRequested = "download_requested" - EventTypeDownloadCompleted = "download_completed" - EventTypeOrphanDetected = "orphan_detected" + EventTypeUploadStarted = "upload_started" + EventTypeUploadCompleted = "upload_completed" + EventTypeUploadFailed = "upload_failed" + EventTypeDownloadRequested = "download_requested" + EventTypeDownloadCompleted = "download_completed" + EventTypeDownloadIntegrityFailed = "download_integrity_failed" + EventTypeOrphanDetected = "orphan_detected" ) // TrackerEventVersion is the current schema version for tracker events. @@ -102,6 +103,21 @@ type DownloadCompletedEvent struct { Size int64 `json:"size,omitempty"` } +// DownloadIntegrityFailedEvent is emitted when the bytes served from S3 do not +// match the client-supplied envelope checksum. This indicates either post-upload +// tampering in S3, a compromised bucket, or a stale/incorrect envelope. +type DownloadIntegrityFailedEvent struct { + BaseEvent + S3Bucket string `json:"s3_bucket"` + S3Key string `json:"s3_key"` + Mode string `json:"mode"` + ChecksumAlg string `json:"checksum_alg"` + ExpectedSHA256 string `json:"expected_sha256"` + ActualSHA256 string `json:"actual_sha256"` + BytesRead int64 `json:"bytes_read"` + ExpectedSize int64 `json:"expected_size,omitempty"` +} + // OrphanDetectedEvent is emitted when an orphaned S3 object is detected. type OrphanDetectedEvent struct { BaseEvent @@ -127,20 +143,22 @@ func (e *BaseEvent) GetEventType() string { } // GetTopic returns the topic for partitioning. -func (e *UploadStartedEvent) GetTopic() string { return e.Topic } -func (e *UploadCompletedEvent) GetTopic() string { return e.Topic } -func (e *UploadFailedEvent) GetTopic() string { return e.Topic } -func (e *DownloadRequestedEvent) GetTopic() string { return "" } -func (e *DownloadCompletedEvent) GetTopic() string { return "" } -func (e *OrphanDetectedEvent) GetTopic() string { return e.Topic } +func (e *UploadStartedEvent) GetTopic() string { return e.Topic } +func (e *UploadCompletedEvent) GetTopic() string { return e.Topic } +func (e *UploadFailedEvent) GetTopic() string { return e.Topic } +func (e *DownloadRequestedEvent) GetTopic() string { return "" } +func (e *DownloadCompletedEvent) GetTopic() string { return "" } +func (e *DownloadIntegrityFailedEvent) GetTopic() string { return "" } +func (e *OrphanDetectedEvent) GetTopic() string { return e.Topic } // Marshal serializes the event to JSON. -func (e *UploadStartedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } -func (e *UploadCompletedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } -func (e *UploadFailedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } -func (e *DownloadRequestedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } -func (e *DownloadCompletedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } -func (e *OrphanDetectedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *UploadStartedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *UploadCompletedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *UploadFailedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *DownloadRequestedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *DownloadCompletedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *DownloadIntegrityFailedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } +func (e *OrphanDetectedEvent) Marshal() ([]byte, error) { return json.Marshal(e) } // newBaseEvent creates a new base event with common fields. func newBaseEvent(eventType, proxyID, requestID string) BaseEvent { @@ -223,6 +241,21 @@ func NewDownloadCompletedEvent(proxyID, requestID, s3Key, mode string, durationM } } +// NewDownloadIntegrityFailedEvent creates a new integrity-failure event. +func NewDownloadIntegrityFailedEvent(proxyID, requestID, s3Bucket, s3Key, mode, checksumAlg, expectedSHA256, actualSHA256 string, bytesRead, expectedSize int64) *DownloadIntegrityFailedEvent { + return &DownloadIntegrityFailedEvent{ + BaseEvent: newBaseEvent(EventTypeDownloadIntegrityFailed, proxyID, requestID), + S3Bucket: s3Bucket, + S3Key: s3Key, + Mode: mode, + ChecksumAlg: checksumAlg, + ExpectedSHA256: expectedSHA256, + ActualSHA256: actualSHA256, + BytesRead: bytesRead, + ExpectedSize: expectedSize, + } +} + // NewOrphanDetectedEvent creates a new orphan detected event. func NewOrphanDetectedEvent(proxyID, requestID, detectionSource, topic, s3Bucket, s3Key, originalRequestID, reason string, size int64) *OrphanDetectedEvent { return &OrphanDetectedEvent{ diff --git a/cmd/proxy/openapi.yaml b/cmd/proxy/openapi.yaml index 065ad0e1..22d1d54a 100644 --- a/cmd/proxy/openapi.yaml +++ b/cmd/proxy/openapi.yaml @@ -152,7 +152,7 @@ paths: $ref: "#/components/schemas/LfsEnvelope" example: kfs_lfs: 1 - bucket: kafscale-lfs + bucket: my-bucket key: default/video-uploads/lfs/2026/02/05/abc123 size: 10485760 sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 @@ -223,10 +223,29 @@ paths: description: | Retrieves an LFS object from S3 storage. Supports two modes: - - **presign**: Returns a presigned S3 URL for direct download (default) - - **stream**: Streams the object content through the proxy + - **stream** (default): The proxy buffers the S3 object to temporary + storage while computing SHA-256, then verifies the digest against + the client-supplied envelope checksum. **Only on a successful match + are the bytes streamed to the client** (200 OK with `Content-Length` + set to the verified size). On mismatch — or if the S3 response + exceeds the envelope-declared size — the proxy returns 502 with + `code: integrity_failure` and no payload bytes ever reach the + client. An `integrity_failure` event is emitted to the LFS ops + tracker topic for operator alerting. - For presign mode, the URL TTL is capped by server configuration. + Requires `integrity.sha256` AND `integrity.size` in the request + body; the size enables a hard cap on the S3 read so a compromised + bucket cannot exhaust proxy temp storage. + - **presign**: Returns a presigned S3 URL for direct client-to-S3 + download. Disabled by default; enable with + `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true`. When enabled, the + response body includes an `integrity` block so client SDKs can + verify downloaded bytes themselves. Bare consumption of the + presigned URL (e.g. `curl`) opts out of integrity verification. + + **Both modes require the client to supply `integrity.sha256` from the + Kafka envelope.** The Kafka envelope is authoritative; S3 is treated + as untrusted storage. operationId: lfsDownload security: - ApiKeyAuth: [] @@ -247,19 +266,27 @@ paths: schema: $ref: "#/components/schemas/DownloadRequest" examples: + stream: + summary: Stream content with integrity check (default) + value: + bucket: my-bucket + key: default/video-uploads/lfs/2026/02/05/abc123 + mode: stream + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 presign: - summary: Get presigned URL + summary: Get presigned URL (requires KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true) value: - bucket: kafscale-lfs + bucket: my-bucket key: default/video-uploads/lfs/2026/02/05/abc123 mode: presign expires_seconds: 300 - stream: - summary: Stream content - value: - bucket: kafscale-lfs - key: default/video-uploads/lfs/2026/02/05/abc123 - mode: stream + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 responses: "200": description: Presigned URL or streamed object content @@ -269,15 +296,33 @@ paths: $ref: "#/components/schemas/DownloadResponse" example: mode: presign - url: https://s3.amazonaws.com/kafscale-lfs/... + url: https://s3.amazonaws.com/my-bucket/... expires_at: "2026-02-05T10:35:00Z" + integrity: + sha256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: sha256 + size: 10485760 application/octet-stream: schema: type: string format: binary - description: Streamed object content (when mode=stream) + description: | + Streamed object content (when mode=stream), returned only + after server-side SHA-256 verification has succeeded. + Response headers `Content-Length`, `X-Kafscale-LFS-Checksum` + (`sha256=`) and `X-Kafscale-LFS-Content-Length` allow + clients to verify independently. On checksum mismatch or + oversize S3 payload, the proxy returns 502 + `code: integrity_failure` instead of any body bytes. "400": - description: Invalid request + description: | + Invalid request. Possible `code` values include + `missing_integrity` (no sha256 supplied), `invalid_integrity` + (sha256 is not a 64-character hex digest or size is negative), + `missing_integrity_size` (no size on stream mode), + `payload_too_large` (Integrity.Size exceeds proxy's + KAFSCALE_LFS_PROXY_MAX_BLOB_SIZE or cannot be verified safely), + `presign_disabled`, `invalid_mode`, `unsupported_checksum_alg`. content: application/json: schema: @@ -288,8 +333,21 @@ paths: application/json: schema: $ref: "#/components/schemas/ErrorResponse" + "500": + description: | + Internal proxy failure. Possible `code` values: + `temp_storage_failed` (temporary verification storage unavailable + or full). + content: + application/json: + schema: + $ref: "#/components/schemas/ErrorResponse" "502": - description: Upstream storage failure + description: | + Upstream storage failure or integrity verification failure. + Possible `code` values: `s3_get_failed`, `s3_presign_failed`, + `integrity_failure` (SHA-256 mismatch or S3 payload exceeds + envelope-declared size). content: application/json: schema: @@ -333,7 +391,7 @@ components: bucket: type: string description: S3 bucket name - example: kafscale-lfs + example: my-bucket key: type: string description: S3 object key @@ -371,25 +429,31 @@ components: DownloadRequest: type: object - required: [bucket, key] + required: [bucket, key, integrity] description: Request to download an LFS object properties: bucket: type: string description: S3 bucket name (must match proxy's configured bucket) - example: kafscale-lfs + example: my-bucket key: type: string description: S3 object key from the LFS envelope example: default/video-uploads/lfs/2026/02/05/abc123 mode: type: string - enum: [presign, stream] - default: presign + enum: [stream, presign] + default: stream description: | Download mode: - - presign: Return a presigned URL for direct S3 download - - stream: Stream content through the proxy + - stream (default): Proxy buffers S3 → verifies SHA-256 against + `integrity.sha256` → only then streams to client. Mismatch or + oversize S3 payload → 502 `code: integrity_failure`. Requires + `integrity.sha256` AND `integrity.size`. + - presign: Return a presigned URL for direct S3 download. Requires + `KAFSCALE_LFS_PROXY_PRESIGN_ENABLED=true` on the proxy. Client + SDK must verify downloaded bytes against `integrity.sha256` in + the response. expires_seconds: type: integer format: int32 @@ -397,23 +461,53 @@ components: minimum: 1 maximum: 3600 description: Requested presign URL TTL in seconds (capped by server) + integrity: + $ref: "#/components/schemas/IntegrityClaim" + + IntegrityClaim: + type: object + required: [sha256] + description: | + Authoritative integrity metadata carried from the Kafka-stored LFS + envelope. The proxy verifies S3 bytes against this claim (stream mode) + or echoes it back for client-side verification (presign mode). + properties: + sha256: + type: string + pattern: "^[0-9a-fA-F]{64}$" + description: SHA-256 hash of the object content, hex-encoded + example: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + checksum_alg: + type: string + enum: [sha256] + default: sha256 + description: Checksum algorithm (only sha256 is supported) + size: + type: integer + format: int64 + minimum: 1 + description: Expected object size in bytes; required for stream mode + example: 10485760 DownloadResponse: type: object - description: Response for presign download mode + required: [mode] + description: Response for download request properties: mode: type: string - enum: [presign] + enum: [stream, presign] description: Download mode used url: type: string format: uri - description: Presigned S3 URL for direct download + description: Presigned S3 URL (presign mode only) expires_at: type: string format: date-time - description: URL expiration timestamp + description: URL expiration timestamp (presign mode only) + integrity: + $ref: "#/components/schemas/IntegrityClaim" ErrorResponse: type: object diff --git a/deploy/docker-compose/Makefile b/deploy/docker-compose/Makefile index da1b53f9..1fe625d2 100644 --- a/deploy/docker-compose/Makefile +++ b/deploy/docker-compose/Makefile @@ -17,6 +17,8 @@ REGISTRY ?= 192.168.0.131:5100 TAG ?= dev +BUCKET ?= kafscale +OUT ?= downloaded-blob.bin up: ## Start all services REGISTRY=$(REGISTRY) TAG=$(TAG) docker-compose up -d @@ -47,11 +49,13 @@ test-upload: ## Test LFS upload (creates 1KB test file) -H "Content-Type: application/octet-stream" \ --data-binary @- | jq . -test-download: ## Test LFS download (requires KEY variable) - @if [ -z "$(KEY)" ]; then echo "Usage: make test-download KEY=default/topic/lfs/..."; exit 1; fi - @curl -s -X POST http://localhost:8080/lfs/download \ +test-download: ## Test LFS stream download (requires KEY, SHA256, and SIZE) + @if [ -z "$(KEY)" ] || [ -z "$(SHA256)" ] || [ -z "$(SIZE)" ]; then echo "Usage: make test-download KEY=default/topic/lfs/... SHA256= SIZE= [BUCKET=kafscale] [OUT=downloaded-blob.bin]"; exit 1; fi + @curl -f -sS -X POST http://localhost:8080/lfs/download \ -H "Content-Type: application/json" \ - -d '{"bucket":"kafscale","key":"$(KEY)","mode":"presign"}' | jq . + -d '{"bucket":"$(BUCKET)","key":"$(KEY)","mode":"stream","integrity":{"sha256":"$(SHA256)","checksum_alg":"sha256","size":$(SIZE)}}' \ + -o "$(OUT)" + @echo "Downloaded verified LFS object to $(OUT)" clean: ## Stop services and remove volumes docker-compose down -v diff --git a/deploy/docker-compose/README.md b/deploy/docker-compose/README.md index 8d726abf..8c73f814 100644 --- a/deploy/docker-compose/README.md +++ b/deploy/docker-compose/README.md @@ -133,10 +133,21 @@ These settings allow 6+ GB streaming uploads without hitting default limits. ### LFS Download ```bash -# Get presigned URL +# Stream a verified object through the proxy. +# Use bucket/key/sha256/size from the LFS envelope returned by /lfs/produce. curl -X POST http://localhost:8080/lfs/download \ -H "Content-Type: application/json" \ - -d '{"bucket":"kafscale","key":"default/test-uploads/lfs/...","mode":"presign"}' + -d '{ + "bucket": "kafscale", + "key": "default/test-uploads/lfs/2026/02/05/obj-...", + "mode": "stream", + "integrity": { + "sha256": "", + "checksum_alg": "sha256", + "size": 1024 + } + }' \ + -o downloaded-blob.bin ``` ## Traceability diff --git a/pkg/lfs/doc.go b/pkg/lfs/doc.go index 80f86172..ab59e81f 100644 --- a/pkg/lfs/doc.go +++ b/pkg/lfs/doc.go @@ -36,7 +36,7 @@ The LFS envelope is a JSON object stored as the Kafka message value: { "kfs_lfs": 1, - "bucket": "kafscale-lfs", + "bucket": "my-bucket", "key": "default/topic/lfs/2026/02/01/obj-uuid", "size": 10485760, "sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", @@ -51,7 +51,7 @@ Basic usage with franz-go: // Create S3 client s3Client, err := lfs.NewS3Client(ctx, lfs.S3Config{ - Bucket: "kafscale-lfs", + Bucket: "my-bucket", Region: "us-east-1", Endpoint: "http://minio:9000", // optional }) diff --git a/pkg/lfs/envelope_test.go b/pkg/lfs/envelope_test.go index 05fed6cb..4150154d 100644 --- a/pkg/lfs/envelope_test.go +++ b/pkg/lfs/envelope_test.go @@ -156,7 +156,7 @@ func TestIsLfsEnvelope(t *testing.T) { func TestEnvelopeRoundTrip(t *testing.T) { env := Envelope{ Version: 1, - Bucket: "kafscale-lfs", + Bucket: "test-bucket", Key: "prod/events/lfs/2026/02/01/obj-550e8400-e29b-41d4-a716-446655440000", Size: 5242880, SHA256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", diff --git a/pkg/storage/recovery.go b/pkg/storage/recovery.go index 743bc3b8..33cab9cf 100644 --- a/pkg/storage/recovery.go +++ b/pkg/storage/recovery.go @@ -75,6 +75,11 @@ type sourceSegment struct { RecoveredSegment } +type copiedObject struct { + segmentKey string + indexKey string +} + // RecoverTopicToTimestamp copies immutable segment/index pairs from one topic to // another up to the first segment created after the requested cutoff. func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecoveryConfig) (*TopicRecoveryResult, error) { @@ -100,7 +105,7 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery return nil, fmt.Errorf("target topic must differ from source topic") } - targetPrefix := path.Join(cfg.TargetNamespace, cfg.TargetTopic) + targetPrefix := path.Join(cfg.TargetNamespace, cfg.TargetTopic) + "/" existing, err := s3.ListSegments(ctx, targetPrefix) if err != nil { return nil, err @@ -111,7 +116,7 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery } } - sourcePrefix := path.Join(cfg.SourceNamespace, cfg.SourceTopic) + sourcePrefix := path.Join(cfg.SourceNamespace, cfg.SourceTopic) + "/" objects, err := s3.ListSegments(ctx, sourcePrefix) if err != nil { return nil, err @@ -153,6 +158,18 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery RestoreTo: cfg.RestoreTo.UTC(), Partitions: make([]RecoveredPartition, 0, len(partitions)), } + copiedObjects := make([]copiedObject, 0) + restoreCommitted := false + defer func() { + if restoreCommitted { + return + } + for i := len(copiedObjects) - 1; i >= 0; i-- { + copied := copiedObjects[i] + _ = s3.DeleteIndex(context.Background(), copied.indexKey) + _ = s3.DeleteSegment(context.Background(), copied.segmentKey) + } + }() for _, partition := range partitions { segments := segmentsByPartition[partition] @@ -182,6 +199,10 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery if err := s3.UploadSegment(ctx, targetSegmentKey, segmentBytes); err != nil { return nil, err } + copiedObjects = append(copiedObjects, copiedObject{ + segmentKey: targetSegmentKey, + indexKey: targetIndexKey, + }) if err := s3.UploadIndex(ctx, targetIndexKey, indexBytes); err != nil { return nil, err } @@ -197,6 +218,7 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery result.Partitions = append(result.Partitions, summary) } + restoreCommitted = true return result, nil } diff --git a/pkg/storage/recovery_test.go b/pkg/storage/recovery_test.go index 47e2de64..4671e4a8 100644 --- a/pkg/storage/recovery_test.go +++ b/pkg/storage/recovery_test.go @@ -86,6 +86,48 @@ func TestRecoverTopicToTimestampRejectsExistingTarget(t *testing.T) { } } +func TestRecoverTopicToTimestamp_IgnoresSiblingTopicPrefixes(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC) + uploadRecoverySegment(t, s3, "default", "orders", 0, 0, created) + uploadRecoverySegment(t, s3, "default", "orders-v2", 0, 0, created) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: created.Add(time.Minute), + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 1 { + t.Fatalf("expected 1 copied segment, got %d", result.SegmentsCopied) + } +} + +func TestRecoverTopicToTimestamp_IgnoresSiblingTargetPrefixes(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC) + uploadRecoverySegment(t, s3, "default", "orders", 0, 0, created) + uploadRecoverySegment(t, s3, "default", "orders-restore-old", 0, 0, created) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: created.Add(time.Minute), + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 1 { + t.Fatalf("expected 1 copied segment, got %d", result.SegmentsCopied) + } +} + func uploadRecoverySegment(t *testing.T, s3 *MemoryS3Client, namespace string, topic string, partition int32, baseOffset int64, created time.Time) { t.Helper() diff --git a/pkg/storage/s3_aws.go b/pkg/storage/s3_aws.go index daf1de4d..ce4be38e 100644 --- a/pkg/storage/s3_aws.go +++ b/pkg/storage/s3_aws.go @@ -35,6 +35,7 @@ import ( type awsS3API interface { PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) HeadBucket(ctx context.Context, params *s3.HeadBucketInput, optFns ...func(*s3.Options)) (*s3.HeadBucketOutput, error) CreateBucket(ctx context.Context, params *s3.CreateBucketInput, optFns ...func(*s3.Options)) (*s3.CreateBucketOutput, error) ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) @@ -153,6 +154,14 @@ func (c *awsS3Client) UploadIndex(ctx context.Context, key string, body []byte) return c.putObject(ctx, key, body) } +func (c *awsS3Client) DeleteSegment(ctx context.Context, key string) error { + return c.deleteObject(ctx, key) +} + +func (c *awsS3Client) DeleteIndex(ctx context.Context, key string) error { + return c.deleteObject(ctx, key) +} + func (c *awsS3Client) putObject(ctx context.Context, key string, body []byte) error { input := &s3.PutObjectInput{ Bucket: aws.String(c.bucket), @@ -179,6 +188,17 @@ func (c *awsS3Client) putObject(ctx context.Context, key string, body []byte) er return nil } +func (c *awsS3Client) deleteObject(ctx context.Context, key string) error { + _, err := c.api.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(c.bucket), + Key: aws.String(key), + }) + if err != nil { + return fmt.Errorf("delete object %s: %w", key, err) + } + return nil +} + func isNotFoundErr(err error) bool { var apiErr smithy.APIError if errors.As(err, &apiErr) { diff --git a/pkg/storage/s3_memory.go b/pkg/storage/s3_memory.go index c8317751..3a3d4dc7 100644 --- a/pkg/storage/s3_memory.go +++ b/pkg/storage/s3_memory.go @@ -59,6 +59,20 @@ func (m *MemoryS3Client) UploadIndex(ctx context.Context, key string, body []byt return nil } +func (m *MemoryS3Client) DeleteSegment(ctx context.Context, key string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.data, key) + return nil +} + +func (m *MemoryS3Client) DeleteIndex(ctx context.Context, key string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.index, key) + return nil +} + func (m *MemoryS3Client) DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error) { m.mu.Lock() defer m.mu.Unlock() diff --git a/pkg/storage/s3client.go b/pkg/storage/s3client.go index 6fad8189..896c2b48 100644 --- a/pkg/storage/s3client.go +++ b/pkg/storage/s3client.go @@ -41,6 +41,8 @@ func (br *ByteRange) headerValue() *string { type S3Client interface { UploadSegment(ctx context.Context, key string, body []byte) error UploadIndex(ctx context.Context, key string, body []byte) error + DeleteSegment(ctx context.Context, key string) error + DeleteIndex(ctx context.Context, key string) error DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error) DownloadIndex(ctx context.Context, key string) ([]byte, error) ListSegments(ctx context.Context, prefix string) ([]S3Object, error) diff --git a/pkg/storage/s3client_test.go b/pkg/storage/s3client_test.go index 86a30205..6c15f4b4 100644 --- a/pkg/storage/s3client_test.go +++ b/pkg/storage/s3client_test.go @@ -31,6 +31,7 @@ type fakeS3 struct { putInputs []*s3.PutObjectInput getInput *s3.GetObjectInput getData []byte + deleteKey *string putErr error getErr error headErr error @@ -53,6 +54,11 @@ func (f *fakeS3) GetObject(ctx context.Context, params *s3.GetObjectInput, optFn }, nil } +func (f *fakeS3) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + f.deleteKey = params.Key + return &s3.DeleteObjectOutput{}, nil +} + func (f *fakeS3) HeadBucket(ctx context.Context, params *s3.HeadBucketInput, optFns ...func(*s3.Options)) (*s3.HeadBucketOutput, error) { return &s3.HeadBucketOutput{}, f.headErr } @@ -183,6 +189,18 @@ func TestAWSS3Client_UploadNoKMS(t *testing.T) { } } +func TestAWSS3Client_DeleteSegment(t *testing.T) { + api := &fakeS3{} + client := newAWSClientWithAPI("test-bucket", "us-east-1", "", api) + + if err := client.DeleteSegment(context.Background(), "topic/0/segment-0"); err != nil { + t.Fatalf("DeleteSegment: %v", err) + } + if api.deleteKey == nil || *api.deleteKey != "topic/0/segment-0" { + t.Fatalf("unexpected delete key: %v", api.deleteKey) + } +} + func TestAWSS3Client_EnsureBucket(t *testing.T) { api := &fakeS3{} client := newAWSClientWithAPI("test-bucket", "us-east-1", "", api) @@ -364,3 +382,22 @@ func TestMemoryS3Client_ListSegments(t *testing.T) { t.Fatalf("expected 0 objects, got %d", len(objs)) } } + +func TestMemoryS3Client_DeleteSegmentAndIndex(t *testing.T) { + m := NewMemoryS3Client() + _ = m.UploadSegment(context.Background(), "topic/0/seg-0", []byte("a")) + _ = m.UploadIndex(context.Background(), "topic/0/seg-0.index", []byte("idx")) + + if err := m.DeleteSegment(context.Background(), "topic/0/seg-0"); err != nil { + t.Fatalf("DeleteSegment: %v", err) + } + if err := m.DeleteIndex(context.Background(), "topic/0/seg-0.index"); err != nil { + t.Fatalf("DeleteIndex: %v", err) + } + if _, err := m.DownloadSegment(context.Background(), "topic/0/seg-0", nil); err == nil { + t.Fatal("expected deleted segment to be missing") + } + if _, err := m.DownloadIndex(context.Background(), "topic/0/seg-0.index"); err == nil { + t.Fatal("expected deleted index to be missing") + } +}