From 3352401a9a97a1bbbcbc29a9bdf74e5c328f0528 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 1 May 2026 18:12:39 +0000 Subject: [PATCH 1/2] Initial plan From 4e895ae6cccf48fbdc28c96e6c7db44a9006beac Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 1 May 2026 18:18:38 +0000 Subject: [PATCH 2/2] feat: Stepable interface - rename ReorgJunctionBlock to ReorgJunctionBlockRef, add ReorgJunctionBlock returning *pbbstream.BlockMeta Agent-Logs-Url: https://github.com/streamingfast/bstream/sessions/3177c805-f746-444a-843d-05a665ff7b0c Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- CHANGELOG.md | 5 +++++ cursor_resolver.go | 10 +++++----- cursor_resolver_test.go | 4 ++-- forkable/forkable.go | 26 ++++++++++++++++++-------- forkable/forkable_test.go | 17 +++++++++-------- forkable/init_test.go | 4 ++++ hub/subscription_test.go | 6 +++++- interfaces.go | 3 ++- preprocess.go | 11 +++++++++-- types.go | 11 +++++++++-- 10 files changed, 68 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3aa7d66..9ddf849 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `BlockTimestampGate`: new gate that lets blocks through once a block's timestamp meets or exceeds a given `time.Time`, supporting both inclusive and exclusive gate types. +- `Stepable` interface: added `ReorgJunctionBlock() *pbbstream.BlockMeta` returning full block metadata (id, number, parentId, parentNum, libNum, timestamp) for the reorg junction point. + +### Changed + +- `Stepable` interface: renamed `ReorgJunctionBlock() BlockRef` to `ReorgJunctionBlockRef() BlockRef`. ## 2026-01-02 diff --git a/cursor_resolver.go b/cursor_resolver.go index bb41307..51e241e 100644 --- a/cursor_resolver.go +++ b/cursor_resolver.go @@ -107,10 +107,10 @@ func (f *cursorResolver) ProcessBlock(blk *pbbstream.Block, obj any) error { if err := f.sendUndoBlocks(undoBlocks, reorgJunctionBlock); err != nil { return err } - if err := f.sendMergedBlocksBetween(StepIrreversible, f.cursor.LIB.Num(), reorgJunctionBlock.Num()); err != nil { + if err := f.sendMergedBlocksBetween(StepIrreversible, f.cursor.LIB.Num(), reorgJunctionBlock.Number); err != nil { return err } - if err := f.sendMergedBlocksBetween(StepNewIrreversible, reorgJunctionBlock.Num(), blk.Number); err != nil { + if err := f.sendMergedBlocksBetween(StepNewIrreversible, reorgJunctionBlock.Number, blk.Number); err != nil { return err } @@ -120,7 +120,7 @@ func (f *cursorResolver) ProcessBlock(blk *pbbstream.Block, obj any) error { } -func (f *cursorResolver) sendUndoBlocks(undoBlocks []*pbbstream.Block, reorgJunctionBlock BlockRef) error { +func (f *cursorResolver) sendUndoBlocks(undoBlocks []*pbbstream.Block, reorgJunctionBlock *pbbstream.BlockMeta) error { for _, blk := range undoBlocks { block := blk obj := &wrappedObject{ @@ -188,7 +188,7 @@ func (f *cursorResolver) seenIrreversible(id string) *BlockWithObj { return nil } -func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.Block, reorgJunctionBlock BlockRef, err error) { +func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.Block, reorgJunctionBlock *pbbstream.BlockMeta, err error) { block := f.cursor.Block lib := f.cursor.LIB step := f.cursor.Step @@ -200,7 +200,7 @@ func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.B for { if blkObj := f.seenIrreversible(previousID); blkObj != nil { - reorgJunctionBlock = blkObj.Block.AsRef() + reorgJunctionBlock = blkObj.Block.ToBlocKMeta() break } diff --git a/cursor_resolver_test.go b/cursor_resolver_test.go index 7b913e5..5ec4f30 100644 --- a/cursor_resolver_test.go +++ b/cursor_resolver_test.go @@ -287,7 +287,7 @@ func TestCursorResolver(t *testing.T) { var received []resp handler := HandlerFunc(func(blk *pbbstream.Block, obj any) error { var reorgTarget string - if rt := obj.(Stepable).ReorgJunctionBlock(); rt != nil { + if rt := obj.(Stepable).ReorgJunctionBlockRef(); rt != nil { reorgTarget = rt.String() } received = append(received, resp{ @@ -592,7 +592,7 @@ func TestCursorResolverWithHoles(t *testing.T) { assert.Equal(t, blk.AsRef().String(), expected[i].blk.String()) assert.Equal(t, obj.(Stepable).Step().String(), expected[i].step.String()) var seenReorgTarget string - if rt := obj.(Stepable).ReorgJunctionBlock(); rt != nil { + if rt := obj.(Stepable).ReorgJunctionBlockRef(); rt != nil { seenReorgTarget = rt.String() } var expectedReorgTarget string diff --git a/forkable/forkable.go b/forkable/forkable.go index c4f7b47..343b13a 100644 --- a/forkable/forkable.go +++ b/forkable/forkable.go @@ -334,7 +334,7 @@ func (p *Forkable) blocksFromCursor(cursor *bstream.Cursor) ([]*bstream.Preproce reorgJunctionBlock := p.forkDB.BlockForID(blockID) preprocessedUndos := make([]*bstream.PreprocessedBlock, len(undos)) for i := range undos { - preprocessedUndos[i] = wrapBlockForkableObject(undos[i], bstream.StepUndo, head, cursor.LIB, reorgJunctionBlock.AsRef()) + preprocessedUndos[i] = wrapBlockForkableObject(undos[i], bstream.StepUndo, head, cursor.LIB, reorgJunctionBlock.Object.(*ForkableBlock).Block.ToBlocKMeta()) } newCursor := &bstream.Cursor{ @@ -437,7 +437,7 @@ func (p *Forkable) blocksThroughCursor(startBlock uint64, cursor *bstream.Cursor return out, nil } -func wrapBlockForkableObject(blk *ForkableBlock, step bstream.StepType, head bstream.BlockRef, lib bstream.BlockRef, reorgJunctionBlock bstream.BlockRef) *bstream.PreprocessedBlock { +func wrapBlockForkableObject(blk *ForkableBlock, step bstream.StepType, head bstream.BlockRef, lib bstream.BlockRef, reorgJunctionBlock *pbbstream.BlockMeta) *bstream.PreprocessedBlock { if blk.Block.PartialIndex != 0 && step == bstream.StepNew { if blk.Block.LastPartial { step = bstream.StepNewPartial @@ -466,7 +466,7 @@ type ForkableObject struct { StepCount int // Total number of steps in multi-block steps. StepIndex int // Index for the current block StepBlocks []*bstream.PreprocessedBlock // You can decide to process them when StepCount == StepIndex +1 or when StepIndex == 0 only. - reorgJunctionBlock bstream.BlockRef + reorgJunctionBlock *pbbstream.BlockMeta parentBlock bstream.BlockRef headBlock bstream.BlockRef @@ -494,7 +494,17 @@ func (fobj *ForkableObject) FinalBlockHeight() uint64 { return fobj.lastLIBSent.Num() } -func (fobj *ForkableObject) ReorgJunctionBlock() bstream.BlockRef { +func (fobj *ForkableObject) ReorgJunctionBlockRef() bstream.BlockRef { + if !fobj.step.Matches(bstream.StepUndo | bstream.StepUndoPartial) { + return nil + } + if fobj.reorgJunctionBlock == nil { + return nil + } + return bstream.NewBlockRef(fobj.reorgJunctionBlock.Id, fobj.reorgJunctionBlock.Number) +} + +func (fobj *ForkableObject) ReorgJunctionBlock() *pbbstream.BlockMeta { if !fobj.step.Matches(bstream.StepUndo | bstream.StepUndoPartial) { return nil } @@ -664,7 +674,7 @@ func (p *Forkable) ProcessBlock(blk *pbbstream.Block, obj any) error { ppBlk := &ForkableBlock{Block: blk, Obj: obj} - var reorgJunctionBlock bstream.BlockRef + var reorgJunctionBlock *pbbstream.BlockMeta var undos, partialUndos, redos []*ForkableBlock if p.matchFilter(bstream.StepUndo) { if triggersNewLongestChain && p.lastBlockSent != nil { @@ -835,7 +845,7 @@ func ids(blocks []*ForkableBlock) (ids []string) { return } -func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlock Partialer, newHeadsPreviousID string) (undos []*ForkableBlock, partialUndos []*ForkableBlock, redos []*ForkableBlock, junctionBlock bstream.BlockRef) { +func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlock Partialer, newHeadsPreviousID string) (undos []*ForkableBlock, partialUndos []*ForkableBlock, redos []*ForkableBlock, junctionBlock *pbbstream.BlockMeta) { if currentHeadBlockID == newHeadsPreviousID { return } @@ -844,7 +854,7 @@ func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlo if undoIDs != nil || partialUndoIDs != nil { if junction := p.forkDB.BlockForID(junctionBlockID); junction != nil { - junctionBlock = junction.AsRef() + junctionBlock = junction.Object.(*ForkableBlock).Block.ToBlocKMeta() } } @@ -871,7 +881,7 @@ func (p *Forkable) sentChainSegment(ids []string, doingRedos bool) (ppBlocks []* return } -func (p *Forkable) processCompleteBlocks(currentBlock *pbbstream.Block, blocks []*ForkableBlock, step bstream.StepType, reorgJunctionBlock bstream.BlockRef) error { +func (p *Forkable) processCompleteBlocks(currentBlock *pbbstream.Block, blocks []*ForkableBlock, step bstream.StepType, reorgJunctionBlock *pbbstream.BlockMeta) error { var objs []*bstream.PreprocessedBlock for _, block := range blocks { diff --git a/forkable/forkable_test.go b/forkable/forkable_test.go index 23737d5..986ec46 100644 --- a/forkable/forkable_test.go +++ b/forkable/forkable_test.go @@ -211,7 +211,7 @@ func TestForkable_ProcessBlock(t *testing.T) { headBlock: tinyBlk("00000004b"), // cause of this block: tinyBlk("00000003a"), lastLIBSent: tinyBlk("00000001a"), - reorgJunctionBlock: tinyBlk("00000002a"), + reorgJunctionBlock: tinyBlkMeta("00000002a"), StepBlocks: []*bstream.PreprocessedBlock{ {bTestBlock("00000003a", "00000002a"), "00000003a"}, }, @@ -238,7 +238,7 @@ func TestForkable_ProcessBlock(t *testing.T) { parentBlock: tinyBlk("00000003b"), StepCount: 2, StepIndex: 0, - reorgJunctionBlock: tinyBlk("00000002a"), + reorgJunctionBlock: tinyBlkMeta("00000002a"), headBlock: tinyBlk("00000005a"), block: tinyBlk("00000004b"), lastLIBSent: tinyBlk("00000001a"), @@ -253,7 +253,7 @@ func TestForkable_ProcessBlock(t *testing.T) { parentBlock: tinyBlk("00000002a"), StepCount: 2, StepIndex: 1, - reorgJunctionBlock: tinyBlk("00000002a"), + reorgJunctionBlock: tinyBlkMeta("00000002a"), headBlock: tinyBlk("00000005a"), block: tinyBlk("00000003b"), lastLIBSent: tinyBlk("00000001a"), @@ -530,7 +530,7 @@ func TestForkable_ProcessBlock(t *testing.T) { headBlock: tinyBlk("00000003a"), block: tinyBlk("00000002b"), lastLIBSent: tinyBlk("00000001a"), - reorgJunctionBlock: tinyBlk("00000001a"), + reorgJunctionBlock: tinyBlkMeta("00000001a"), StepCount: 1, StepIndex: 0, StepBlocks: []*bstream.PreprocessedBlock{ @@ -591,7 +591,7 @@ func TestForkable_ProcessBlock(t *testing.T) { headBlock: tinyBlk("00000003a"), block: tinyBlk("00000002b"), lastLIBSent: tinyBlk("00000001a"), - reorgJunctionBlock: tinyBlk("00000001a"), + reorgJunctionBlock: tinyBlkMeta("00000001a"), StepCount: 1, StepIndex: 0, StepBlocks: []*bstream.PreprocessedBlock{ @@ -978,7 +978,7 @@ func TestForkable_ProcessBlock(t *testing.T) { headBlock: tinyBlk("00000005a"), block: tinyBlk("00000004b"), lastLIBSent: tinyBlk("00000001a"), - reorgJunctionBlock: tinyBlk("00000003a"), + reorgJunctionBlock: tinyBlkMeta("00000003a"), StepCount: 1, StepIndex: 0, StepBlocks: []*bstream.PreprocessedBlock{ @@ -1039,7 +1039,7 @@ func TestForkable_ProcessBlock(t *testing.T) { headBlock: tinyBlk("00000005a"), block: tinyBlk("00000003b"), lastLIBSent: tinyBlk("00000001a"), - reorgJunctionBlock: tinyBlk("00000002a"), + reorgJunctionBlock: tinyBlkMeta("00000002a"), StepCount: 1, StepIndex: 0, StepBlocks: []*bstream.PreprocessedBlock{ @@ -1115,7 +1115,8 @@ func TestForkable_ProcessBlock(t *testing.T) { } else if p.results[i].reorgJunctionBlock == nil { assert.Nil(t, c.expectedResult[i].reorgJunctionBlock) } else { - assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.String(), p.results[i].reorgJunctionBlock.String()) + assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.Id, p.results[i].reorgJunctionBlock.Id) + assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.Number, p.results[i].reorgJunctionBlock.Number) } if c.expectedResult[i].parentBlock != nil { assert.Equal(t, c.expectedResult[i].parentBlock.String(), p.results[i].parentBlock.String()) diff --git a/forkable/init_test.go b/forkable/init_test.go index 740266a..85909b8 100644 --- a/forkable/init_test.go +++ b/forkable/init_test.go @@ -45,6 +45,10 @@ func tinyBlk(id string) bstream.BlockRef { return bstream.TestBlock(id, "").AsRef() } +func tinyBlkMeta(id string) *pbbstream.BlockMeta { + return bstream.TestBlock(id, "").ToBlocKMeta() +} + func bTestBlock(id, previousID string) *pbbstream.Block { return bstream.TestBlock(id, previousID) } diff --git a/hub/subscription_test.go b/hub/subscription_test.go index 6021ebb..28f6784 100644 --- a/hub/subscription_test.go +++ b/hub/subscription_test.go @@ -108,7 +108,11 @@ func (t *testStepable) FinalBlockHeight() uint64 { return 0 } -func (t *testStepable) ReorgJunctionBlock() bstream.BlockRef { +func (t *testStepable) ReorgJunctionBlockRef() bstream.BlockRef { + return nil +} + +func (t *testStepable) ReorgJunctionBlock() *pbbstream.BlockMeta { return nil } diff --git a/interfaces.go b/interfaces.go index c4a9c2a..ec20e28 100644 --- a/interfaces.go +++ b/interfaces.go @@ -57,7 +57,8 @@ type Cursorable interface { type Stepable interface { Step() StepType FinalBlockHeight() uint64 - ReorgJunctionBlock() BlockRef + ReorgJunctionBlockRef() BlockRef + ReorgJunctionBlock() *pbbstream.BlockMeta } type Liveable interface { diff --git a/preprocess.go b/preprocess.go index 37edd26..01d6707 100644 --- a/preprocess.go +++ b/preprocess.go @@ -61,7 +61,7 @@ func (p *Preprocessor) ProcessBlock(blk *pbbstream.Block, obj any) (err error) { type preprocessedForkableObject struct { cursor *Cursor step StepType - reorgJunctionBlock BlockRef + reorgJunctionBlock *pbbstream.BlockMeta obj any } @@ -77,7 +77,14 @@ func (fobj *preprocessedForkableObject) Cursor() *Cursor { return fobj.cursor } -func (fobj *preprocessedForkableObject) ReorgJunctionBlock() BlockRef { +func (fobj *preprocessedForkableObject) ReorgJunctionBlockRef() BlockRef { + if fobj.reorgJunctionBlock == nil { + return nil + } + return NewBlockRef(fobj.reorgJunctionBlock.Id, fobj.reorgJunctionBlock.Number) +} + +func (fobj *preprocessedForkableObject) ReorgJunctionBlock() *pbbstream.BlockMeta { return fobj.reorgJunctionBlock } diff --git a/types.go b/types.go index c62e30f..6778167 100644 --- a/types.go +++ b/types.go @@ -128,7 +128,7 @@ type BlockWithObj struct { type wrappedObject struct { obj any cursor *Cursor - reorgJunctionBlock BlockRef + reorgJunctionBlock *pbbstream.BlockMeta } func (w *wrappedObject) FinalBlockHeight() uint64 { @@ -138,7 +138,14 @@ func (w *wrappedObject) FinalBlockHeight() uint64 { return w.cursor.LIB.Num() } -func (w *wrappedObject) ReorgJunctionBlock() BlockRef { +func (w *wrappedObject) ReorgJunctionBlockRef() BlockRef { + if w.reorgJunctionBlock == nil { + return nil + } + return NewBlockRef(w.reorgJunctionBlock.Id, w.reorgJunctionBlock.Number) +} + +func (w *wrappedObject) ReorgJunctionBlock() *pbbstream.BlockMeta { return w.reorgJunctionBlock }