From 8d38520e23348a1c2a5d9a0286dd3b3c59366020 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 8 May 2026 13:48:14 -0700 Subject: [PATCH] Revert "Defer commitlog compression when under load (#4974)" This reverts commit ca1b45f6c2174cba912c5622940362691778e8a1. --- crates/core/src/db/relational_db.rs | 313 ++++------------------------ 1 file changed, 43 insertions(+), 270 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 7287accc4ee..d8cd4884bcc 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -57,11 +57,9 @@ use spacetimedb_table::page_pool::PagePool; use spacetimedb_table::table::{RowRef, TableScanIter}; use spacetimedb_table::table_index::IndexKey; use std::borrow::Cow; -use std::collections::VecDeque; use std::io; use std::ops::RangeBounds; use std::sync::Arc; -use std::time::{Duration, Instant}; use tokio::sync::watch; pub use super::persistence::{DiskSizeFn, Durability, Persistence}; @@ -1637,11 +1635,6 @@ fn apply_history( } pub type LocalDurability = Arc>; - -const COMMITLOG_COMPRESSION_IDLE_WINDOW: Duration = Duration::from_millis(500); -const COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL: Duration = Duration::from_millis(100); -const COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG: usize = 8; - /// Initialize local durability with the default parameters. /// /// Also returned is a [`DiskSizeFn`] as required by [`RelationalDB::open`]. @@ -1689,281 +1682,61 @@ pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result::default(), None)).await } -async fn commitlog_segments_to_compress( - durability: LocalDurability, - prev_snapshot_offset: TxOffset, - snapshot_offset: TxOffset, -) -> io::Result> { - // Return segment start offsets in `[prev_snapshot_offset, snapshot_offset)`. - // If either offset falls inside a segment, round down to the containing - // segment. The segment containing `snapshot_offset` must stay uncompressed, - // because it can contain transactions newer than the snapshot. - asyncify(move || { - let segment_offsets = durability.existing_segment_offsets()?; - let start_idx = segment_offsets - .binary_search(&prev_snapshot_offset) - // if the snapshot is in the middle of a segment, we want to round down. - // [0, 2].binary_search(1) will return Err(1), so we subtract 1. - .unwrap_or_else(|i| i.saturating_sub(1)); - let segment_offsets = &segment_offsets[start_idx..]; - let end_idx = segment_offsets - .binary_search(&snapshot_offset) - .unwrap_or_else(|i| i.saturating_sub(1)); - // in this case, segment_offsets[end_idx] is the segment that contains the snapshot, - // which we don't want to compress, so an exclusive range is correct. - Ok(segment_offsets[..end_idx].to_vec()) - }) - .await -} - -#[derive(Default)] -struct CommitlogCompressionState { - // Latest snapshot offset whose older segments have all been processed. - compressed_snapshot_offset: TxOffset, - // Newest snapshot offset represented by `pending_segments`. Once the queue is - // drained, this is promoted into `compressed_snapshot_offset`. - pending_snapshot_offset: Option, - // Segment start offsets waiting for compression, processed oldest first. - pending_segments: VecDeque, - // Time at which write load first appeared idle during the current idle window. - idle_since: Option, -} - -impl CommitlogCompressionState { - async fn enqueue_snapshot(&mut self, durability: LocalDurability, snapshot_offset: TxOffset) -> io::Result<()> { - // Coalesce snapshot events while compression is behind. If work is already - // pending, only enqueue the segment offsets between the previous pending - // snapshot and the new one. - let prev_snapshot_offset = self.pending_snapshot_offset.unwrap_or(self.compressed_snapshot_offset); - self.pending_segments - .extend(commitlog_segments_to_compress(durability, prev_snapshot_offset, snapshot_offset).await?); - self.pending_snapshot_offset = Some(snapshot_offset); - Ok(()) - } - - fn mark_caught_up(&mut self) { - // Only advance the checkpoint after every segment for the pending snapshot - // has been attempted successfully. - if self.pending_segments.is_empty() - && let Some(snapshot_offset) = self.pending_snapshot_offset.take() - { - self.compressed_snapshot_offset = snapshot_offset; - } - } - - fn has_pending_segments(&self) -> bool { - !self.pending_segments.is_empty() - } - - fn pending_segment_count(&self) -> usize { - self.pending_segments.len() - } - - fn reset_idle(&mut self) { - self.idle_since = None; - } - - fn idle_window_elapsed(&mut self) -> bool { - // The first idle poll starts the timer; later idle polls measure against - // that same instant until new writes are queued or compression work is done. - let now = Instant::now(); - self.idle_since.get_or_insert(now).elapsed() >= COMMITLOG_COMPRESSION_IDLE_WINDOW - } - - async fn compress_next_segment( - &mut self, - durability: LocalDurability, - clog_tx: &mut Option>, - ) -> bool { - // Return `false` on compression failure so the caller can back off before - // retrying the same segment. - let Some(segment_offset) = self.pending_segments.front().copied() else { - return true; - }; - - if let Err(err) = asyncify(move || durability.compress_segments(&[segment_offset])).await { - tracing::warn!("failed to compress commitlog segment {segment_offset}: {err}"); - return false; - } - - self.pending_segments.pop_front(); - self.mark_caught_up(); - - if let Some(clog_tx) = clog_tx - && let Err(err) = clog_tx.try_send(segment_offset) - { - tracing::warn!("failed to send offset {segment_offset} after compression: {err}"); - } - - true - } - - async fn compress_segments_while_idle( - &mut self, - durability: LocalDurability, - clog_tx: &mut Option>, - ) -> bool { - while self.has_pending_segments() { - if durability.queue_depth() != 0 { - return true; - } - if !self.compress_next_segment(durability.clone(), clog_tx).await { - return false; - } - tokio::task::yield_now().await; - } - - true - } -} - -async fn handle_commitlog_snapshot_event( - state: &mut CommitlogCompressionState, - durability: LocalDurability, - snap_tx: &mut Option>, - snapshot_offset: TxOffset, -) { - // Keep the test hooks in the same place as the state transition so callers - // can't forget to reset the idle window after new work is enqueued. - if let Some(snap_tx) = snap_tx - && let Err(err) = snap_tx.try_send(snapshot_offset) - { - tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}"); - } - - if let Err(err) = state.enqueue_snapshot(durability, snapshot_offset).await { - tracing::warn!("failed to get commitlog segments to compress: {err}"); - } - state.reset_idle(); -} - -/// Watches snapshot creation events and compresses commitlog segments older -/// than the snapshot once write load appears idle. +/// Watches snapshot creation events and compresses all commitlog segments older +/// than the snapshot. /// /// Suitable **only** for non-replicated databases. -/// -/// Commitlog compression state machine: -/// -/// ```text -/// startup -/// | -/// | enqueue segments in [0, latest snapshot) -/// v -/// +------------------+ no work / queue drained +------------------+ -/// | pending segments | ----------------------------------> | no pending work | -/// | in memory | | wait for snapshot| -/// +--------+---------+ <---------------------------------- +---------+--------+ -/// | snapshot created -/// | enqueue older segments -/// v -/// backlog >= threshold? -/// / \ -/// yes no -----------------------+ -/// | | -/// v v -/// +------------------------+ +------------------------+ -/// | compress one segment | | wait until durability | -/// | immediately | | queue is empty for | -/// +------------+-----------+ | IDLE_WINDOW | -/// | +-----------+------------+ -/// | | -/// | v -/// | +------------------------+ -/// | | compress next only | -/// | | while queue is empty | -/// | +-----------+------------+ -/// | | -/// +---------------------------+ -/// | -/// v -/// re-check pending segments -/// -/// snapshot observed while pending: -/// enqueue new older segments and reset the idle timer -/// ``` -/// -/// Compression is treated as write-load idle maintenance unless the uncompressed -/// segment backlog grows large enough to force bounded progress under load. -/// Startup is handled as an initial catch-up pass because pending compression -/// work is not persisted across restarts; recompressing an already-compressed -/// segment is a no-op in the commitlog storage layer. pub async fn snapshot_watching_commitlog_compressor( mut snapshot_rx: watch::Receiver, mut clog_tx: Option>, mut snap_tx: Option>, durability: LocalDurability, ) { - let initial_snapshot_offset = *snapshot_rx.borrow_and_update(); - let mut state = CommitlogCompressionState::default(); - - // `snapshot_rx` starts at the latest snapshot already on disk. Treat that as - // an initial catch-up target rather than a completed compression checkpoint, - // because pending compression work is not persisted across restarts. - if initial_snapshot_offset > 0 - && let Err(err) = state - .enqueue_snapshot(durability.clone(), initial_snapshot_offset) - .await - { - tracing::warn!("failed to get initial commitlog segments to compress: {err}"); - } + let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update(); + while snapshot_rx.changed().await.is_ok() { + let snapshot_offset = *snapshot_rx.borrow_and_update(); + let durability = durability.clone(); - loop { - state.mark_caught_up(); + if let Some(snap_tx) = &mut snap_tx + && let Err(err) = snap_tx.try_send(snapshot_offset) + { + tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}"); + } + + let res: io::Result<_> = asyncify(move || { + let segment_offsets = durability.existing_segment_offsets()?; + let start_idx = segment_offsets + .binary_search(&prev_snapshot_offset) + // if the snapshot is in the middle of a segment, we want to round down. + // [0, 2].binary_search(1) will return Err(1), so we subtract 1. + .unwrap_or_else(|i| i.saturating_sub(1)); + let segment_offsets = &segment_offsets[start_idx..]; + let end_idx = segment_offsets + .binary_search(&snapshot_offset) + .unwrap_or_else(|i| i.saturating_sub(1)); + // in this case, segment_offsets[end_idx] is the segment that contains the snapshot, + // which we don't want to compress, so an exclusive range is correct. + let segment_offsets = &segment_offsets[..end_idx]; + durability.compress_segments(segment_offsets)?; + let n = segment_offsets.len(); + let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None }; + Ok(last_compressed_segment) + }) + .await; - if !state.has_pending_segments() { - // With no backlog, block until a new snapshot creates more work. - if snapshot_rx.changed().await.is_err() { - break; + let last_compressed_segment = match res { + Ok(opt_offset) => opt_offset, + Err(err) => { + tracing::warn!("failed to compress segments: {err}"); + continue; } - let snapshot_offset = *snapshot_rx.borrow_and_update(); - handle_commitlog_snapshot_event(&mut state, durability.clone(), &mut snap_tx, snapshot_offset).await; - continue; - } - - if state.pending_segment_count() >= COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG { - // Under sustained write load we still need bounded progress so old - // uncompressed segments do not accumulate forever. - tracing::debug!( - pending_segments = state.pending_segment_count(), - "forcing commitlog compression; segment backlog exceeded threshold" - ); - if !state.compress_next_segment(durability.clone(), &mut clog_tx).await { - tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_WINDOW).await; - } - state.reset_idle(); - tokio::task::yield_now().await; - continue; - } + }; + prev_snapshot_offset = snapshot_offset; - tokio::select! { - res = snapshot_rx.changed() => { - // New snapshots extend the pending range and restart the idle window. - if res.is_err() { - break; - } - let snapshot_offset = *snapshot_rx.borrow_and_update(); - handle_commitlog_snapshot_event( - &mut state, - durability.clone(), - &mut snap_tx, - snapshot_offset, - ) - .await; - } - _ = tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL) => { - if durability.queue_depth() == 0 { - // Once no writes have been queued for long enough, drain as - // many segments as possible, but stop if write load resumes. - if state.idle_window_elapsed() { - if !state.compress_segments_while_idle(durability.clone(), &mut clog_tx).await { - tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_WINDOW).await; - } - state.reset_idle(); - } - } else { - state.reset_idle(); - } - } + if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) + && let Err(err) = clog_tx.try_send(last_compressed_segment) + { + tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}"); } } }