From 8d2e4169fd414193c91311217703a48ae3187427 Mon Sep 17 00:00:00 2001 From: ananas-block Date: Wed, 10 Dec 2025 00:52:12 +0000 Subject: [PATCH 1/2] fix: forester compress and closed account cleanup --- forester/src/compressible/compressor.rs | 12 +- forester/src/compressible/mod.rs | 2 +- forester/src/compressible/state.rs | 80 ---------- forester/src/compressible/subscriber.rs | 144 +++++++++++++++++- forester/src/lib.rs | 22 ++- forester/src/main.rs | 2 +- forester/tests/e2e_test.rs | 5 +- forester/tests/test_batch_append_spent.rs | 3 +- forester/tests/test_compressible_ctoken.rs | 2 +- .../src/compressible/compress_and_close.rs | 10 ++ xtask/src/create_ctoken_account.rs | 16 +- 11 files changed, 190 insertions(+), 108 deletions(-) diff --git a/forester/src/compressible/compressor.rs b/forester/src/compressible/compressor.rs index 2c75b65765..bec2d9dd6b 100644 --- a/forester/src/compressible/compressor.rs +++ b/forester/src/compressible/compressor.rs @@ -17,7 +17,7 @@ use solana_sdk::{ signature::{Keypair, Signature}, signer::Signer, }; -use tracing::{debug, warn}; +use tracing::debug; use super::{state::CompressibleAccountTracker, types::CompressibleAccountState}; use crate::Result; @@ -196,9 +196,6 @@ impl Compressor { accounts.len() ); - // Collect pubkeys for sync before creating instruction - let pubkeys: Vec<_> = account_states.iter().map(|state| state.pubkey).collect(); - let ix = Instruction { program_id: registry_program_id, accounts, @@ -206,6 +203,8 @@ impl Compressor { }; // Send transaction + // Note: Account removal from tracker is handled by LogSubscriber which parses + // the "compress_and_close:" logs emitted by the registry program let signature = rpc .create_and_send_transaction( &[ix], @@ -215,11 +214,6 @@ impl Compressor { .await .map_err(|e| anyhow::anyhow!("Failed to send transaction: {}", e))?; - // Sync accounts to verify they're closed - if let Err(e) = self.tracker.sync_accounts(&*rpc, &pubkeys).await { - warn!("Failed to sync accounts after compression: {:?}. Tracker will update via subscriptions.", e); - } - Ok(signature) } } diff --git a/forester/src/compressible/mod.rs b/forester/src/compressible/mod.rs index ee22882838..2cd8d30b5b 100644 --- a/forester/src/compressible/mod.rs +++ b/forester/src/compressible/mod.rs @@ -9,5 +9,5 @@ pub use bootstrap::bootstrap_compressible_accounts; pub use compressor::Compressor; pub use config::CompressibleConfig; pub use state::CompressibleAccountTracker; -pub use subscriber::AccountSubscriber; +pub use subscriber::{AccountSubscriber, LogSubscriber}; pub use types::CompressibleAccountState; diff --git a/forester/src/compressible/state.rs b/forester/src/compressible/state.rs index 82a047a57d..accda1359c 100644 --- a/forester/src/compressible/state.rs +++ b/forester/src/compressible/state.rs @@ -156,86 +156,6 @@ impl CompressibleAccountTracker { Ok(()) } - - /// Query accounts and update tracker: remove non-existent accounts, update lamports for existing ones - pub async fn sync_accounts( - &self, - rpc: &R, - pubkeys: &[Pubkey], - ) -> Result<()> { - // Query all accounts at once using get_multiple_accounts - let accounts = rpc.get_multiple_accounts(pubkeys).await?; - - for (pubkey, account_opt) in pubkeys.iter().zip(accounts.iter()) { - match account_opt { - Some(account) => { - // Check if account is closed (lamports == 0) - if account.lamports == 0 { - self.remove(pubkey); - debug!("Removed closed account {} (lamports == 0)", pubkey); - continue; - } - - // Re-deserialize account data to verify it's still valid - let ctoken = match CToken::try_from_slice(&account.data) { - Ok(ct) => ct, - Err(e) => { - self.remove(pubkey); - debug!( - "Removed account {} (deserialization failed: {:?})", - pubkey, e - ); - continue; - } - }; - - // Verify Compressible extension still exists - let has_compressible_ext = ctoken.extensions.as_ref().is_some_and(|exts| { - exts.iter() - .any(|ext| matches!(ext, ExtensionStruct::Compressible(_))) - }); - - if !has_compressible_ext { - self.remove(pubkey); - debug!( - "Removed account {} (missing Compressible extension)", - pubkey - ); - continue; - } - - // Account is valid - update state - if let Some(mut state) = self.accounts.get_mut(pubkey) { - match calculate_compressible_slot(&ctoken, account.lamports) { - Ok(compressible_slot) => { - state.account = ctoken; - state.lamports = account.lamports; - state.compressible_slot = compressible_slot; - debug!( - "Updated account {}: lamports={}, compressible_slot={}", - pubkey, account.lamports, compressible_slot - ); - } - Err(e) => { - warn!( - "Failed to calculate compressible slot for account {}: {}. Removing from tracker.", - pubkey, e - ); - drop(state); - self.remove(pubkey); - } - } - } - } - None => { - // Account doesn't exist - remove from tracker - self.remove(pubkey); - debug!("Removed non-existent account {}", pubkey); - } - } - } - Ok(()) - } } impl Default for CompressibleAccountTracker { diff --git a/forester/src/compressible/subscriber.rs b/forester/src/compressible/subscriber.rs index ccfd57b657..c2d857747a 100644 --- a/forester/src/compressible/subscriber.rs +++ b/forester/src/compressible/subscriber.rs @@ -5,29 +5,38 @@ use light_ctoken_interface::{COMPRESSIBLE_TOKEN_ACCOUNT_SIZE, CTOKEN_PROGRAM_ID} use solana_account_decoder::UiAccountEncoding; use solana_client::{ nonblocking::pubsub_client::PubsubClient, - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_response::{Response as RpcResponse, RpcKeyedAccount}, + rpc_config::{ + RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, + }, + rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse}, }; use solana_rpc_client_api::filter::RpcFilterType; use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; -use tokio::sync::oneshot; +use tokio::sync::broadcast; use tracing::{debug, error, info}; use super::state::CompressibleAccountTracker; use crate::Result; +/// Registry program ID for subscribing to compress_and_close logs +const REGISTRY_PROGRAM_ID_STR: &str = "Lighton6oQpVkeewmo2mcPTQQp7kYHr4fWpAgJyEmDX"; + +/// Log prefix emitted by registry program when closing accounts +const COMPRESS_AND_CLOSE_LOG_PREFIX: &str = "compress_and_close:"; + /// Subscribes to account changes for all compressible CToken accounts pub struct AccountSubscriber { ws_url: String, tracker: Arc, - shutdown_rx: oneshot::Receiver<()>, + shutdown_rx: broadcast::Receiver<()>, } impl AccountSubscriber { pub fn new( ws_url: String, tracker: Arc, - shutdown_rx: oneshot::Receiver<()>, + shutdown_rx: broadcast::Receiver<()>, ) -> Self { Self { ws_url, @@ -45,7 +54,6 @@ impl AccountSubscriber { .map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?; let program_id = Pubkey::new_from_array(CTOKEN_PROGRAM_ID); - // Subscribe to compressed token program accounts with filter for compressible account size let (mut subscription, unsubscribe) = pubsub_client .program_subscribe( @@ -87,7 +95,7 @@ impl AccountSubscriber { } } } - _ = &mut self.shutdown_rx => { + _ = self.shutdown_rx.recv() => { info!("Shutdown signal received"); unsubscribe().await; break; @@ -154,3 +162,125 @@ impl AccountSubscriber { } } } + +/// Subscribes to registry program logs to detect compress_and_close operations +/// and remove closed accounts from the tracker by parsing log messages directly +pub struct LogSubscriber { + ws_url: String, + tracker: Arc, + shutdown_rx: broadcast::Receiver<()>, +} + +impl LogSubscriber { + pub fn new( + ws_url: String, + tracker: Arc, + shutdown_rx: broadcast::Receiver<()>, + ) -> Self { + Self { + ws_url, + tracker, + shutdown_rx, + } + } + + pub async fn run(&mut self) -> Result<()> { + info!("Starting log subscriber at {}", self.ws_url); + + // Connect to WebSocket + let pubsub_client = PubsubClient::new(&self.ws_url) + .await + .map_err(|e| anyhow::anyhow!("Failed to connect to WebSocket: {}", e))?; + + let registry_program_id = Pubkey::from_str(REGISTRY_PROGRAM_ID_STR) + .map_err(|e| anyhow::anyhow!("Invalid registry program ID: {}", e))?; + + // Subscribe to logs mentioning the registry program + let filter = RpcTransactionLogsFilter::Mentions(vec![registry_program_id.to_string()]); + let config = RpcTransactionLogsConfig { + commitment: Some(CommitmentConfig::confirmed()), + }; + + let (mut subscription, unsubscribe) = pubsub_client + .logs_subscribe(filter, config) + .await + .map_err(|e| anyhow::anyhow!("Failed to subscribe to logs: {}", e))?; + + info!( + "Log subscription established for registry program {}", + registry_program_id + ); + + // Process subscription messages + loop { + tokio::select! { + result = subscription.next() => { + match result { + Some(response) => { + self.handle_log_notification(response); + } + None => { + error!("Log subscription stream closed unexpectedly"); + unsubscribe().await; + return Err(anyhow::anyhow!("Log subscription stream closed")); + } + } + } + _ = self.shutdown_rx.recv() => { + info!("Shutdown signal received for log subscriber"); + unsubscribe().await; + break; + } + } + } + + info!("Log subscriber stopped"); + Ok(()) + } + + fn handle_log_notification(&self, response: RpcResponse) { + let logs_response = response.value; + + // Skip failed transactions + if logs_response.err.is_some() { + debug!("Skipping failed transaction {}", logs_response.signature); + return; + } + + // Parse logs looking for compress_and_close entries + let mut removed_count = 0; + for log in &logs_response.logs { + // Look for our log prefix: "Program log: compress_and_close:" + // The actual log format is "Program log: compress_and_close:" + if let Some(pubkey_str) = log + .strip_prefix("Program log: ") + .and_then(|s| s.strip_prefix(COMPRESS_AND_CLOSE_LOG_PREFIX)) + { + match Pubkey::from_str(pubkey_str) { + Ok(pubkey) => { + if self.tracker.remove(&pubkey).is_some() { + debug!( + "Removed closed account {} from tracker (compress_and_close log)", + pubkey + ); + removed_count += 1; + } + } + Err(e) => { + error!( + "Invalid pubkey in compress_and_close log '{}': {}", + pubkey_str, e + ); + } + } + } + } + + if removed_count > 0 { + info!( + "Removed {} closed accounts from transaction {}", + removed_count, logs_response.signature + ); + } + } +} diff --git a/forester/src/lib.rs b/forester/src/lib.rs index ade1fb161b..de5254d0bb 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -158,7 +158,7 @@ pub async fn run_pipeline( rpc_rate_limiter: Option, send_tx_rate_limiter: Option, shutdown_service: oneshot::Receiver<()>, - shutdown_compressible: Option>, + shutdown_compressible: Option>, shutdown_bootstrap: Option>, work_report_sender: mpsc::Sender, ) -> Result<()> { @@ -225,7 +225,10 @@ pub async fn run_pipeline( let tracker_clone = tracker.clone(); let ws_url = compressible_config.ws_url.clone(); - // Spawn subscriber + // Create a second receiver for the log subscriber + let shutdown_rx_log = shutdown_rx.resubscribe(); + + // Spawn account subscriber tokio::spawn(async move { let mut subscriber = compressible::AccountSubscriber::new(ws_url, tracker_clone, shutdown_rx); @@ -234,6 +237,21 @@ pub async fn run_pipeline( } }); + // Spawn log subscriber to detect compress_and_close operations + let tracker_clone_log = tracker.clone(); + let ws_url_log = compressible_config.ws_url.clone(); + + tokio::spawn(async move { + let mut log_subscriber = compressible::LogSubscriber::new( + ws_url_log, + tracker_clone_log, + shutdown_rx_log, + ); + if let Err(e) = log_subscriber.run().await { + tracing::error!("Log subscriber error: {:?}", e); + } + }); + // Spawn bootstrap task if let Some(shutdown_bootstrap_rx) = shutdown_bootstrap { let tracker_clone = tracker.clone(); diff --git a/forester/src/main.rs b/forester/src/main.rs index d56cafc143..c157ac54c5 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -41,7 +41,7 @@ async fn main() -> Result<(), ForesterError> { let (shutdown_receiver_compressible, shutdown_receiver_bootstrap) = if config.compressible_config.is_some() { let (shutdown_sender_compressible, shutdown_receiver_compressible) = - oneshot::channel(); + tokio::sync::broadcast::channel(1); let (shutdown_sender_bootstrap, shutdown_receiver_bootstrap) = oneshot::channel(); tokio::spawn(async move { diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index abb43eb646..c1312ba6a8 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -759,12 +759,13 @@ async fn setup_forester_pipeline( ) -> ( tokio::task::JoinHandle>, oneshot::Sender<()>, - oneshot::Sender<()>, + tokio::sync::broadcast::Sender<()>, oneshot::Sender<()>, mpsc::Receiver, ) { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel(); + let (shutdown_compressible_sender, shutdown_compressible_receiver) = + tokio::sync::broadcast::channel(1); let (shutdown_bootstrap_sender, shutdown_bootstrap_receiver) = oneshot::channel(); let (work_report_sender, work_report_receiver) = mpsc::channel(100); diff --git a/forester/tests/test_batch_append_spent.rs b/forester/tests/test_batch_append_spent.rs index a0b189e191..9df0c0ee47 100644 --- a/forester/tests/test_batch_append_spent.rs +++ b/forester/tests/test_batch_append_spent.rs @@ -322,7 +322,8 @@ async fn test_batch_sequence() { async fn run_forester(config: &ForesterConfig, duration: Duration) { let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - let (shutdown_compressible_sender, shutdown_compressible_receiver) = oneshot::channel(); + let (shutdown_compressible_sender, shutdown_compressible_receiver) = + tokio::sync::broadcast::channel(1); let (work_report_sender, _) = mpsc::channel(100); let service_handle = tokio::spawn(run_pipeline::( diff --git a/forester/tests/test_compressible_ctoken.rs b/forester/tests/test_compressible_ctoken.rs index c2a9fb2d7a..3c19d085ad 100644 --- a/forester/tests/test_compressible_ctoken.rs +++ b/forester/tests/test_compressible_ctoken.rs @@ -205,7 +205,7 @@ async fn test_compressible_ctoken_compression() { .expect("Failed to airdrop lamports"); // Setup tracker and subscriber let tracker = Arc::new(CompressibleAccountTracker::new()); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); let mut subscriber = AccountSubscriber::new( "ws://localhost:8900".to_string(), tracker.clone(), diff --git a/programs/registry/src/compressible/compress_and_close.rs b/programs/registry/src/compressible/compress_and_close.rs index 534865160b..8a51a1c30d 100644 --- a/programs/registry/src/compressible/compress_and_close.rs +++ b/programs/registry/src/compressible/compress_and_close.rs @@ -53,6 +53,16 @@ pub fn process_compress_and_close<'c: 'info, 'info>( Transfer2CpiAccounts::try_from_account_infos(fee_payer, ctx.remaining_accounts) .map_err(ProgramError::from)?; + // Emit logs for closed accounts (used by forester to track closures) + for idx in &indices { + if let Ok(source_account) = transfer2_accounts + .packed_accounts + .get_u8(idx.source_index, "source_account") + { + msg!("compress_and_close:{}", source_account.key); + } + } + let instruction = compress_and_close_ctoken_accounts_with_indices( ctx.accounts.authority.key(), authority_index, diff --git a/xtask/src/create_ctoken_account.rs b/xtask/src/create_ctoken_account.rs index 5d1fa00f7a..be9097d43c 100644 --- a/xtask/src/create_ctoken_account.rs +++ b/xtask/src/create_ctoken_account.rs @@ -4,7 +4,10 @@ use clap::Parser; use dirs::home_dir; use light_client::rpc::{LightClient, LightClientConfig, Rpc}; use light_ctoken_sdk::ctoken::{CompressibleParams, CreateCTokenAccount}; -use solana_sdk::signature::{read_keypair_file, Keypair, Signer}; +use solana_sdk::{ + signature::{read_keypair_file, Keypair, Signer}, + transaction::Transaction, +}; #[derive(Debug, Parser)] pub struct Options { @@ -82,9 +85,14 @@ pub async fn create_ctoken_account(options: Options) -> anyhow::Result<()> { .with_compressible(compressible_params) .instruction()?; - let signature = rpc - .create_and_send_transaction(&[create_ix], &payer.pubkey(), &[&payer, &account_keypair]) - .await?; + let transaction = Transaction::new_signed_with_payer( + &[create_ix], + Some(&payer.pubkey()), + &[&payer, &account_keypair], + rpc.get_latest_blockhash().await?.0, + ); + + let signature = rpc.send_transaction(&transaction).await?; println!( "[{}/{}] Account: {} | Mint: {} | Sig: {:?}", From 1d03a619d7e32418fbe51cfe76b81a31ad2fa136 Mon Sep 17 00:00:00 2001 From: ananas-block Date: Wed, 10 Dec 2025 01:13:47 +0000 Subject: [PATCH 2/2] fix: compressible test --- forester/tests/test_compressible_ctoken.rs | 38 ++++++++++++++++++---- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/forester/tests/test_compressible_ctoken.rs b/forester/tests/test_compressible_ctoken.rs index 3c19d085ad..77d5fd0f60 100644 --- a/forester/tests/test_compressible_ctoken.rs +++ b/forester/tests/test_compressible_ctoken.rs @@ -1,6 +1,8 @@ use std::{sync::Arc, time::Duration}; -use forester::compressible::{AccountSubscriber, CompressibleAccountTracker, Compressor}; +use forester::compressible::{ + AccountSubscriber, CompressibleAccountTracker, Compressor, LogSubscriber, +}; use forester_utils::{ forester_epoch::get_epoch_phases, rpc_pool::{SolanaRpcPool, SolanaRpcPoolBuilder}, @@ -203,16 +205,35 @@ async fn test_compressible_ctoken_compression() { rpc.airdrop_lamports(&payer.pubkey(), 10_000_000_000) .await .expect("Failed to airdrop lamports"); - // Setup tracker and subscriber + // Setup tracker and subscribers let tracker = Arc::new(CompressibleAccountTracker::new()); let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1); - let mut subscriber = AccountSubscriber::new( + let shutdown_rx_log = shutdown_tx.subscribe(); + + // Spawn account subscriber to track new/updated accounts + let mut account_subscriber = AccountSubscriber::new( "ws://localhost:8900".to_string(), tracker.clone(), shutdown_rx, ); - let subscriber_handle = tokio::spawn(async move { - subscriber.run().await.expect("Subscriber failed to run"); + let account_subscriber_handle = tokio::spawn(async move { + account_subscriber + .run() + .await + .expect("Account subscriber failed to run"); + }); + + // Spawn log subscriber to detect compress_and_close operations + let mut log_subscriber = LogSubscriber::new( + "ws://localhost:8900".to_string(), + tracker.clone(), + shutdown_rx_log, + ); + let log_subscriber_handle = tokio::spawn(async move { + log_subscriber + .run() + .await + .expect("Log subscriber failed to run"); }); sleep(Duration::from_secs(2)).await; // Create mint @@ -332,7 +353,12 @@ async fn test_compressible_ctoken_compression() { shutdown_tx .send(()) .expect("Failed to send shutdown signal"); - subscriber_handle.await.expect("Subscriber task panicked"); + account_subscriber_handle + .await + .expect("Account subscriber task panicked"); + log_subscriber_handle + .await + .expect("Log subscriber task panicked"); } /// Test that bootstrap process picks up existing compressible token accounts