From bf1b2d15d2903404b3b9594da352a119b0e3bc6e Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Mon, 18 May 2026 08:01:04 +0200 Subject: [PATCH 1/3] skip index fetch in decoders --- .../internal/decoder/decoder.go | 17 +++---- .../internal/decoder/decoder_test.go | 45 ++++++++++++++++++ .../sql-processor/internal/decoder/decoder.go | 18 +++----- .../internal/decoder/decoder_test.go | 46 +++++++++++++++++++ 4 files changed, 105 insertions(+), 21 deletions(-) diff --git a/addons/processors/iceberg-processor/internal/decoder/decoder.go b/addons/processors/iceberg-processor/internal/decoder/decoder.go index e180fca5..40409fff 100644 --- a/addons/processors/iceberg-processor/internal/decoder/decoder.go +++ b/addons/processors/iceberg-processor/internal/decoder/decoder.go @@ -59,6 +59,10 @@ type Decoder interface { Decode(ctx context.Context, segmentKey, indexKey string, topic string, partition int32) ([]Record, error) } +type getObjectAPI interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) +} + // New returns an S3-backed decoder. func New(cfg config.Config) (Decoder, error) { loadOptions := []func(*awsconfig.LoadOptions) error{} @@ -93,19 +97,12 @@ func New(cfg config.Config) (Decoder, error) { } type s3Decoder struct { - client *s3.Client + client getObjectAPI bucket string } -func (d *s3Decoder) Decode(ctx context.Context, segmentKey, indexKey string, topic string, partition int32) ([]Record, error) { - indexBytes, err := d.getObject(ctx, indexKey) - if err != nil { - return nil, fmt.Errorf("download index: %w", err) - } - if _, err := parseIndex(indexBytes); err != nil { - return nil, err - } - +// Decode only needs the segment payload; segment/index pairing is validated during discovery. 
+func (d *s3Decoder) Decode(ctx context.Context, segmentKey, _ string, topic string, partition int32) ([]Record, error) { segmentBytes, err := d.getObject(ctx, segmentKey) if err != nil { return nil, fmt.Errorf("download segment: %w", err) diff --git a/addons/processors/iceberg-processor/internal/decoder/decoder_test.go b/addons/processors/iceberg-processor/internal/decoder/decoder_test.go index f402100f..074477e5 100644 --- a/addons/processors/iceberg-processor/internal/decoder/decoder_test.go +++ b/addons/processors/iceberg-processor/internal/decoder/decoder_test.go @@ -17,9 +17,15 @@ package decoder import ( "bytes" + "context" "encoding/binary" + "errors" + "io" "testing" "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" ) func TestDecodeSegment(t *testing.T) { @@ -62,6 +68,30 @@ func TestParseIndex(t *testing.T) { } } +func TestDecodeSkipsIndexDownload(t *testing.T) { + segment := buildSegmentBytes(10, 1, time.Now().UnixMilli(), buildRecordBatch(10, time.Now().UnixMilli(), []byte("k1"), []byte("v1"))) + client := &fakeGetObjectClient{ + objects: map[string][]byte{ + "segment.kfs": segment, + }, + } + dec := &s3Decoder{ + client: client, + bucket: "test-bucket", + } + + records, err := dec.Decode(context.Background(), "segment.kfs", "segment.index", "orders", 0) + if err != nil { + t.Fatalf("Decode: %v", err) + } + if len(records) != 1 { + t.Fatalf("expected 1 record, got %d", len(records)) + } + if len(client.requests) != 1 || client.requests[0] != "segment.kfs" { + t.Fatalf("unexpected requests: %+v", client.requests) + } +} + func buildIndexBytes(count int32) []byte { buf := bytes.NewBuffer(make([]byte, 0, 64)) buf.WriteString(indexMagic) @@ -152,3 +182,18 @@ func encodeVarint(value int64) []byte { } return out } + +type fakeGetObjectClient struct { + objects map[string][]byte + requests []string +} + +func (f *fakeGetObjectClient) GetObject(ctx context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) 
(*s3.GetObjectOutput, error) { + key := aws.ToString(params.Key) + f.requests = append(f.requests, key) + data, ok := f.objects[key] + if !ok { + return nil, errors.New("missing object") + } + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(data))}, nil +} diff --git a/addons/processors/sql-processor/internal/decoder/decoder.go b/addons/processors/sql-processor/internal/decoder/decoder.go index a6f8771b..d8233d12 100644 --- a/addons/processors/sql-processor/internal/decoder/decoder.go +++ b/addons/processors/sql-processor/internal/decoder/decoder.go @@ -59,6 +59,10 @@ type Decoder interface { Decode(ctx context.Context, segmentKey, indexKey string, topic string, partition int32) ([]Record, error) } +type getObjectAPI interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) +} + func New(cfg config.Config) (Decoder, error) { loadOptions := []func(*awsconfig.LoadOptions) error{} if cfg.S3.Region != "" { @@ -93,7 +97,7 @@ func New(cfg config.Config) (Decoder, error) { } type s3Decoder struct { - client *s3.Client + client getObjectAPI bucket string metrics s3Metrics } @@ -116,16 +120,8 @@ func newS3Metrics() s3Metrics { } } -func (d *s3Decoder) Decode(ctx context.Context, segmentKey, indexKey string, topic string, partition int32) ([]Record, error) { - indexBytes, err := d.getObject(ctx, "get", indexKey) - if err != nil { - return nil, fmt.Errorf("download index: %w", err) - } - if _, err := parseIndex(indexBytes); err != nil { - d.metrics.decodeErrors.Inc() - return nil, err - } - +// Decode only needs the segment payload; segment/index pairing is validated during discovery. 
+func (d *s3Decoder) Decode(ctx context.Context, segmentKey, _ string, topic string, partition int32) ([]Record, error) { segmentBytes, err := d.getObject(ctx, "get", segmentKey) if err != nil { return nil, fmt.Errorf("download segment: %w", err) diff --git a/addons/processors/sql-processor/internal/decoder/decoder_test.go b/addons/processors/sql-processor/internal/decoder/decoder_test.go index 7ee35fc8..e9aa532e 100644 --- a/addons/processors/sql-processor/internal/decoder/decoder_test.go +++ b/addons/processors/sql-processor/internal/decoder/decoder_test.go @@ -17,8 +17,14 @@ package decoder import ( "bytes" + "context" "encoding/binary" + "errors" + "io" "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" ) func TestParseIndex(t *testing.T) { @@ -82,6 +88,31 @@ func TestDecodeBatchCompressed(t *testing.T) { } } +func TestDecodeSkipsIndexDownload(t *testing.T) { + segment := buildSegment(buildBatch(5, 1000, buildRecord(0, 0, []byte("k"), []byte("v")))) + client := &fakeGetObjectClient{ + objects: map[string][]byte{ + "segment.kfs": segment, + }, + } + dec := &s3Decoder{ + client: client, + bucket: "test-bucket", + metrics: newS3Metrics(), + } + + records, err := dec.Decode(context.Background(), "segment.kfs", "segment.index", "orders", 0) + if err != nil { + t.Fatalf("Decode: %v", err) + } + if len(records) != 1 { + t.Fatalf("expected 1 record, got %d", len(records)) + } + if len(client.requests) != 1 || client.requests[0] != "segment.kfs" { + t.Fatalf("unexpected requests: %+v", client.requests) + } +} + func buildSegment(batch []byte) []byte { segment := make([]byte, 0, segmentHeaderLen+len(batch)+segmentFooterLen) header := make([]byte, segmentHeaderLen) @@ -150,3 +181,18 @@ func makeRecordPayload(tsDelta int32, offsetDelta int32, key []byte, value []byt writeVarint(&body, 0) return body.Bytes() } + +type fakeGetObjectClient struct { + objects map[string][]byte + requests []string +} + +func (f *fakeGetObjectClient) 
GetObject(ctx context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + key := aws.ToString(params.Key) + f.requests = append(f.requests, key) + data, ok := f.objects[key] + if !ok { + return nil, errors.New("missing object") + } + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(data))}, nil +} From 9e49edc7838db46c773a044468870fa51d1d17ce Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Mon, 18 May 2026 08:01:10 +0200 Subject: [PATCH 2/3] add exact PITR segment truncation --- cmd/kafscale-cli/main.go | 4 +- cmd/kafscale-cli/main_test.go | 105 +++++++++++--- pkg/storage/index.go | 26 ++-- pkg/storage/recovery.go | 47 +++++-- pkg/storage/recovery_exact.go | 208 +++++++++++++++++++++++++++ pkg/storage/recovery_exact_test.go | 217 +++++++++++++++++++++++++++++ pkg/storage/recovery_test.go | 7 +- 7 files changed, 564 insertions(+), 50 deletions(-) create mode 100644 pkg/storage/recovery_exact.go create mode 100644 pkg/storage/recovery_exact_test.go diff --git a/cmd/kafscale-cli/main.go b/cmd/kafscale-cli/main.go index 134c8800..bb4f32ff 100644 --- a/cmd/kafscale-cli/main.go +++ b/cmd/kafscale-cli/main.go @@ -174,7 +174,7 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 return err } if len(sourceMeta.Topics) == 0 || sourceMeta.Topics[0].ErrorCode != 0 { - return metadata.ErrUnknownTopic + return fmt.Errorf("source topic metadata: %w", metadata.ErrUnknownTopic) } sourcePartitions := make(map[int32]struct{}, len(sourceMeta.Topics[0].Partitions)) @@ -238,7 +238,7 @@ func executeRestore(ctx context.Context, stdout io.Writer, cfg restoreConfig, s3 return err } if len(targetMeta.Topics) == 0 || targetMeta.Topics[0].ErrorCode != 0 { - return metadata.ErrUnknownTopic + return fmt.Errorf("target topic metadata: %w", metadata.ErrUnknownTopic) } recoveredByPartition := make(map[int32]storage.RecoveredPartition, len(result.Partitions)) diff --git a/cmd/kafscale-cli/main_test.go 
b/cmd/kafscale-cli/main_test.go index 099e6e56..f105adec 100644 --- a/cmd/kafscale-cli/main_test.go +++ b/cmd/kafscale-cli/main_test.go @@ -18,7 +18,9 @@ package main import ( "bytes" "context" + "encoding/binary" "errors" + "hash/crc32" "strings" "testing" "time" @@ -95,12 +97,7 @@ func TestRunRestoreCommandUsesInjectedClients(t *testing.T) { s3 := storage.NewMemoryS3Client() artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ - { - BaseOffset: 0, - LastOffsetDelta: 0, - MessageCount: 1, - Bytes: make([]byte, 70), - }, + makeStorageRecoveryBatch(0, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC).UnixMilli(), []int64{0}), }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) if err != nil { t.Fatalf("BuildSegment: %v", err) @@ -123,8 +120,13 @@ func TestRunRestoreCommandUsesInjectedClients(t *testing.T) { newS3Client = func(context.Context, storage.S3Config) (storage.S3Client, error) { return s3, nil } - newEtcdStore = func(context.Context, metadata.ClusterMetadata, metadata.EtcdStoreConfig) (*metadata.EtcdStore, error) { - return store, nil + newEtcdStore = func(ctx context.Context, _ metadata.ClusterMetadata, cfg metadata.EtcdStoreConfig) (*metadata.EtcdStore, error) { + return metadata.NewEtcdStore(ctx, metadata.ClusterMetadata{ + Brokers: []protocol.MetadataBroker{ + {NodeID: 1, Host: "broker-0", Port: 9092}, + }, + ControllerID: 1, + }, cfg) } newMemoryS3 = func() storage.S3Client { return s3 } @@ -237,12 +239,7 @@ func TestExecuteRestoreCreatesRecoveredTopic(t *testing.T) { s3 := storage.NewMemoryS3Client() artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ - { - BaseOffset: 0, - LastOffsetDelta: 0, - MessageCount: 1, - Bytes: make([]byte, 70), - }, + makeStorageRecoveryBatch(0, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC).UnixMilli(), []int64{0}), }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) if err != nil { t.Fatalf("BuildSegment: 
%v", err) @@ -460,12 +457,7 @@ func TestExecuteRestoreRollsBackCopiedS3ObjectsOnPartialFailure(t *testing.T) { mem := storage.NewMemoryS3Client() artifact, err := storage.BuildSegment(storage.SegmentWriterConfig{IndexIntervalMessages: 1}, []storage.RecordBatch{ - { - BaseOffset: 0, - LastOffsetDelta: 0, - MessageCount: 1, - Bytes: make([]byte, 70), - }, + makeStorageRecoveryBatch(0, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC).UnixMilli(), []int64{0}), }, time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)) if err != nil { t.Fatalf("BuildSegment: %v", err) @@ -508,3 +500,76 @@ func TestExecuteRestoreRollsBackCopiedS3ObjectsOnPartialFailure(t *testing.T) { t.Fatalf("expected rolled back target topic to be absent, got %+v", meta.Topics) } } + +func makeStorageRecoveryBatch(baseOffset, firstTimestamp int64, timestampDeltas []int64) storage.RecordBatch { + records := make([][]byte, 0, len(timestampDeltas)) + maxTimestamp := firstTimestamp + for i, delta := range timestampDeltas { + records = append(records, makeStorageRecoveryRecord(delta, int64(i))) + if ts := firstTimestamp + delta; ts > maxTimestamp { + maxTimestamp = ts + } + } + + bodyLen := 0 + for _, record := range records { + bodyLen += len(record) + } + const recordBatchHeaderLen = 61 + const batchFrameHeaderLen = 12 + batch := make([]byte, recordBatchHeaderLen+bodyLen) + binary.BigEndian.PutUint64(batch[0:8], uint64(baseOffset)) + binary.BigEndian.PutUint32(batch[8:12], uint32(len(batch)-batchFrameHeaderLen)) + batch[16] = 2 + binary.BigEndian.PutUint64(batch[27:35], uint64(firstTimestamp)) + binary.BigEndian.PutUint64(batch[35:43], uint64(maxTimestamp)) + binary.BigEndian.PutUint64(batch[43:51], uint64(^uint64(0))) + binary.BigEndian.PutUint16(batch[51:53], uint16(^uint16(0))) + binary.BigEndian.PutUint32(batch[53:57], uint32(^uint32(0))) + binary.BigEndian.PutUint32(batch[57:61], uint32(len(records))) + offset := recordBatchHeaderLen + for _, record := range records { + copy(batch[offset:], record) + offset += 
len(record) + } + binary.BigEndian.PutUint32(batch[23:27], uint32(len(records)-1)) + binary.BigEndian.PutUint32(batch[17:21], crc32.Checksum(batch[21:], crc32.MakeTable(crc32.Castagnoli))) + + return storage.RecordBatch{ + BaseOffset: baseOffset, + LastOffsetDelta: int32(len(records) - 1), + MessageCount: int32(len(records)), + Bytes: batch, + } +} + +func makeStorageRecoveryRecord(timestampDelta, offsetDelta int64) []byte { + payload := bytes.NewBuffer(nil) + payload.WriteByte(0) + payload.Write(encodeStorageRecoveryVarint(timestampDelta)) + payload.Write(encodeStorageRecoveryVarint(offsetDelta)) + payload.Write(encodeStorageRecoveryVarint(-1)) + payload.Write(encodeStorageRecoveryVarint(-1)) + payload.Write(encodeStorageRecoveryVarint(0)) + + record := bytes.NewBuffer(nil) + record.Write(encodeStorageRecoveryVarint(int64(payload.Len()))) + record.Write(payload.Bytes()) + return record.Bytes() +} + +func encodeStorageRecoveryVarint(value int64) []byte { + zigzag := uint64(value<<1) ^ uint64(value>>63) + out := make([]byte, 0, 10) + for { + b := byte(zigzag & 0x7f) + zigzag >>= 7 + if zigzag != 0 { + b |= 0x80 + } + out = append(out, b) + if zigzag == 0 { + return out + } + } +} diff --git a/pkg/storage/index.go b/pkg/storage/index.go index 64c5dd77..ec54699e 100644 --- a/pkg/storage/index.go +++ b/pkg/storage/index.go @@ -87,44 +87,48 @@ func (b *IndexBuilder) BuildBytes() ([]byte, error) { // ParseIndex validates and returns entries from serialized bytes. 
func ParseIndex(data []byte) ([]*IndexEntry, error) { + _, entries, err := parseIndexMetadata(data) + return entries, err +} + +func parseIndexMetadata(data []byte) (int32, []*IndexEntry, error) { if len(data) < 16 { - return nil, fmt.Errorf("index too small") + return 0, nil, fmt.Errorf("index too small") } if string(data[:4]) != indexMagic { - return nil, fmt.Errorf("invalid index magic") + return 0, nil, fmt.Errorf("invalid index magic") } reader := bytes.NewReader(data[4:]) var version uint16 if err := binary.Read(reader, binary.BigEndian, &version); err != nil { - return nil, err + return 0, nil, err } if version != 1 { - return nil, fmt.Errorf("unsupported index version %d", version) + return 0, nil, fmt.Errorf("unsupported index version %d", version) } var count int32 if err := binary.Read(reader, binary.BigEndian, &count); err != nil { - return nil, err + return 0, nil, err } var interval int32 if err := binary.Read(reader, binary.BigEndian, &interval); err != nil { - return nil, err + return 0, nil, err } var reserved uint16 if err := binary.Read(reader, binary.BigEndian, &reserved); err != nil { - return nil, err + return 0, nil, err } entries := make([]*IndexEntry, count) for i := int32(0); i < count; i++ { var offset int64 var position int32 if err := binary.Read(reader, binary.BigEndian, &offset); err != nil { - return nil, err + return 0, nil, err } if err := binary.Read(reader, binary.BigEndian, &position); err != nil { - return nil, err + return 0, nil, err } entries[i] = &IndexEntry{Offset: offset, Position: position} } - _ = interval - return entries, nil + return interval, entries, nil } diff --git a/pkg/storage/recovery.go b/pkg/storage/recovery.go index 33cab9cf..cf2b400f 100644 --- a/pkg/storage/recovery.go +++ b/pkg/storage/recovery.go @@ -80,8 +80,9 @@ type copiedObject struct { indexKey string } -// RecoverTopicToTimestamp copies immutable segment/index pairs from one topic to -// another up to the first segment created after the requested 
cutoff. +// RecoverTopicToTimestamp copies immutable segment/index pairs into a new topic +// and truncates the final candidate segment at the first record whose timestamp +// exceeds the requested cutoff. func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecoveryConfig) (*TopicRecoveryResult, error) { if s3 == nil { return nil, fmt.Errorf("s3 client required") @@ -174,17 +175,21 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery for _, partition := range partitions { segments := segmentsByPartition[partition] sort.Slice(segments, func(i, j int) bool { return segments[i].BaseOffset < segments[j].BaseOffset }) + lastCandidate := -1 + for i, segment := range segments { + if segment.CreatedAt.After(cfg.RestoreTo) { + break + } + lastCandidate = i + } summary := RecoveredPartition{ Partition: partition, LastOffset: -1, Segments: make([]RecoveredSegment, 0, len(segments)), } - for _, segment := range segments { - if segment.CreatedAt.After(cfg.RestoreTo) { - break - } - + for i := 0; i <= lastCandidate; i++ { + segment := segments[i] segmentBytes, err := s3.DownloadSegment(ctx, segment.SourceKey, nil) if err != nil { return nil, err @@ -194,20 +199,40 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery return nil, err } - targetSegmentKey := segmentObjectKey(cfg.TargetNamespace, cfg.TargetTopic, partition, segment.BaseOffset) - targetIndexKey := segmentIndexKey(cfg.TargetNamespace, cfg.TargetTopic, partition, segment.BaseOffset) - if err := s3.UploadSegment(ctx, targetSegmentKey, segmentBytes); err != nil { + plan := &segmentRestorePlan{ + segmentBytes: segmentBytes, + indexBytes: indexBytes, + baseOffset: segment.BaseOffset, + lastOffset: segment.LastOffset, + keep: true, + } + if i == lastCandidate { + plan, err = buildRestorePlan(segmentBytes, indexBytes, cfg.RestoreTo, segment.CreatedAt) + if err != nil { + return nil, err + } + if !plan.keep { + break + } + } + + targetSegmentKey 
:= segmentObjectKey(cfg.TargetNamespace, cfg.TargetTopic, partition, plan.baseOffset) + targetIndexKey := segmentIndexKey(cfg.TargetNamespace, cfg.TargetTopic, partition, plan.baseOffset) + if err := s3.UploadSegment(ctx, targetSegmentKey, plan.segmentBytes); err != nil { return nil, err } copiedObjects = append(copiedObjects, copiedObject{ segmentKey: targetSegmentKey, indexKey: targetIndexKey, }) - if err := s3.UploadIndex(ctx, targetIndexKey, indexBytes); err != nil { + if err := s3.UploadIndex(ctx, targetIndexKey, plan.indexBytes); err != nil { return nil, err } copied := segment.RecoveredSegment + copied.BaseOffset = plan.baseOffset + copied.LastOffset = plan.lastOffset + copied.SizeBytes = int64(len(plan.segmentBytes)) copied.TargetKey = targetSegmentKey copied.TargetIndex = targetIndexKey summary.Segments = append(summary.Segments, copied) diff --git a/pkg/storage/recovery_exact.go b/pkg/storage/recovery_exact.go new file mode 100644 index 00000000..b448f39d --- /dev/null +++ b/pkg/storage/recovery_exact.go @@ -0,0 +1,208 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com). +// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package storage + +import ( + "bytes" + "encoding/binary" + "fmt" + "hash/crc32" + "io" + "time" +) + +const ( + recordBatchHeaderLen = 61 + batchFrameHeaderLen = 12 +) + +type segmentRestorePlan struct { + segmentBytes []byte + indexBytes []byte + baseOffset int64 + lastOffset int64 + keep bool +} + +func buildRestorePlan(segmentBytes, indexBytes []byte, restoreTo time.Time, createdAt time.Time) (*segmentRestorePlan, error) { + indexInterval, _, err := parseIndexMetadata(indexBytes) + if err != nil { + return nil, fmt.Errorf("parse index: %w", err) + } + batches, err := collectRecoverableBatches(segmentBytes, restoreTo.UnixMilli()) + if err != nil { + return nil, err + } + if len(batches) == 0 { + return &segmentRestorePlan{keep: false}, nil + } + artifact, err := BuildSegment(SegmentWriterConfig{IndexIntervalMessages: indexInterval}, batches, createdAt) + if err != nil { + return nil, err + } + return &segmentRestorePlan{ + segmentBytes: artifact.SegmentBytes, + indexBytes: artifact.IndexBytes, + baseOffset: artifact.BaseOffset, + lastOffset: artifact.LastOffset, + keep: true, + }, nil +} + +func collectRecoverableBatches(segmentBytes []byte, cutoffMs int64) ([]RecordBatch, error) { + if len(segmentBytes) < segmentHeaderLen+segmentFooterLen { + return nil, fmt.Errorf("segment too small") + } + if string(segmentBytes[:4]) != segmentMagic { + return nil, fmt.Errorf("invalid segment magic") + } + body := segmentBytes[segmentHeaderLen : len(segmentBytes)-segmentFooterLen] + batches := make([]RecordBatch, 0) + for offset := 0; offset+batchFrameHeaderLen <= len(body); { + batchLen := int(binary.BigEndian.Uint32(body[offset+8 : offset+12])) + if batchLen <= 0 { + break + } + frameLen := batchFrameHeaderLen + batchLen + if offset+frameLen > len(body) { + return nil, fmt.Errorf("record batch exceeds segment bounds") + } + batch := append([]byte(nil), body[offset:offset+frameLen]...) 
+ truncated, keep, done, err := truncateRecordBatchToTimestamp(batch, cutoffMs) + if err != nil { + return nil, err + } + if keep { + batches = append(batches, truncated) + } + offset += frameLen + if done { + break + } + } + return batches, nil +} + +func truncateRecordBatchToTimestamp(batch []byte, cutoffMs int64) (RecordBatch, bool, bool, error) { + if len(batch) < recordBatchHeaderLen { + return RecordBatch{}, false, false, fmt.Errorf("record batch too small: %d", len(batch)) + } + firstTimestamp := int64(binary.BigEndian.Uint64(batch[27:35])) + maxTimestamp := int64(binary.BigEndian.Uint64(batch[35:43])) + if maxTimestamp <= cutoffMs { + parsed, err := NewRecordBatchFromBytes(batch) + return parsed, true, false, err + } + if firstTimestamp > cutoffMs { + return RecordBatch{}, false, true, nil + } + attributes := int16(binary.BigEndian.Uint16(batch[21:23])) + if compressionType(attributes) != 0 { + return RecordBatch{}, false, false, fmt.Errorf("exact PITR does not support compressed record batches") + } + + recordCount := int32(binary.BigEndian.Uint32(batch[57:61])) + reader := bytes.NewReader(batch[recordBatchHeaderLen:]) + keptCount := int32(0) + keptBytes := 0 + lastOffsetDelta := int32(0) + maxIncludedTimestamp := firstTimestamp + recordDataLen := len(batch[recordBatchHeaderLen:]) + for i := int32(0); i < recordCount; i++ { + timestampDelta, offsetDelta, err := scanRecord(reader) + if err != nil { + return RecordBatch{}, false, false, err + } + recordTimestamp := firstTimestamp + timestampDelta + if recordTimestamp > cutoffMs { + break + } + keptCount++ + keptBytes = recordDataLen - reader.Len() + lastOffsetDelta = offsetDelta + if recordTimestamp > maxIncludedTimestamp { + maxIncludedTimestamp = recordTimestamp + } + } + if keptCount == 0 { + return RecordBatch{}, false, true, nil + } + if keptCount == recordCount { + parsed, err := NewRecordBatchFromBytes(batch) + return parsed, true, true, err + } + + truncated := append([]byte(nil), 
batch[:recordBatchHeaderLen+keptBytes]...) + binary.BigEndian.PutUint32(truncated[8:12], uint32(len(truncated)-batchFrameHeaderLen)) + binary.BigEndian.PutUint32(truncated[23:27], uint32(lastOffsetDelta)) + binary.BigEndian.PutUint64(truncated[35:43], uint64(maxIncludedTimestamp)) + binary.BigEndian.PutUint32(truncated[57:61], uint32(keptCount)) + binary.BigEndian.PutUint32(truncated[17:21], crc32.Checksum(truncated[21:], crcTable)) + + parsed, err := NewRecordBatchFromBytes(truncated) + return parsed, true, true, err +} + +func scanRecord(reader *bytes.Reader) (timestampDelta int64, offsetDelta int32, err error) { + recordLen, err := readVarint(reader) + if err != nil { + return 0, 0, err + } + if recordLen < 0 || recordLen > int64(reader.Len()) { + return 0, 0, fmt.Errorf("invalid record length") + } + recordData := make([]byte, recordLen) + if _, err := io.ReadFull(reader, recordData); err != nil { + return 0, 0, err + } + buf := bytes.NewReader(recordData) + if _, err := buf.ReadByte(); err != nil { + return 0, 0, err + } + timestampDelta, err = readVarint(buf) + if err != nil { + return 0, 0, err + } + offsetDelta64, err := readVarint(buf) + if err != nil { + return 0, 0, err + } + return timestampDelta, int32(offsetDelta64), nil +} + +func readVarint(reader *bytes.Reader) (int64, error) { + var value uint64 + var shift uint + for { + b, err := reader.ReadByte() + if err != nil { + return 0, err + } + value |= uint64(b&0x7f) << shift + if b&0x80 == 0 { + break + } + shift += 7 + if shift > 63 { + return 0, fmt.Errorf("varint too long") + } + } + return int64((value >> 1) ^ uint64((int64(value&1)<<63)>>63)), nil +} + +func compressionType(attributes int16) int16 { + return attributes & 0x07 +} diff --git a/pkg/storage/recovery_exact_test.go b/pkg/storage/recovery_exact_test.go new file mode 100644 index 00000000..fa561714 --- /dev/null +++ b/pkg/storage/recovery_exact_test.go @@ -0,0 +1,217 @@ +// Copyright 2026 Alexander Alten (novatechflow), NovaTechflow (novatechflow.com).
+// This project is supported and financed by Scalytics, Inc. (www.scalytics.io). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "bytes" + "context" + "encoding/binary" + "hash/crc32" + "testing" + "time" +) + +func TestRecoverTopicToTimestampTruncatesFinalSegmentByRecordTimestamp(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC) + firstTimestamp := created.UnixMilli() + batch := buildRecoveryBatch(0, firstTimestamp, []int64{0, 1_000, 2_000}) + uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created, 1, batch) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: time.UnixMilli(firstTimestamp + 1_500), + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 1 { + t.Fatalf("expected 1 copied segment, got %d", result.SegmentsCopied) + } + if result.Partitions[0].LastOffset != 1 { + t.Fatalf("expected last offset 1, got %d", result.Partitions[0].LastOffset) + } + + segmentBytes, err := s3.DownloadSegment(context.Background(), "default/orders-restore/0/segment-00000000000000000000.kfs", nil) + if err != nil { + t.Fatalf("download restored segment: %v", err) + } + if got := int32(binary.BigEndian.Uint32(segmentBytes[16:20])); got != 2 { + 
t.Fatalf("expected 2 messages in restored segment header, got %d", got) + } + if got := CountRecordBatchMessages(segmentBytes[segmentHeaderLen : len(segmentBytes)-segmentFooterLen]); got != 2 { + t.Fatalf("expected 2 messages in restored segment body, got %d", got) + } + lastOffset, err := parseSegmentFooter(segmentBytes[len(segmentBytes)-segmentFooterLen:]) + if err != nil { + t.Fatalf("parse footer: %v", err) + } + if lastOffset != 1 { + t.Fatalf("expected footer last offset 1, got %d", lastOffset) + } + + indexBytes, err := s3.DownloadIndex(context.Background(), "default/orders-restore/0/segment-00000000000000000000.index") + if err != nil { + t.Fatalf("download restored index: %v", err) + } + entries, err := ParseIndex(indexBytes) + if err != nil { + t.Fatalf("ParseIndex: %v", err) + } + if len(entries) != 1 || entries[0].Offset != 0 { + t.Fatalf("unexpected restored index entries: %+v", entries) + } +} + +func TestRecoverTopicToTimestampSkipsFinalSegmentWhenItsFirstRecordIsAfterCutoff(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC) + uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created, 1, buildRecoveryBatch(0, created.UnixMilli(), []int64{0})) + uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created.Add(time.Minute), 1, buildRecoveryBatch(1, created.Add(2*time.Minute).UnixMilli(), []int64{0})) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: created.Add(90 * time.Second), + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 1 { + t.Fatalf("expected only 1 copied segment, got %d", result.SegmentsCopied) + } + if result.Partitions[0].LastOffset != 0 { + t.Fatalf("expected last offset 0, got %d", result.Partitions[0].LastOffset) + } + if _, err := 
s3.DownloadSegment(context.Background(), "default/orders-restore/0/segment-00000000000000000001.kfs", nil); err == nil { + t.Fatal("expected second segment to be skipped") + } +} + +func TestRecoverTopicToTimestampRejectsCompressedIntersectingBatch(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC) + batch := buildRecoveryBatch(0, created.UnixMilli(), []int64{0, 1_000}) + binary.BigEndian.PutUint16(batch.Bytes[21:23], 1) + binary.BigEndian.PutUint32(batch.Bytes[17:21], crc32Checksum(batch.Bytes[21:])) + uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created, 1, batch) + + _, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: time.UnixMilli(created.UnixMilli() + 500), + }) + if err == nil { + t.Fatal("expected compressed intersecting batch to fail exact PITR") + } +} + +func uploadRecoverySegmentWithBatches(t *testing.T, s3 *MemoryS3Client, namespace, topic string, partition int32, created time.Time, indexInterval int32, batches ...RecordBatch) { + t.Helper() + + artifact, err := BuildSegment(SegmentWriterConfig{IndexIntervalMessages: indexInterval}, batches, created) + if err != nil { + t.Fatalf("BuildSegment: %v", err) + } + if err := s3.UploadSegment(context.Background(), segmentObjectKey(namespace, topic, partition, artifact.BaseOffset), artifact.SegmentBytes); err != nil { + t.Fatalf("UploadSegment: %v", err) + } + if err := s3.UploadIndex(context.Background(), segmentIndexKey(namespace, topic, partition, artifact.BaseOffset), artifact.IndexBytes); err != nil { + t.Fatalf("UploadIndex: %v", err) + } +} + +func buildRecoveryBatch(baseOffset, firstTimestamp int64, timestampDeltas []int64) RecordBatch { + records := make([][]byte, 0, len(timestampDeltas)) + maxTimestamp := firstTimestamp + for i, delta := range timestampDeltas { + records = 
append(records, buildRecoveryRecord(delta, int64(i))) + if ts := firstTimestamp + delta; ts > maxTimestamp { + maxTimestamp = ts + } + } + + bodyLen := 0 + for _, record := range records { + bodyLen += len(record) + } + batch := make([]byte, recordBatchHeaderLen+bodyLen) + binary.BigEndian.PutUint64(batch[0:8], uint64(baseOffset)) + binary.BigEndian.PutUint32(batch[8:12], uint32(len(batch)-batchFrameHeaderLen)) + batch[16] = 2 + binary.BigEndian.PutUint64(batch[27:35], uint64(firstTimestamp)) + binary.BigEndian.PutUint64(batch[35:43], uint64(maxTimestamp)) + binary.BigEndian.PutUint64(batch[43:51], uint64(^uint64(0))) + binary.BigEndian.PutUint16(batch[51:53], uint16(^uint16(0))) + binary.BigEndian.PutUint32(batch[53:57], uint32(^uint32(0))) + binary.BigEndian.PutUint32(batch[57:61], uint32(len(records))) + + offset := recordBatchHeaderLen + for _, record := range records { + copy(batch[offset:], record) + offset += len(record) + } + binary.BigEndian.PutUint32(batch[23:27], uint32(len(records)-1)) + binary.BigEndian.PutUint32(batch[17:21], crc32Checksum(batch[21:])) + return RecordBatch{ + BaseOffset: baseOffset, + LastOffsetDelta: int32(len(records) - 1), + MessageCount: int32(len(records)), + Bytes: batch, + } +} + +func buildRecoveryRecord(timestampDelta, offsetDelta int64) []byte { + payload := bytes.NewBuffer(nil) + payload.WriteByte(0) + payload.Write(encodeRecoveryVarint(timestampDelta)) + payload.Write(encodeRecoveryVarint(offsetDelta)) + payload.Write(encodeRecoveryVarint(-1)) + payload.Write(encodeRecoveryVarint(-1)) + payload.Write(encodeRecoveryVarint(0)) + + record := bytes.NewBuffer(nil) + record.Write(encodeRecoveryVarint(int64(payload.Len()))) + record.Write(payload.Bytes()) + return record.Bytes() +} + +func encodeRecoveryVarint(value int64) []byte { + zigzag := uint64(value<<1) ^ uint64(value>>63) + out := make([]byte, 0, 10) + for { + b := byte(zigzag & 0x7f) + zigzag >>= 7 + if zigzag != 0 { + b |= 0x80 + } + out = append(out, b) + if zigzag == 
0 { + return out + } + } +} + +func crc32Checksum(data []byte) uint32 { + return crc32.Checksum(data, crcTable) +} diff --git a/pkg/storage/recovery_test.go b/pkg/storage/recovery_test.go index 4671e4a8..0a88fe68 100644 --- a/pkg/storage/recovery_test.go +++ b/pkg/storage/recovery_test.go @@ -132,12 +132,7 @@ func uploadRecoverySegment(t *testing.T, s3 *MemoryS3Client, namespace string, t t.Helper() artifact, err := BuildSegment(SegmentWriterConfig{IndexIntervalMessages: 1}, []RecordBatch{ - { - BaseOffset: baseOffset, - LastOffsetDelta: 0, - MessageCount: 1, - Bytes: make([]byte, 70), - }, + buildRecoveryBatch(baseOffset, created.UnixMilli(), []int64{0}), }, created) if err != nil { t.Fatalf("BuildSegment: %v", err) From ebb6324ebb079dd773e784c903d391fed54a28cb Mon Sep 17 00:00:00 2001 From: 2pk03 Date: Tue, 19 May 2026 10:36:33 +0200 Subject: [PATCH 3/3] fix exact restore cutoff --- docs/operations.md | 5 +-- pkg/storage/index.go | 14 ++++++-- pkg/storage/index_test.go | 27 ++++++++++++++ pkg/storage/recovery.go | 3 +- pkg/storage/recovery_exact.go | 5 ++- pkg/storage/recovery_exact_test.go | 56 ++++++++++++++++++++++++++++++ 6 files changed, 104 insertions(+), 6 deletions(-) diff --git a/docs/operations.md b/docs/operations.md index 11f19269..5a8a0ee6 100644 --- a/docs/operations.md +++ b/docs/operations.md @@ -235,7 +235,7 @@ The restore image must include `/bin/sh` and `etcdctl`. Override with `KAFSCALE_ ### Topic Recovery On The DR Spine -For broker topic data, KafScale now supports **segment-granular recovery into a new topic** on the DR side. This is intentionally not an in-place rollback on the primary cluster. +For broker topic data, KafScale now supports **recovery into a new topic** on the DR side. This is intentionally not an in-place rollback on the primary cluster. 
Use `kafscale-cli restore` to create a fresh target topic, copy `.kfs` segment/index pairs up to a cutoff timestamp, and set the recovered topic's next offsets: @@ -250,7 +250,8 @@ Operational semantics: - Recovery runs against the existing KafScale S3 + etcd control plane, including `KAFSCALE_S3_BUCKET`, `KAFSCALE_S3_REGION`, `KAFSCALE_S3_ENDPOINT`, `KAFSCALE_S3_PATH_STYLE`, and `KAFSCALE_ETCD_ENDPOINTS`. - The target topic must be new. KafScale refuses to restore over an existing persisted topic. -- Recovery is **segment-granular**. The cutoff uses the immutable segment creation time, then copies contiguous segment/index pairs up to the first segment created after that timestamp. +- Recovery copies every fully eligible segment/index pair, then inspects the first intersecting segment and truncates it at the first record whose timestamp exceeds the cutoff. +- If the intersecting batch is compressed, exact truncation is not safe, so the restore fails and the operator should choose an earlier cutoff or a segment boundary instead. - Offsets are preserved inside the recovered topic so replay, validation, and downstream cutover can happen without rewriting the source topic. - The safer pattern is restore, validate, then cut consumers or downstream jobs over deliberately. diff --git a/pkg/storage/index.go b/pkg/storage/index.go index ec54699e..241b3340 100644 --- a/pkg/storage/index.go +++ b/pkg/storage/index.go @@ -22,7 +22,9 @@ import ( ) const ( - indexMagic = "IDX\x00" + indexMagic = "IDX\x00" + indexHeaderLen = 16 + indexEntryLen = 12 ) // IndexBuilder tracks offsets and file positions for sparse indexing. 
@@ -92,7 +94,7 @@ func ParseIndex(data []byte) ([]*IndexEntry, error) { } func parseIndexMetadata(data []byte) (int32, []*IndexEntry, error) { - if len(data) < 16 { + if len(data) < indexHeaderLen { return 0, nil, fmt.Errorf("index too small") } if string(data[:4]) != indexMagic { @@ -118,6 +120,14 @@ func parseIndexMetadata(data []byte) (int32, []*IndexEntry, error) { if err := binary.Read(reader, binary.BigEndian, &reserved); err != nil { return 0, nil, err } + if count < 0 { + return 0, nil, fmt.Errorf("invalid index entry count %d", count) + } + remaining := len(data) - indexHeaderLen + requiredBytes := int64(count) * indexEntryLen + if requiredBytes > int64(remaining) { + return 0, nil, fmt.Errorf("index truncated: need %d bytes for %d entries, have %d", requiredBytes, count, remaining) + } entries := make([]*IndexEntry, count) for i := int32(0); i < count; i++ { var offset int64 diff --git a/pkg/storage/index_test.go b/pkg/storage/index_test.go index 0d6af813..c26ada50 100644 --- a/pkg/storage/index_test.go +++ b/pkg/storage/index_test.go @@ -16,6 +16,7 @@ package storage import ( + "encoding/binary" "strings" "testing" ) @@ -119,3 +120,29 @@ func TestIndexRoundTrip(t *testing.T) { } } } + +func TestParseIndexRejectsNegativeCount(t *testing.T) { + data := make([]byte, indexHeaderLen) + copy(data, indexMagic) + binary.BigEndian.PutUint16(data[4:6], 1) + binary.BigEndian.PutUint32(data[6:10], uint32(^uint32(0))) + binary.BigEndian.PutUint32(data[10:14], 1) + + _, err := ParseIndex(data) + if err == nil || !strings.Contains(err.Error(), "invalid index entry count") { + t.Fatalf("expected invalid count error, got: %v", err) + } +} + +func TestParseIndexRejectsTruncatedEntries(t *testing.T) { + data := make([]byte, indexHeaderLen) + copy(data, indexMagic) + binary.BigEndian.PutUint16(data[4:6], 1) + binary.BigEndian.PutUint32(data[6:10], 2) + binary.BigEndian.PutUint32(data[10:14], 1) + + _, err := ParseIndex(data) + if err == nil || !strings.Contains(err.Error(), 
"index truncated") { + t.Fatalf("expected truncated index error, got: %v", err) + } +} diff --git a/pkg/storage/recovery.go b/pkg/storage/recovery.go index cf2b400f..3945caec 100644 --- a/pkg/storage/recovery.go +++ b/pkg/storage/recovery.go @@ -175,9 +175,10 @@ func RecoverTopicToTimestamp(ctx context.Context, s3 S3Client, cfg TopicRecovery for _, partition := range partitions { segments := segmentsByPartition[partition] sort.Slice(segments, func(i, j int) bool { return segments[i].BaseOffset < segments[j].BaseOffset }) - lastCandidate := -1 + lastCandidate := len(segments) - 1 for i, segment := range segments { if segment.CreatedAt.After(cfg.RestoreTo) { + lastCandidate = i break } lastCandidate = i diff --git a/pkg/storage/recovery_exact.go b/pkg/storage/recovery_exact.go index b448f39d..5aad6156 100644 --- a/pkg/storage/recovery_exact.go +++ b/pkg/storage/recovery_exact.go @@ -164,7 +164,10 @@ func scanRecord(reader *bytes.Reader) (timestampDelta int64, offsetDelta int32, if recordLen < 0 { return 0, 0, fmt.Errorf("invalid record length") } - recordData := make([]byte, recordLen) + if recordLen > int64(reader.Len()) { + return 0, 0, fmt.Errorf("record length %d exceeds remaining batch bytes %d", recordLen, reader.Len()) + } + recordData := make([]byte, int(recordLen)) if _, err := io.ReadFull(reader, recordData); err != nil { return 0, 0, err } diff --git a/pkg/storage/recovery_exact_test.go b/pkg/storage/recovery_exact_test.go index fa561714..9e6fe80d 100644 --- a/pkg/storage/recovery_exact_test.go +++ b/pkg/storage/recovery_exact_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/binary" "hash/crc32" + "strings" "testing" "time" ) @@ -106,6 +107,49 @@ func TestRecoverTopicToTimestampSkipsFinalSegmentWhenItsFirstRecordIsAfterCutoff } } +func TestRecoverTopicToTimestampTruncatesFirstPostCutoffSegment(t *testing.T) { + s3 := NewMemoryS3Client() + created := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC) + cutoff := created.Add(90 * time.Second) + 
uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created, 1, buildRecoveryBatch(0, created.UnixMilli(), []int64{0})) + uploadRecoverySegmentWithBatches(t, s3, "default", "orders", 0, created.Add(2*time.Minute), 1, buildRecoveryBatch(1, cutoff.Add(-30*time.Second).UnixMilli(), []int64{0, 45_000})) + + result, err := RecoverTopicToTimestamp(context.Background(), s3, TopicRecoveryConfig{ + SourceNamespace: "default", + SourceTopic: "orders", + TargetNamespace: "default", + TargetTopic: "orders-restore", + RestoreTo: cutoff, + }) + if err != nil { + t.Fatalf("RecoverTopicToTimestamp: %v", err) + } + if result.SegmentsCopied != 2 { + t.Fatalf("expected 2 copied segments, got %d", result.SegmentsCopied) + } + if result.Partitions[0].LastOffset != 1 { + t.Fatalf("expected last offset 1, got %d", result.Partitions[0].LastOffset) + } + + segmentBytes, err := s3.DownloadSegment(context.Background(), "default/orders-restore/0/segment-00000000000000000001.kfs", nil) + if err != nil { + t.Fatalf("download restored segment: %v", err) + } + if got := int32(binary.BigEndian.Uint32(segmentBytes[16:20])); got != 1 { + t.Fatalf("expected 1 message in restored segment header, got %d", got) + } + if got := CountRecordBatchMessages(segmentBytes[segmentHeaderLen : len(segmentBytes)-segmentFooterLen]); got != 1 { + t.Fatalf("expected 1 message in restored segment body, got %d", got) + } + lastOffset, err := parseSegmentFooter(segmentBytes[len(segmentBytes)-segmentFooterLen:]) + if err != nil { + t.Fatalf("parse footer: %v", err) + } + if lastOffset != 1 { + t.Fatalf("expected footer last offset 1, got %d", lastOffset) + } +} + func TestRecoverTopicToTimestampRejectsCompressedIntersectingBatch(t *testing.T) { s3 := NewMemoryS3Client() created := time.Date(2026, 5, 18, 12, 0, 0, 0, time.UTC) @@ -126,6 +170,18 @@ func TestRecoverTopicToTimestampRejectsCompressedIntersectingBatch(t *testing.T) } } +func TestScanRecordRejectsLengthsBeyondRemainingBytes(t *testing.T) { + reader 
:= bytes.NewReader(append(encodeRecoveryVarint(32), 0x00)) + + _, _, err := scanRecord(reader) + if err == nil { + t.Fatal("expected oversized record length to fail") + } + if !strings.Contains(err.Error(), "exceeds remaining batch bytes") { + t.Fatalf("unexpected error: %v", err) + } +} + func uploadRecoverySegmentWithBatches(t *testing.T, s3 *MemoryS3Client, namespace, topic string, partition int32, created time.Time, indexInterval int32, batches ...RecordBatch) { t.Helper()