Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions forester/src/compressible/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use solana_sdk::{
signature::{Keypair, Signature},
signer::Signer,
};
use tracing::{debug, warn};
use tracing::debug;

use super::{state::CompressibleAccountTracker, types::CompressibleAccountState};
use crate::Result;
Expand Down Expand Up @@ -196,16 +196,15 @@ impl<R: Rpc> Compressor<R> {
accounts.len()
);

// Collect pubkeys for sync before creating instruction
let pubkeys: Vec<_> = account_states.iter().map(|state| state.pubkey).collect();

let ix = Instruction {
program_id: registry_program_id,
accounts,
data: instruction.data(),
};

// Send transaction
// Note: Account removal from tracker is handled by LogSubscriber which parses
// the "compress_and_close:<pubkey>" logs emitted by the registry program
Comment on lines +206 to +207

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Race condition: Accounts can be selected for compression multiple times.

By delegating removal to LogSubscriber and removing the immediate sync, there's a window between transaction confirmation and log processing where get_ready_to_compress can return the same accounts again:

  1. compress_batch sends transaction for account A
  2. Transaction confirms, account A closed on-chain
  3. (race window) Another batch selection occurs
  4. get_ready_to_compress still returns account A (tracker not yet updated)
  5. Second compress_batch attempts to compress already-closed account A
  6. LogSubscriber finally processes logs and updates tracker

Impact: Duplicate compression attempts will fail on-chain, wasting compute units and transaction fees. Under high concurrency, this could be frequent.

Solution: Add a "pending compression" state to the tracker:

pub async fn compress_batch(
    &self,
    account_states: &[CompressibleAccountState],
    registered_forester_pda: Pubkey,
) -> Result<Signature> {
    // Mark accounts as pending before sending transaction
    let pubkeys: Vec<_> = account_states.iter().map(|s| s.pubkey).collect();
    self.tracker.mark_pending_compression(&pubkeys);
    
    // ... build and send transaction ...
    
    // On error, unmark as pending
    let signature = match rpc.create_and_send_transaction(...).await {
        Ok(sig) => sig,
        Err(e) => {
            self.tracker.unmark_pending_compression(&pubkeys);
            return Err(e.into());
        }
    };
    
    Ok(signature)
}

Then modify get_ready_to_compress to exclude pending accounts.

let signature = rpc
.create_and_send_transaction(
&[ix],
Expand All @@ -215,11 +214,6 @@ impl<R: Rpc> Compressor<R> {
.await
.map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?;

// Sync accounts to verify they're closed
if let Err(e) = self.tracker.sync_accounts(&*rpc, &pubkeys).await {
warn!("Failed to sync accounts after compression: {:?}. Tracker will update via subscriptions.", e);
}

Ok(signature)
}
}
2 changes: 1 addition & 1 deletion forester/src/compressible/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ pub use bootstrap::bootstrap_compressible_accounts;
pub use compressor::Compressor;
pub use config::CompressibleConfig;
pub use state::CompressibleAccountTracker;
pub use subscriber::AccountSubscriber;
pub use subscriber::{AccountSubscriber, LogSubscriber};
pub use types::CompressibleAccountState;
80 changes: 0 additions & 80 deletions forester/src/compressible/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,86 +156,6 @@ impl CompressibleAccountTracker {

Ok(())
}

/// Query accounts and update tracker: remove non-existent accounts, update lamports for existing ones
pub async fn sync_accounts<R: light_client::rpc::Rpc>(
&self,
rpc: &R,
pubkeys: &[Pubkey],
) -> Result<()> {
// Query all accounts at once using get_multiple_accounts
let accounts = rpc.get_multiple_accounts(pubkeys).await?;

for (pubkey, account_opt) in pubkeys.iter().zip(accounts.iter()) {
match account_opt {
Some(account) => {
// Check if account is closed (lamports == 0)
if account.lamports == 0 {
self.remove(pubkey);
debug!("Removed closed account {} (lamports == 0)", pubkey);
continue;
}

// Re-deserialize account data to verify it's still valid
let ctoken = match CToken::try_from_slice(&account.data) {
Ok(ct) => ct,
Err(e) => {
self.remove(pubkey);
debug!(
"Removed account {} (deserialization failed: {:?})",
pubkey, e
);
continue;
}
};

// Verify Compressible extension still exists
let has_compressible_ext = ctoken.extensions.as_ref().is_some_and(|exts| {
exts.iter()
.any(|ext| matches!(ext, ExtensionStruct::Compressible(_)))
});

if !has_compressible_ext {
self.remove(pubkey);
debug!(
"Removed account {} (missing Compressible extension)",
pubkey
);
continue;
}

// Account is valid - update state
if let Some(mut state) = self.accounts.get_mut(pubkey) {
match calculate_compressible_slot(&ctoken, account.lamports) {
Ok(compressible_slot) => {
state.account = ctoken;
state.lamports = account.lamports;
state.compressible_slot = compressible_slot;
debug!(
"Updated account {}: lamports={}, compressible_slot={}",
pubkey, account.lamports, compressible_slot
);
}
Err(e) => {
warn!(
"Failed to calculate compressible slot for account {}: {}. Removing from tracker.",
pubkey, e
);
drop(state);
self.remove(pubkey);
}
}
}
}
None => {
// Account doesn't exist - remove from tracker
self.remove(pubkey);
debug!("Removed non-existent account {}", pubkey);
}
}
}
Ok(())
}
}

