Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `BlockTimestampGate`: new gate that lets blocks through once a block's timestamp meets or exceeds a given `time.Time`, supporting both inclusive and exclusive gate types.
- `Stepable` interface: added `ReorgJunctionBlock() *pbbstream.BlockMeta` returning full block metadata (id, number, parentId, parentNum, libNum, timestamp) for the reorg junction point.

### Changed

- `Stepable` interface: renamed `ReorgJunctionBlock() BlockRef` to `ReorgJunctionBlockRef() BlockRef`.

## 2026-01-02

Expand Down
10 changes: 5 additions & 5 deletions cursor_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ func (f *cursorResolver) ProcessBlock(blk *pbbstream.Block, obj any) error {
if err := f.sendUndoBlocks(undoBlocks, reorgJunctionBlock); err != nil {
return err
}
if err := f.sendMergedBlocksBetween(StepIrreversible, f.cursor.LIB.Num(), reorgJunctionBlock.Num()); err != nil {
if err := f.sendMergedBlocksBetween(StepIrreversible, f.cursor.LIB.Num(), reorgJunctionBlock.Number); err != nil {
return err
}
if err := f.sendMergedBlocksBetween(StepNewIrreversible, reorgJunctionBlock.Num(), blk.Number); err != nil {
if err := f.sendMergedBlocksBetween(StepNewIrreversible, reorgJunctionBlock.Number, blk.Number); err != nil {
return err
}

Expand All @@ -120,7 +120,7 @@ func (f *cursorResolver) ProcessBlock(blk *pbbstream.Block, obj any) error {

}

func (f *cursorResolver) sendUndoBlocks(undoBlocks []*pbbstream.Block, reorgJunctionBlock BlockRef) error {
func (f *cursorResolver) sendUndoBlocks(undoBlocks []*pbbstream.Block, reorgJunctionBlock *pbbstream.BlockMeta) error {
for _, blk := range undoBlocks {
block := blk
obj := &wrappedObject{
Expand Down Expand Up @@ -188,7 +188,7 @@ func (f *cursorResolver) seenIrreversible(id string) *BlockWithObj {
return nil
}

func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.Block, reorgJunctionBlock BlockRef, err error) {
func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.Block, reorgJunctionBlock *pbbstream.BlockMeta, err error) {
block := f.cursor.Block
lib := f.cursor.LIB
step := f.cursor.Step
Expand All @@ -200,7 +200,7 @@ func (f *cursorResolver) resolve(ctx context.Context) (undoBlocks []*pbbstream.B

for {
if blkObj := f.seenIrreversible(previousID); blkObj != nil {
reorgJunctionBlock = blkObj.Block.AsRef()
reorgJunctionBlock = blkObj.Block.ToBlocKMeta()
break
}

Expand Down
4 changes: 2 additions & 2 deletions cursor_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestCursorResolver(t *testing.T) {
var received []resp
handler := HandlerFunc(func(blk *pbbstream.Block, obj any) error {
var reorgTarget string
if rt := obj.(Stepable).ReorgJunctionBlock(); rt != nil {
if rt := obj.(Stepable).ReorgJunctionBlockRef(); rt != nil {
reorgTarget = rt.String()
}
received = append(received, resp{
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestCursorResolverWithHoles(t *testing.T) {
assert.Equal(t, blk.AsRef().String(), expected[i].blk.String())
assert.Equal(t, obj.(Stepable).Step().String(), expected[i].step.String())
var seenReorgTarget string
if rt := obj.(Stepable).ReorgJunctionBlock(); rt != nil {
if rt := obj.(Stepable).ReorgJunctionBlockRef(); rt != nil {
seenReorgTarget = rt.String()
}
var expectedReorgTarget string
Expand Down
26 changes: 18 additions & 8 deletions forkable/forkable.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (p *Forkable) blocksFromCursor(cursor *bstream.Cursor) ([]*bstream.Preproce
reorgJunctionBlock := p.forkDB.BlockForID(blockID)
preprocessedUndos := make([]*bstream.PreprocessedBlock, len(undos))
for i := range undos {
preprocessedUndos[i] = wrapBlockForkableObject(undos[i], bstream.StepUndo, head, cursor.LIB, reorgJunctionBlock.AsRef())
preprocessedUndos[i] = wrapBlockForkableObject(undos[i], bstream.StepUndo, head, cursor.LIB, reorgJunctionBlock.Object.(*ForkableBlock).Block.ToBlocKMeta())
}

newCursor := &bstream.Cursor{
Expand Down Expand Up @@ -437,7 +437,7 @@ func (p *Forkable) blocksThroughCursor(startBlock uint64, cursor *bstream.Cursor
return out, nil
}

func wrapBlockForkableObject(blk *ForkableBlock, step bstream.StepType, head bstream.BlockRef, lib bstream.BlockRef, reorgJunctionBlock bstream.BlockRef) *bstream.PreprocessedBlock {
func wrapBlockForkableObject(blk *ForkableBlock, step bstream.StepType, head bstream.BlockRef, lib bstream.BlockRef, reorgJunctionBlock *pbbstream.BlockMeta) *bstream.PreprocessedBlock {
if blk.Block.PartialIndex != 0 && step == bstream.StepNew {
if blk.Block.LastPartial {
step = bstream.StepNewPartial
Expand Down Expand Up @@ -466,7 +466,7 @@ type ForkableObject struct {
StepCount int // Total number of steps in multi-block steps.
StepIndex int // Index for the current block
StepBlocks []*bstream.PreprocessedBlock // You can decide to process them when StepCount == StepIndex +1 or when StepIndex == 0 only.
reorgJunctionBlock bstream.BlockRef
reorgJunctionBlock *pbbstream.BlockMeta

parentBlock bstream.BlockRef
headBlock bstream.BlockRef
Expand Down Expand Up @@ -494,7 +494,17 @@ func (fobj *ForkableObject) FinalBlockHeight() uint64 {
return fobj.lastLIBSent.Num()
}

func (fobj *ForkableObject) ReorgJunctionBlock() bstream.BlockRef {
func (fobj *ForkableObject) ReorgJunctionBlockRef() bstream.BlockRef {
if !fobj.step.Matches(bstream.StepUndo | bstream.StepUndoPartial) {
return nil
}
if fobj.reorgJunctionBlock == nil {
return nil
}
return bstream.NewBlockRef(fobj.reorgJunctionBlock.Id, fobj.reorgJunctionBlock.Number)
}

func (fobj *ForkableObject) ReorgJunctionBlock() *pbbstream.BlockMeta {
if !fobj.step.Matches(bstream.StepUndo | bstream.StepUndoPartial) {
return nil
}
Expand Down Expand Up @@ -664,7 +674,7 @@ func (p *Forkable) ProcessBlock(blk *pbbstream.Block, obj any) error {

ppBlk := &ForkableBlock{Block: blk, Obj: obj}

var reorgJunctionBlock bstream.BlockRef
var reorgJunctionBlock *pbbstream.BlockMeta
var undos, partialUndos, redos []*ForkableBlock
if p.matchFilter(bstream.StepUndo) {
if triggersNewLongestChain && p.lastBlockSent != nil {
Expand Down Expand Up @@ -835,7 +845,7 @@ func ids(blocks []*ForkableBlock) (ids []string) {
return
}

func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlock Partialer, newHeadsPreviousID string) (undos []*ForkableBlock, partialUndos []*ForkableBlock, redos []*ForkableBlock, junctionBlock bstream.BlockRef) {
func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlock Partialer, newHeadsPreviousID string) (undos []*ForkableBlock, partialUndos []*ForkableBlock, redos []*ForkableBlock, junctionBlock *pbbstream.BlockMeta) {
if currentHeadBlockID == newHeadsPreviousID {
return
}
Expand All @@ -844,7 +854,7 @@ func (p *Forkable) sentChainSwitchSegments(currentHeadBlockID string, newHeadBlo

if undoIDs != nil || partialUndoIDs != nil {
if junction := p.forkDB.BlockForID(junctionBlockID); junction != nil {
junctionBlock = junction.AsRef()
junctionBlock = junction.Object.(*ForkableBlock).Block.ToBlocKMeta()
}
}

Expand All @@ -871,7 +881,7 @@ func (p *Forkable) sentChainSegment(ids []string, doingRedos bool) (ppBlocks []*
return
}

func (p *Forkable) processCompleteBlocks(currentBlock *pbbstream.Block, blocks []*ForkableBlock, step bstream.StepType, reorgJunctionBlock bstream.BlockRef) error {
func (p *Forkable) processCompleteBlocks(currentBlock *pbbstream.Block, blocks []*ForkableBlock, step bstream.StepType, reorgJunctionBlock *pbbstream.BlockMeta) error {
var objs []*bstream.PreprocessedBlock

for _, block := range blocks {
Expand Down
17 changes: 9 additions & 8 deletions forkable/forkable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
headBlock: tinyBlk("00000004b"), // cause of this
block: tinyBlk("00000003a"),
lastLIBSent: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlk("00000002a"),
reorgJunctionBlock: tinyBlkMeta("00000002a"),
StepBlocks: []*bstream.PreprocessedBlock{
{bTestBlock("00000003a", "00000002a"), "00000003a"},
},
Expand All @@ -238,7 +238,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
parentBlock: tinyBlk("00000003b"),
StepCount: 2,
StepIndex: 0,
reorgJunctionBlock: tinyBlk("00000002a"),
reorgJunctionBlock: tinyBlkMeta("00000002a"),
headBlock: tinyBlk("00000005a"),
block: tinyBlk("00000004b"),
lastLIBSent: tinyBlk("00000001a"),
Expand All @@ -253,7 +253,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
parentBlock: tinyBlk("00000002a"),
StepCount: 2,
StepIndex: 1,
reorgJunctionBlock: tinyBlk("00000002a"),
reorgJunctionBlock: tinyBlkMeta("00000002a"),
headBlock: tinyBlk("00000005a"),
block: tinyBlk("00000003b"),
lastLIBSent: tinyBlk("00000001a"),
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
headBlock: tinyBlk("00000003a"),
block: tinyBlk("00000002b"),
lastLIBSent: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlkMeta("00000001a"),
StepCount: 1,
StepIndex: 0,
StepBlocks: []*bstream.PreprocessedBlock{
Expand Down Expand Up @@ -591,7 +591,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
headBlock: tinyBlk("00000003a"),
block: tinyBlk("00000002b"),
lastLIBSent: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlkMeta("00000001a"),
StepCount: 1,
StepIndex: 0,
StepBlocks: []*bstream.PreprocessedBlock{
Expand Down Expand Up @@ -978,7 +978,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
headBlock: tinyBlk("00000005a"),
block: tinyBlk("00000004b"),
lastLIBSent: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlk("00000003a"),
reorgJunctionBlock: tinyBlkMeta("00000003a"),
StepCount: 1,
StepIndex: 0,
StepBlocks: []*bstream.PreprocessedBlock{
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func TestForkable_ProcessBlock(t *testing.T) {
headBlock: tinyBlk("00000005a"),
block: tinyBlk("00000003b"),
lastLIBSent: tinyBlk("00000001a"),
reorgJunctionBlock: tinyBlk("00000002a"),
reorgJunctionBlock: tinyBlkMeta("00000002a"),
StepCount: 1,
StepIndex: 0,
StepBlocks: []*bstream.PreprocessedBlock{
Expand Down Expand Up @@ -1115,7 +1115,8 @@ func TestForkable_ProcessBlock(t *testing.T) {
} else if p.results[i].reorgJunctionBlock == nil {
assert.Nil(t, c.expectedResult[i].reorgJunctionBlock)
} else {
assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.String(), p.results[i].reorgJunctionBlock.String())
assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.Id, p.results[i].reorgJunctionBlock.Id)
assert.Equal(t, c.expectedResult[i].reorgJunctionBlock.Number, p.results[i].reorgJunctionBlock.Number)
}
if c.expectedResult[i].parentBlock != nil {
assert.Equal(t, c.expectedResult[i].parentBlock.String(), p.results[i].parentBlock.String())
Expand Down
4 changes: 4 additions & 0 deletions forkable/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func tinyBlk(id string) bstream.BlockRef {
return bstream.TestBlock(id, "").AsRef()
}

func tinyBlkMeta(id string) *pbbstream.BlockMeta {
return bstream.TestBlock(id, "").ToBlocKMeta()
}

func bTestBlock(id, previousID string) *pbbstream.Block {
return bstream.TestBlock(id, previousID)
}
Expand Down
6 changes: 5 additions & 1 deletion hub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ func (t *testStepable) FinalBlockHeight() uint64 {
return 0
}

func (t *testStepable) ReorgJunctionBlock() bstream.BlockRef {
func (t *testStepable) ReorgJunctionBlockRef() bstream.BlockRef {
return nil
}

func (t *testStepable) ReorgJunctionBlock() *pbbstream.BlockMeta {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type Cursorable interface {
type Stepable interface {
Step() StepType
FinalBlockHeight() uint64
ReorgJunctionBlock() BlockRef
ReorgJunctionBlockRef() BlockRef
ReorgJunctionBlock() *pbbstream.BlockMeta
}

type Liveable interface {
Expand Down
11 changes: 9 additions & 2 deletions preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *Preprocessor) ProcessBlock(blk *pbbstream.Block, obj any) (err error) {
type preprocessedForkableObject struct {
cursor *Cursor
step StepType
reorgJunctionBlock BlockRef
reorgJunctionBlock *pbbstream.BlockMeta
obj any
}

Expand All @@ -77,7 +77,14 @@ func (fobj *preprocessedForkableObject) Cursor() *Cursor {
return fobj.cursor
}

func (fobj *preprocessedForkableObject) ReorgJunctionBlock() BlockRef {
func (fobj *preprocessedForkableObject) ReorgJunctionBlockRef() BlockRef {
if fobj.reorgJunctionBlock == nil {
return nil
}
return NewBlockRef(fobj.reorgJunctionBlock.Id, fobj.reorgJunctionBlock.Number)
}

func (fobj *preprocessedForkableObject) ReorgJunctionBlock() *pbbstream.BlockMeta {
return fobj.reorgJunctionBlock
}

Expand Down
11 changes: 9 additions & 2 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type BlockWithObj struct {
type wrappedObject struct {
obj any
cursor *Cursor
reorgJunctionBlock BlockRef
reorgJunctionBlock *pbbstream.BlockMeta
}

func (w *wrappedObject) FinalBlockHeight() uint64 {
Expand All @@ -138,7 +138,14 @@ func (w *wrappedObject) FinalBlockHeight() uint64 {
return w.cursor.LIB.Num()
}

func (w *wrappedObject) ReorgJunctionBlock() BlockRef {
func (w *wrappedObject) ReorgJunctionBlockRef() BlockRef {
if w.reorgJunctionBlock == nil {
return nil
}
return NewBlockRef(w.reorgJunctionBlock.Id, w.reorgJunctionBlock.Number)
}

func (w *wrappedObject) ReorgJunctionBlock() *pbbstream.BlockMeta {
return w.reorgJunctionBlock
}

Expand Down