From 9f401b532e986d84c8ad2c1c52990c0a39335c18 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 8 May 2026 09:30:06 +0200 Subject: [PATCH] commitlog: Don't lock while compressing Commitlog compression is coordinated externally, and it is safe to compress the same segment concurrently. Therefore, we can release the lock before starting the actual compression work, thereby avoiding interference with writes. Also, return an error for the currently active segment instead of panicking. --- crates/commitlog/src/lib.rs | 39 ++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 3922f002a84..76b5c8d668a 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -330,16 +330,37 @@ impl Commitlog { } /// Compress the segments at the offsets provided, marking them as immutable. + /// + /// `offsets` must contain the exact segment offsets; no rounding to the + /// nearest offset is performed. If a segment is not found on disk, an error + /// is returned and no further segments from the list are processed. + /// + /// The latest, writable segment will not be compressed. If `offsets` + /// contains its offset, an error is returned. + /// + /// This method acquires a read lock on this `Commitlog` instance, but + /// releases it once the compression work starts. Concurrent compression + /// tasks on the same segment are safe, but external coordination is + /// required to avoid duplicate work. + /// + /// Attempting to compress a segment that is already compressed incurs a + /// small overhead to open the file and determine its format, but + /// otherwise does nothing. pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<()> { - // even though `compress_segment` takes &self, we take an - // exclusive lock to avoid any weirdness happening. 
- #[allow(clippy::readonly_write_lock)] - let inner = self.inner.write().unwrap(); - assert!(!offsets.contains(&inner.head.min_tx_offset())); - // TODO: parallelize, maybe - offsets - .iter() - .try_for_each(|&offset| inner.repo.compress_segment(offset)) + let (repo, head_offset) = { + let inner = self.inner.read().unwrap(); + let repo = inner.repo.clone(); + let head_offset = inner.head.min_tx_offset(); + + (repo, head_offset) + }; + if offsets.contains(&head_offset) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("refusing to compress mutable segment {head_offset}"), + )); + } + offsets.iter().try_for_each(|&offset| repo.compress_segment(offset)) } /// Remove all data from the log and reopen it.