diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs index 31ca439..a439ba4 100644 --- a/crates/host-rpc/src/notifier.rs +++ b/crates/host-rpc/src/notifier.rs @@ -11,7 +11,7 @@ use alloy::{ use futures_util::{StreamExt, TryStreamExt, stream}; use signet_node_types::{HostNotification, HostNotificationKind, HostNotifier, RevertRange}; use signet_types::primitives::{RecoveredBlock, SealedBlock, SignetHeaderV1, TransactionSigned}; -use std::{collections::VecDeque, sync::Arc, time::Instant}; +use std::{cmp::Ordering, collections::VecDeque, sync::Arc, time::Instant}; use tracing::{debug, info, warn}; /// Default seconds per slot (Ethereum mainnet). @@ -506,7 +506,29 @@ where /// Process a backfill batch if pending. /// /// Backfills by number up to `(latest - buffer_capacity / 2)` to leave - /// half the buffer depth for hash-based frontfill of recent blocks. + /// half the buffer depth for hash-based frontfill of recent blocks. If + /// `backfill_from` is already inside that headroom, the ceiling is + /// raised to `tip` so the next frontfill segment's parent is in the DB. + /// + /// # Return values + /// + /// - `None` — backfill is complete (no pending cursor, or the cursor + /// has reached `tip + 1`). `backfill_from` is cleared and the caller + /// should advance to frontfill via the `newHeads` subscription. + /// - `Some(Ok(notification))` — a batch was fetched successfully; the + /// notification carries the chain segment for `[from, to]`. + /// `backfill_from` is advanced to `to + 1`, or cleared if the batch + /// reached the effective ceiling. + /// - `Some(Err(_))` — an error occurred. Notable cases: + /// - `BackfillContinuityBreak` if the cursor has drifted past + /// `tip + 1` (provider tip regressed, e.g. reorg or lagging + /// replica behind a load balancer) — silently switching to + /// frontfill from this state would emit a segment whose parent + /// isn't in the DB. + /// - `BackfillContinuityBreak` if the first fetched block's parent + /// hash doesn't match the last emitted block. + /// - Any provider/RPC error from `get_block_number` or + /// `fetch_range`. #[tracing::instrument( level = "info", skip_all, @@ -534,14 +556,34 @@ where } }; - let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2); - if from > backfill_ceiling { - self.backfill_from = None; - info!("backfill complete, switching to frontfill"); - return None; + // `from == tip + 1` is the normal completion state (last batch + // ended at `to == tip`). `from > tip + 1` means the provider's + // tip regressed below our cursor (reorg, lagging replica) — + // silently switching to frontfill from there would emit a + // segment whose parent isn't in the DB. + match from.cmp(&tip.saturating_add(1)) { + Ordering::Equal => { + self.backfill_from = None; + info!("backfill complete, switching to frontfill"); + return None; + } + Ordering::Greater => { + warn!(from, tip, "backfill cursor ahead of tip; provider tip regressed"); + return Some(Err(RpcHostError::BackfillContinuityBreak)); + } + Ordering::Less => {} } - let to = backfill_ceiling.min(from + self.backfill_batch_size - 1); + // Normally cap each batch to `(tip - buffer_capacity / 2)` so the + // hash-walk frontfill has headroom on new tips. But if `from` is + // already inside that headroom (e.g., after restart with persisted + // head close to tip), we must still ingest up to `tip` — otherwise + // the processor receives a one-block frontfill segment whose parent + // is not yet in the DB. + let backfill_ceiling = tip.saturating_sub(self.buffer_capacity as u64 / 2); + let effective_ceiling = if from > backfill_ceiling { tip } else { backfill_ceiling }; + + let to = effective_ceiling.min(from + self.backfill_batch_size - 1); span.record("to", to); let blocks = match self.fetch_range(from, to).await { @@ -577,7 +619,7 @@ where self.last_emitted = Some((n, h)); } - let backfill_done = to >= backfill_ceiling; + let backfill_done = to >= effective_ceiling; if backfill_done { self.backfill_from = None; } else {