impl Default for CompressibleAccountTracker {
Expand Down
144 changes: 137 additions & 7 deletions forester/src/compressible/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,38 @@ use light_ctoken_interface::{COMPRESSIBLE_TOKEN_ACCOUNT_SIZE, CTOKEN_PROGRAM_ID}
use solana_account_decoder::UiAccountEncoding;
use solana_client::{
nonblocking::pubsub_client::PubsubClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{Response as RpcResponse, RpcKeyedAccount},
rpc_config::{
RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse},
};
use solana_rpc_client_api::filter::RpcFilterType;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use tokio::sync::oneshot;
use tokio::sync::broadcast;
use tracing::{debug, error, info};

use super::state::CompressibleAccountTracker;
use crate::Result;

/// Registry program ID for subscribing to compress_and_close logs
const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX";

/// Log prefix emitted by registry program when closing accounts
const COMPRESS_AND_CLOSE_LOG_PREFIX: &str = "compress_and_close:";

/// Subscribes to account changes for all compressible CToken accounts
pub struct AccountSubscriber {
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: oneshot::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
}

impl AccountSubscriber {
pub fn new(
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: oneshot::Receiver<()>,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
Self {
ws_url,
Expand All @@ -45,7 +54,6 @@ impl AccountSubscriber {
.map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;

let program_id = Pubkey::new_from_array(CTOKEN_PROGRAM_ID);

// Subscribe to compressed token program accounts with filter for compressible account size
let (mut subscription, unsubscribe) = pubsub_client
.program_subscribe(
Expand Down Expand Up @@ -87,7 +95,7 @@ impl AccountSubscriber {
}
}
}
_ = &mut self.shutdown_rx => {
_ = self.shutdown_rx.recv() => {
info!("Shutdown signal received");
unsubscribe().await;
break;
Expand Down Expand Up @@ -154,3 +162,125 @@ impl AccountSubscriber {
}
}
}

/// Subscribes to registry program logs to detect compress_and_close operations
/// and remove closed accounts from the tracker by parsing log messages directly
pub struct LogSubscriber {
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: broadcast::Receiver<()>,
}

impl LogSubscriber {
pub fn new(
ws_url: String,
tracker: Arc<CompressibleAccountTracker>,
shutdown_rx: broadcast::Receiver<()>,
) -> Self {
Self {
ws_url,
tracker,
shutdown_rx,
}
}

pub async fn run(&mut self) -> Result<()> {
info!("Starting log subscriber at {}", self.ws_url);

// Connect to WebSocket
let pubsub_client = PubsubClient::new(&self.ws_url)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?;

let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR)
.map_err(|e| anyhow::anyhow!("Invalid registry program ID: {}", e))?;

// Subscribe to logs mentioning the registry program
let filter = RpcTransactionLogsFilter::Mentions(vec![registry_program_id.to_string()]);
let config = RpcTransactionLogsConfig {
commitment: Some(CommitmentConfig::confirmed()),
};

let (mut subscription, unsubscribe) = pubsub_client
.logs_subscribe(filter, config)
.await
.map_err(|e| anyhow::anyhow!("Failed to subscribe to logs: {}", e))?;

info!(
"Log subscription established for registry program {}",
registry_program_id
);

// Process subscription messages
loop {
tokio::select! {
result = subscription.next() => {
match result {
Some(response) => {
self.handle_log_notification(response);
}
None => {
error!("Log subscription stream closed unexpectedly");
unsubscribe().await;
return Err(anyhow::anyhow!("Log subscription stream closed"));
}
}
}
_ = self.shutdown_rx.recv() => {
info!("Shutdown signal received for log subscriber");
unsubscribe().await;
break;
}
}
}

info!("Log subscriber stopped");
Ok(())
}

fn handle_log_notification(&self, response: RpcResponse<RpcLogsResponse>) {
let logs_response = response.value;

// Skip failed transactions
if logs_response.err.is_some() {
debug!("Skipping failed transaction {}", logs_response.signature);
return;
}

// Parse logs looking for compress_and_close entries
let mut removed_count = 0;
for log in &logs_response.logs {
// Look for our log prefix: "Program log: compress_and_close:<pubkey>"
// The actual log format is "Program log: compress_and_close:<pubkey>"
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
Comment on lines +255 to +258

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Log parsing uses nested strip_prefix which may be fragile to Solana log format changes.

The log parsing at lines 255-258 assumes the exact format "Program log: compress_and_close:<pubkey>". If Solana changes its logging format (e.g., adds timestamps, changes whitespace, or modifies the "Program log: " prefix), this parsing will silently fail.

Consider adding a test to verify the log format and document the expected format in a comment:

         for log in &logs_response.logs {
-            // Look for our log prefix: "Program log: compress_and_close:<pubkey>"
-            // The actual log format is "Program log: compress_and_close:<pubkey>"
+            // Expected Solana log format: "Program log: compress_and_close:<pubkey>"
+            // where "Program log: " is the standard Solana program log prefix
+            // and "compress_and_close:" is our custom event identifier
             if let Some(pubkey_str) = log
                 .strip_prefix("Program log: ")
                 .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))

Additionally, consider writing an integration test that verifies the actual log format matches expectations.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In forester/src/compressible/subscriber.rs around lines 255 to 258, the code
currently uses nested strip_prefix calls to match the exact "Program log:
compress_and_close:<pubkey>" string which is fragile to minor Solana log format
changes; change the parsing to be more tolerant by first searching for the
"compress_and_close:" token (e.g., find or split_once on "compress_and_close:"),
trim surrounding whitespace, and then validate/extract the pubkey (and
optionally reject unexpected suffixes); add a unit test covering several
realistic log variants (with/without "Program log:", extra whitespace,
timestamps, etc.) and add a short comment documenting the accepted log patterns
and why the more tolerant parsing is used.

match Pubkey::from_str(pubkey_str) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Comment on lines +260 to +268

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Account removal without verification may remove incorrect accounts in edge cases.

The code removes accounts from the tracker immediately upon parsing a pubkey from logs (line 261), without verifying:

  1. The account actually exists in the tracker
  2. The account belongs to the expected program
  3. The transaction that emitted the log actually succeeded in closing the account

While line 245 checks logs_response.err.is_some(), Solana transactions can succeed at the top level but have inner CPI failures. If the compress_and_close CPI fails but the outer transaction succeeds, the log may still be emitted.

Consider adding validation:

                 match Pubkey::from_str(pubkey_str) {
                     Ok(pubkey) => {
-                        if self.tracker.remove(&pubkey).is_some() {
+                        // Only log removal if the account actually existed in the tracker
+                        if let Some(removed_state) = self.tracker.remove(&pubkey) {
                             debug!(
-                                "Removed closed account {} from tracker (compress_and_close log)",
-                                pubkey
+                                "Removed closed account {} from tracker (compress_and_close log, had {} lamports)",
+                                pubkey,
+                                removed_state.lamports
                             );
                             removed_count += 1;
+                        } else {
+                            debug!(
+                                "Received compress_and_close log for {} but account not in tracker",
+                                pubkey
+                            );
                         }
                     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Ok(pubkey) => {
// Only log removal if the account actually existed in the tracker
if let Some(removed_state) = self.tracker.remove(&pubkey) {
debug!(
"Removed closed account {} from tracker (compress_and_close log, had {} lamports)",
pubkey,
removed_state.lamports
);
removed_count += 1;
} else {
debug!(
"Received compress_and_close log for {} but account not in tracker",
pubkey
);
}
}
🤖 Prompt for AI Agents
In forester/src/compressible/subscriber.rs around lines 260 to 268, the code
removes tracker entries as soon as a pubkey is parsed from logs without
verifying the account or that the CPI actually succeeded; update the logic so
you first lookup the tracker entry (do not call remove blindly), verify the
stored account owner/program matches the expected compress_and_close program id,
confirm the transaction and any inner CPIs truly succeeded (e.g., ensure
logs_response.err.is_none() and scan logs_response.logs for any "Program <prog>
failed" / failure markers or absence of failure messages for the compress
program), and only then call remove (and increment removed_count) — if any check
fails, leave the tracker untouched and optionally emit a debug/warn explaining
why nothing was removed.

Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
Comment on lines +255 to +275

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential parsing failure if log entry has trailing whitespace.

The pubkey_str extracted from the log line is used directly in Pubkey::from_str(). If the log entry contains any trailing whitespace or newline characters, the parse will fail with an unhelpful error.

Consider trimming the string before parsing:

             if let Some(pubkey_str) = log
                 .strip_prefix("Program log: ")
                 .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
             {
-                match Pubkey::from_str(pubkey_str) {
+                match Pubkey::from_str(pubkey_str.trim()) {
                     Ok(pubkey) => {

This is a defensive measure that costs nothing and prevents silent failures if log formatting varies.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
match Pubkey::from_str(pubkey_str) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
if let Some(pubkey_str) = log
.strip_prefix("Program log: ")
.and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX))
{
match Pubkey::from_str(pubkey_str.trim()) {
Ok(pubkey) => {
if self.tracker.remove(&pubkey).is_some() {
debug!(
"Removed closed account {} from tracker (compress_and_close log)",
pubkey
);
removed_count += 1;
}
}
Err(e) => {
error!(
"Invalid pubkey in compress_and_close log '{}': {}",
pubkey_str, e
);
}
}
🤖 Prompt for AI Agents
In forester/src/compressible/subscriber.rs around lines 255 to 275, the
extracted pubkey_str may contain trailing whitespace/newlines causing
Pubkey::from_str to fail; trim the extracted string (e.g., let pubkey_trimmed =
pubkey_str.trim()) and use that trimmed value for Pubkey::from_str and in the
error/debug log messages so parsing succeeds with variable log formatting.

}
}

if removed_count > 0 {
info!(
"Removed {} closed accounts from transaction {}",
removed_count, logs_response.signature
);
}
}
}
22 changes: 20 additions & 2 deletions forester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub async fn run_pipeline<R: Rpc>(
rpc_rate_limiter: Option<RateLimiter>,
send_tx_rate_limiter: Option<RateLimiter>,
shutdown_service: oneshot::Receiver<()>,
shutdown_compressible: Option<oneshot::Receiver<()>>,
shutdown_compressible: Option<tokio::sync::broadcast::Receiver<()>>,
shutdown_bootstrap: Option<oneshot::Receiver<()>>,
work_report_sender: mpsc::Sender<WorkReport>,
) -> Result<()> {
Expand Down Expand Up @@ -225,7 +225,10 @@ pub async fn run_pipeline<R: Rpc>(
let tracker_clone = tracker.clone();
let ws_url = compressible_config.ws_url.clone();

// Spawn subscriber
// Create a second receiver for the log subscriber
let shutdown_rx_log = shutdown_rx.resubscribe();

// Spawn account subscriber
tokio::spawn(async move {
let mut subscriber =
compressible::AccountSubscriber::new(ws_url, tracker_clone, shutdown_rx);
Expand All @@ -234,6 +237,21 @@ pub async fn run_pipeline<R: Rpc>(
}
});

// Spawn log subscriber to detect compress_and_close operations
let tracker_clone_log = tracker.clone();
let ws_url_log = compressible_config.ws_url.clone();

tokio::spawn(async move {
let mut log_subscriber = compressible::LogSubscriber::new(
ws_url_log,
tracker_clone_log,
shutdown_rx_log,
);
if let Err(e) = log_subscriber.run().await {
tracing::error!("Log subscriber error: {:?}", e);
}
});

// Spawn bootstrap task
if let Some(shutdown_bootstrap_rx) = shutdown_bootstrap {
let tracker_clone = tracker.clone();
Expand Down
2 changes: 1 addition & 1 deletion forester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn main() -> Result<(), ForesterError> {
let (shutdown_receiver_compressible, shutdown_receiver_bootstrap) =
if config.compressible_config.is_some() {
let (shutdown_sender_compressible, shutdown_receiver_compressible) =
oneshot::channel();
tokio::sync::broadcast::channel(1);
let (shutdown_sender_bootstrap, shutdown_receiver_bootstrap) =
oneshot::channel();
tokio::spawn(async move {
Expand Down
5 changes: 3 additions & 2 deletions forester/tests/e2e_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,12 +759,13 @@ async fn setup_forester_pipeline(
) -> (
tokio::task::JoinHandle<anyhow::Result<()>>,
oneshot::Sender<()>,
oneshot::Sender<()>,
tokio::sync::broadcast::Sender<()>,
oneshot::Sender<()>,
mpsc::Receiver<WorkReport>,
) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel();
let (shutdown_compressible_sender, shutdown_compressible_receiver) =
tokio::sync::broadcast::channel(1);
let (shutdown_bootstrap_sender, shutdown_bootstrap_receiver) = oneshot::channel();
let (work_report_sender, work_report_receiver) = mpsc::channel(100);

Expand Down
3 changes: 2 additions & 1 deletion forester/tests/test_batch_append_spent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ async fn test_batch_sequence() {

async fn run_forester(config: &ForesterConfig, duration: Duration) {
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel();
let (shutdown_compressible_sender, shutdown_compressible_receiver) =
tokio::sync::broadcast::channel(1);
let (work_report_sender, _) = mpsc::channel(100);

let service_handle = tokio::spawn(run_pipeline::<LightClient>(
Expand Down
Loading