Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions crates/host-rpc/src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use alloy::{
use futures_util::{StreamExt, TryStreamExt, stream};
use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier, RevertRange};
use signet_types::primitives::{RecoveredBlock, SealedBlock, SignetHeaderV1, TransactionSigned};
use std::{collections::VecDeque, sync::Arc, time::Instant};
use std::{cmp::Ordering, collections::VecDeque, sync::Arc, time::Instant};
use tracing::{debug, info, warn};

/// Default seconds per slot (Ethereum mainnet).
Expand Down Expand Up @@ -506,7 +506,29 @@ where
/// Process a backfill batch if pending.
///
/// Backfills by number up to `(latest - buffer_capacity / 2)` to leave
/// half the buffer depth for hash-based frontfill of recent blocks.
/// half the buffer depth for hash-based frontfill of recent blocks. If
/// `backfill_from` is already inside that headroom, the ceiling is
/// raised to `tip` so the next frontfill segment's parent is in the DB.
///
/// # Return values
///
/// - `None` — backfill is complete (no pending cursor, or the cursor
/// has reached `tip + 1`). `backfill_from` is cleared and the caller
/// should advance to frontfill via the `newHeads` subscription.
/// - `Some(Ok(notification))` — a batch was fetched successfully; the
/// notification carries the chain segment for `[from, to]`.
/// `backfill_from` is advanced to `to + 1`, or cleared if the batch
/// reached the effective ceiling.
/// - `Some(Err(_))` — an error occurred. Notable cases:
/// - `BackfillContinuityBreak` if the cursor has drifted past
/// `tip + 1` (provider tip regressed, e.g. reorg or lagging
/// replica behind a load balancer) — silently switching to
/// frontfill from this state would emit a segment whose parent
/// isn't in the DB.
/// - `BackfillContinuityBreak` if the first fetched block's parent
/// hash doesn't match the last emitted block.
/// - Any provider/RPC error from `get_block_number` or
/// `fetch_range`.
#[tracing::instrument(
level = "info",
skip_all,
Expand Down Expand Up @@ -534,14 +556,34 @@ where
}
};

let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2);
if from > backfill_ceiling {
self.backfill_from = None;
info!("backfill complete, switching to frontfill");
return None;
// `from == tip + 1` is the normal completion state (last batch
// ended at `to == tip`). `from > tip + 1` means the provider's
// tip regressed below our cursor (reorg, lagging replica) —
// silently switching to frontfill from there would emit a
// segment whose parent isn't in the DB.
match from.cmp(&tip.saturating_add(1)) {
Ordering::Equal => {
self.backfill_from = None;
info!("backfill complete, switching to frontfill");
return None;
}
Ordering::Greater => {
warn!(from, tip, "backfill cursor ahead of tip; provider tip regressed");
return Some(Err(RpcHostError::BackfillContinuityBreak));
}
Ordering::Less => {}
}

let to = backfill_ceiling.min(from + self.backfill_batch_size - 1);
// Normally cap each batch to `(tip - buffer_capacity / 2)` so the
// hash-walk frontfill has headroom on new tips. But if `from` is
// already inside that headroom (e.g., after restart with persisted
// head close to tip), we must still ingest up to `tip` — otherwise
// the processor receives a one-block frontfill segment whose parent
// is not yet in the DB.
let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2);
let effective_ceiling = if from > backfill_ceiling { tip } else { backfill_ceiling };

let to = effective_ceiling.min(from + self.backfill_batch_size - 1);
span.record("to", to);

let blocks = match self.fetch_range(from, to).await {
Expand Down Expand Up @@ -577,7 +619,7 @@ where
self.last_emitted = Some((n, h));
}

let backfill_done = to >= backfill_ceiling;
let backfill_done = to >= effective_ceiling;
if backfill_done {
self.backfill_from = None;
} else {
Expand Down