Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changes

- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix grammatical issue in the changelog entry.

The phrase "DA blocks too far ahead" is missing a verb.

📝 Proposed fix
-- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [`#3262`](https://github.com/evstack/ev-node/pull/3262)
+- Add automatic DA retriever walkback when P2P stalls and DA blocks are too far ahead [`#3262`](https://github.com/evstack/ev-node/pull/3262)

Alternative phrasing:

-- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [`#3262`](https://github.com/evstack/ev-node/pull/3262)
+- Add automatic DA retriever walkback when P2P stalls and DA blocks get too far ahead [`#3262`](https://github.com/evstack/ev-node/pull/3262)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- Add automatic DA retriever walkback when P2P stalls and DA blocks too far ahead [#3262](https://github.com/evstack/ev-node/pull/3262)
- Add automatic DA retriever walkback when P2P stalls and DA blocks are too far ahead [`#3262`](https://github.com/evstack/ev-node/pull/3262)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CHANGELOG.md` at line 14, Fix the grammatical error in the changelog entry
that reads "Add automatic DA retriever walkback when P2P stalls and DA blocks
too far ahead" by inserting the missing verb—e.g., change it to "Add automatic
DA retriever walkback when P2P stalls and DA blocks are too far ahead" (or
another equivalent phrasing) so the sentence is grammatically correct; update
the entry text exactly where that line appears.

- Add max bytes contraints in simple solo sequnecer [#3312](https://github.com/evstack/ev-node/pull/3312)
- Add support for otlp in execution/grpc. [#3300](https://github.com/evstack/ev-node/pull/3300)
- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
Expand Down
13 changes: 7 additions & 6 deletions block/internal/da/async_block_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog"
"google.golang.org/protobuf/proto"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
)
Expand Down Expand Up @@ -186,31 +187,31 @@ func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.Subscr

// HandleCatchup fetches a single height via Retrieve and caches it.
// Also applies the prefetch window for speculative forward fetching.
func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) error {
func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
if err := ctx.Err(); err != nil {
return err
return nil, err
}

if _, err := f.cache.Get(ctx, newBlockDataKey(daHeight)); err != nil {
if err := f.fetchAndCacheBlock(ctx, daHeight); err != nil {
return err
return nil, err
}
}
// Speculatively prefetch ahead.
target := daHeight + f.prefetchWindow
for h := daHeight + 1; h <= target; h++ {
if err := ctx.Err(); err != nil {
return err
return nil, err
}
if _, err := f.cache.Get(ctx, newBlockDataKey(h)); err == nil {
continue // Already cached.
}
if err := f.fetchAndCacheBlock(ctx, h); err != nil {
return err
return nil, err
}
}

return nil
return nil, nil
}

// fetchAndCacheBlock fetches a block via Retrieve and caches it.
Expand Down
48 changes: 42 additions & 6 deletions block/internal/da/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
)

Expand All @@ -23,9 +24,13 @@ type SubscriberHandler interface {
HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error

// HandleCatchup is called for each height during sequential catchup.
// The subscriber advances localDAHeight only after this returns (true, nil).
// The subscriber advances localDAHeight only after this returns nil.
// Returning an error rolls back localDAHeight and triggers a backoff retry.
HandleCatchup(ctx context.Context, height uint64) error
// The returned events are the DAHeightEvents produced for this height
// (may be nil/empty). The subscriber does not interpret them; they are
// returned so that higher-level callers (via the Subscriber) can inspect
// the results without coupling to the handler's internals.
HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error)
}

// SubscriberConfig holds configuration for creating a Subscriber.
Expand All @@ -39,6 +44,14 @@ type SubscriberConfig struct {
FetchBlockTimestamp bool // the timestamp comes with an extra api call before Celestia v0.29.1-mocha.

StartHeight uint64 // initial localDAHeight

// WalkbackChecker is an optional callback invoked after each successful
// HandleCatchup call. It receives the DA height just processed and the
// events returned by the handler. If it returns a non-zero DA height,
// the subscriber rewinds to that height so it re-fetches on the next
// iteration. Return 0 to continue normally.
// This is nil for subscribers that don't need walkback (e.g. async block retriever).
WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64
}

// Subscriber is a shared DA subscription primitive that encapsulates the
Expand All @@ -48,9 +61,10 @@ type SubscriberConfig struct {
//
// Used by both DAFollower (syncing) and asyncBlockRetriever (forced inclusion).
type Subscriber struct {
client Client
logger zerolog.Logger
handler SubscriberHandler
client Client
logger zerolog.Logger
handler SubscriberHandler
walkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64

// namespaces to subscribe on. When multiple, they are merged.
namespaces [][]byte
Expand Down Expand Up @@ -91,6 +105,7 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber {
client: cfg.Client,
logger: cfg.Logger,
handler: cfg.Handler,
walkbackChecker: cfg.WalkbackChecker,
namespaces: cfg.Namespaces,
catchupSignal: make(chan struct{}, 1),
daBlockTime: cfg.DABlockTime,
Expand Down Expand Up @@ -159,6 +174,23 @@ func (s *Subscriber) HasReachedHead() bool {
return s.headReached.Load()
}

// RewindTo sets localDAHeight back to the given height and signals the catchup
// loop so that DA heights are re-fetched. This is used when the primary source
// (P2P) stalls and DA needs to take over for the missing range.
func (s *Subscriber) RewindTo(daHeight uint64) {
for {
cur := s.localDAHeight.Load()
if daHeight >= cur {
return
}
if s.localDAHeight.CompareAndSwap(cur, daHeight) {
s.headReached.Store(false)
s.signalCatchup()
return
}
}
}

// signalCatchup sends a non-blocking signal to wake catchupLoop.
func (s *Subscriber) signalCatchup() {
select {
Expand Down Expand Up @@ -356,7 +388,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
continue
}

if err := s.handler.HandleCatchup(ctx, local); err != nil {
if events, err := s.handler.HandleCatchup(ctx, local); err != nil {
// Roll back so we can retry after backoff.
s.localDAHeight.Store(local)
if errors.Is(err, datypes.ErrHeightFromFuture) && local >= highest {
Expand All @@ -366,6 +398,10 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
if !s.shouldContinueCatchup(ctx, err, local) {
return
}
} else if s.walkbackChecker != nil {
if rewindTo := s.walkbackChecker(local, events); rewindTo > 0 {
s.RewindTo(rewindTo)
}
}
}
}
Expand Down
51 changes: 45 additions & 6 deletions block/internal/da/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/evstack/ev-node/block/internal/common"
datypes "github.com/evstack/ev-node/pkg/da/types"
testmocks "github.com/evstack/ev-node/test/mocks"
)
Expand All @@ -24,9 +25,9 @@ func (m *MockSubscriberHandler) HandleEvent(ctx context.Context, ev datypes.Subs
return args.Error(0)
}

func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) error {
func (m *MockSubscriberHandler) HandleCatchup(ctx context.Context, height uint64) ([]common.DAHeightEvent, error) {
args := m.Called(ctx, height)
return args.Error(0)
return args.Get(0).([]common.DAHeightEvent), args.Error(1)
}

func TestSubscriber_RunCatchup(t *testing.T) {
Expand All @@ -49,8 +50,8 @@ func TestSubscriber_RunCatchup(t *testing.T) {
// It should process observed heights [100..101] then stop when local passes highestSeen.
sub.updateHighest(101)
sub.seenSubscriptionEvent.Store(true)
mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return(nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return(nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).Return([]common.DAHeightEvent(nil), nil).Once()
mockHandler.On("HandleCatchup", mock.Anything, uint64(101)).Return([]common.DAHeightEvent(nil), nil).Once()

sub.runCatchup(ctx)

Expand Down Expand Up @@ -84,13 +85,13 @@ func TestSubscriber_RunCatchup(t *testing.T) {
Run(func(args mock.Arguments) {
callCount++
}).
Return(errors.New("network failure")).Once()
Return([]common.DAHeightEvent(nil), errors.New("network failure")).Once()

mockHandler.On("HandleCatchup", mock.Anything, uint64(100)).
Run(func(args mock.Arguments) {
callCount++
}).
Return(nil).Once()
Return([]common.DAHeightEvent(nil), nil).Once()

sub.runCatchup(ctx)

Expand All @@ -101,6 +102,44 @@ func TestSubscriber_RunCatchup(t *testing.T) {
})
}

func TestSubscriber_RewindTo(t *testing.T) {
t.Run("no_op_when_target_is_equal_or_higher", func(t *testing.T) {
sub := NewSubscriber(SubscriberConfig{
Client: testmocks.NewMockClient(t),
Logger: zerolog.Nop(),
Handler: new(MockSubscriberHandler),
Namespaces: [][]byte{[]byte("ns")},
StartHeight: 100,
DABlockTime: time.Millisecond,
})
sub.localDAHeight.Store(100)

sub.RewindTo(100)
assert.Equal(t, uint64(100), sub.LocalDAHeight())

sub.RewindTo(200)
assert.Equal(t, uint64(100), sub.LocalDAHeight())
})

t.Run("rewinds_local_height_and_clears_head", func(t *testing.T) {
sub := NewSubscriber(SubscriberConfig{
Client: testmocks.NewMockClient(t),
Logger: zerolog.Nop(),
Handler: new(MockSubscriberHandler),
Namespaces: [][]byte{[]byte("ns")},
StartHeight: 100,
DABlockTime: time.Millisecond,
})
sub.localDAHeight.Store(150)
sub.headReached.Store(true)

sub.RewindTo(120)

assert.Equal(t, uint64(120), sub.LocalDAHeight())
assert.False(t, sub.HasReachedHead())
})
}

func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
Expand Down
53 changes: 33 additions & 20 deletions block/internal/syncing/da_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type DAFollower interface {
HasReachedHead() bool
// QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints).
QueuePriorityHeight(daHeight uint64)
// RewindTo moves the subscriber back to a lower DA height so it
// re-fetches from there. Used by the syncer for walkback when a gap
// is detected between the node height and the DA events.
RewindTo(daHeight uint64)
}

// daFollower is the concrete implementation of DAFollower.
Expand All @@ -48,6 +52,9 @@ type DAFollowerConfig struct {
DataNamespace []byte // may be nil or equal to Namespace
StartDAHeight uint64
DABlockTime time.Duration
// WalkbackChecker is forwarded to the underlying Subscriber.
// See da.SubscriberConfig.WalkbackChecker for details.
WalkbackChecker func(daHeight uint64, events []common.DAHeightEvent) uint64
}

// NewDAFollower creates a new daFollower.
Expand All @@ -65,12 +72,13 @@ func NewDAFollower(cfg DAFollowerConfig) DAFollower {
}

f.subscriber = da.NewSubscriber(da.SubscriberConfig{
Client: cfg.Client,
Logger: cfg.Logger,
Namespaces: [][]byte{cfg.Namespace, dataNs},
DABlockTime: cfg.DABlockTime,
Handler: f,
StartHeight: cfg.StartDAHeight,
Client: cfg.Client,
Logger: cfg.Logger,
Namespaces: [][]byte{cfg.Namespace, dataNs},
DABlockTime: cfg.DABlockTime,
Handler: f,
StartHeight: cfg.StartDAHeight,
WalkbackChecker: cfg.WalkbackChecker,
})

return f
Expand All @@ -91,6 +99,11 @@ func (f *daFollower) HasReachedHead() bool {
return f.subscriber.HasReachedHead()
}

// RewindTo moves the subscriber back to a lower DA height for re-fetching.
func (f *daFollower) RewindTo(daHeight uint64) {
f.subscriber.RewindTo(daHeight)
}

// HandleEvent processes a subscription event. When the follower is
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just lets
Expand Down Expand Up @@ -123,7 +136,7 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve

// HandleCatchup retrieves events at a single DA height and pipes them
// to the event sink. Checks priority heights first.
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
// 1. Drain stale or future priority heights from P2P hints
for priorityHeight := f.popPriorityHeight(); priorityHeight != 0; priorityHeight = f.popPriorityHeight() {
if priorityHeight < daHeight {
Expand All @@ -134,46 +147,46 @@ func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
Uint64("da_height", priorityHeight).
Msg("fetching priority DA height from P2P hint")

if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
if _, err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
if errors.Is(err, datypes.ErrHeightFromFuture) {
// Priority hint points to a future height — silently ignore.
f.logger.Debug().Uint64("priority_da_height", priorityHeight).
Msg("priority hint is from future, ignoring")
continue
}
// Roll back so daHeight is attempted again next cycle after backoff.
return err
return nil, err
}
break // continue with daHeight
break
}

// 2. Normal sequential fetch
if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
return err
events, err := f.fetchAndPipeHeight(ctx, daHeight)
if err != nil {
return nil, err
}
return nil

return events, nil
}

// fetchAndPipeHeight retrieves events at a single DA height and pipes them.
// It does NOT handle ErrHeightFromFuture — callers must decide how to react
// because the correct response depends on whether this is a normal sequential
// catchup or a priority-hint fetch.
func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error {
func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
events, err := f.retriever.RetrieveFromDA(ctx, daHeight)
if err != nil {
if errors.Is(err, datypes.ErrBlobNotFound) {
return nil
return nil, nil
}
return err
return nil, err
}

for _, event := range events {
if err := f.eventSink.PipeEvent(ctx, event); err != nil {
return err
return nil, err
}
}

return nil
return events, nil
}

// QueuePriorityHeight queues a DA height for priority retrieval.
Expand Down
Loading
Loading