diff --git a/src/faucet/rate_limiter.rs b/src/faucet/rate_limiter.rs index 86415c9..2c04c6e 100644 --- a/src/faucet/rate_limiter.rs +++ b/src/faucet/rate_limiter.rs @@ -86,6 +86,20 @@ impl RateLimiterCore { Ok((faucet_info, id)) } + /// Same shape as [`parse_request_path`] but expects a trailing `/refund` segment. + fn parse_refund_path(path: &str) -> Result<(FaucetInfo, String)> { + let mut path_info = path.split('/'); + if path_info.next_back() != Some("refund") { + return Err(Error::RustError( + "rate limiter refund path must end with /refund".into(), + )); + } + let id = path_info.next_back().unwrap_or_default().to_string(); + let faucet_info = FaucetInfo::from_str(path_info.next_back().unwrap_or_default()) + .map_err(|e| Error::RustError(e.to_string()))?; + Ok((faucet_info, id)) + } + async fn get_rate_limit( &self, faucet_info: &FaucetInfo, @@ -184,6 +198,35 @@ impl RateLimiterCore { Ok(()) } + async fn refund_last_drip(&self, faucet_info: &FaucetInfo, id: &str) -> Result<()> { + let drip_amount = faucet_info.drip_amount(); + let dripped = self + .storage + .get::("dripped") + .await + .ok() + .flatten() + .unwrap_or_else(|| DripAmount::zero(faucet_info.token_type())); + let claimed_key = format!("claimed_{id}"); + let claimed = self + .storage + .get::(&claimed_key) + .await + .ok() + .flatten() + .unwrap_or_else(|| DripAmount::zero(faucet_info.token_type())); + let updated_dripped = dripped.saturating_sub(&drip_amount); + let updated_claimed = claimed.saturating_sub(&drip_amount); + self.storage.put("dripped", updated_dripped.clone()).await?; + self.storage + .put(&claimed_key, updated_claimed.clone()) + .await?; + log::info!( + "{faucet_info} Rate limiter refund for {id}: claimed={updated_claimed:?}, dripped={updated_dripped:?}" + ); + Ok(()) + } + #[allow(dead_code)] async fn handle_request(&self, path: &str, now: DateTime) -> Result> { let (faucet_info, id) = Self::parse_request_path(path)?; @@ -208,6 +251,7 @@ impl RateLimiterCore { pub struct RateLimiter { #[cfg(not(test))] state: State, + env: Env, } #[cfg(not(test))] @@ -215,6 +259,29 @@ impl RateLimiter { fn parse_request_path(path: &str) -> Result<(FaucetInfo, String)> { RateLimiterCore::::parse_request_path(path) } + + fn parse_refund_path(path: &str) -> Result<(FaucetInfo, String)> { + RateLimiterCore::::parse_refund_path(path) + } + + fn refund_authorized(req: &Request, env: &Env) -> Result { + let Ok(expected) = env + .secret("RATE_LIMITER_REFUND_SECRET") + .map(|s| s.to_string()) + else { + return Ok(false); + }; + let expected = expected.trim(); + if expected.is_empty() { + return Ok(false); + } + let bearer = req + .headers() + .get("Authorization")? + .and_then(|h| h.strip_prefix("Bearer ").map(|t| t.trim().to_string())) + .unwrap_or_default(); + Ok(bearer == expected) + } fn create_core(&self) -> RateLimiterCore> { RateLimiterCore::new(DurableObjectStorage::new(&self.state)) } @@ -240,15 +307,28 @@ impl RateLimiter { .update_rate_limit(faucet_info, id, now, claimed, dripped) .await } + + async fn refund_rate_limit_for_path(&self, path: &str) -> Result<()> { + let (faucet_info, id) = Self::parse_refund_path(path)?; + self.create_core().refund_last_drip(&faucet_info, &id).await + } } #[cfg(not(test))] impl DurableObject for RateLimiter { - fn new(state: State, _env: Env) -> Self { - Self { state } + fn new(state: State, env: Env) -> Self { + Self { state, env } } async fn fetch(&self, req: Request) -> Result { + if req.method() == Method::Post { + if !Self::refund_authorized(&req, &self.env)? { + return Response::error("Unauthorized", 401); + } + self.refund_rate_limit_for_path(&req.path()).await?; + return Response::ok(""); + } + let now = Utc::now(); let (faucet_info, id) = Self::parse_request_path(&req.path())?; let (is_allowed, retry_after, claimed, dripped) = @@ -455,6 +535,50 @@ mod tests { assert_eq!(id, "test_wallet_123"); } + #[tokio::test] + async fn test_parse_refund_path_ok() { + let path = "http://do/rate_limiter/CalibnetFIL/test_wallet_123/refund"; + let (faucet_info, id) = + RateLimiterCore::::parse_refund_path(path).unwrap(); + assert_eq!(faucet_info, FaucetInfo::CalibnetFIL); + assert_eq!(id, "test_wallet_123"); + } + + #[tokio::test] + async fn test_refund_subtracts_last_drip() { + use crate::utils::drip_amount::TokenType; + use mockall::predicate::eq; + + let wallet_id = "w1"; + let faucet_info = FaucetInfo::CalibnetFIL; + let drip = faucet_info.drip_amount(); + let drip_for_claimed = drip.clone(); + let zero = DripAmount::zero(TokenType::Native); + + let mut mock_storage = MockRateLimiterStorage::new(); + mock_storage + .expect_get::() + .with(eq("dripped")) + .return_once(move |_| Ok(Some(drip.clone()))); + mock_storage + .expect_get::() + .with(eq(format!("claimed_{wallet_id}"))) + .return_once(move |_| Ok(Some(drip_for_claimed.clone()))); + mock_storage + .expect_put::() + .with(eq("dripped"), eq(zero.clone())) + .return_once(|_, _| Ok(())); + mock_storage + .expect_put::() + .with(eq(format!("claimed_{wallet_id}")), eq(zero)) + .return_once(|_, _| Ok(())); + + let core = RateLimiterCore::new(mock_storage); + core.refund_last_drip(&faucet_info, wallet_id) + .await + .unwrap(); + } + /// Checks path parsing with an invalid path. #[tokio::test] async fn test_parse_request_path_invalid_faucet() { diff --git a/src/faucet/server.rs b/src/faucet/server.rs index d7ceb08..919b505 100644 --- a/src/faucet/server.rs +++ b/src/faucet/server.rs @@ -116,6 +116,53 @@ async fn query_rate_limiter( .await } +/// Undoes one drip allocation in the rate limiter DO after a failed on-chain submission. +/// Server-internal: do not call from a browser-facing server function (would defeat rate limiting). +pub async fn refund_rate_limit_by_key( + faucet_info: FaucetInfo, + wallet_addr: AnyAddress, +) -> Result<(), ServerFnError> { + SendWrapper::new(async move { + let Extension(env): Extension> = extract().await?; + if env + .secret("RATE_LIMITER_DISABLED") + .map(|v| v.to_string().to_lowercase() == "true") + .unwrap_or(false) + { + return Ok(()); + } + let token = match env.secret("RATE_LIMITER_REFUND_SECRET") { + Ok(s) if !s.to_string().trim().is_empty() => s.to_string(), + _ => { + log::warn!("RATE_LIMITER_REFUND_SECRET unset/empty: skipping rate limit refund"); + return Ok(()); + } + }; + let stub = env + .durable_object("RATE_LIMITER")? + .id_from_name(&faucet_info.to_string())? + .get_stub()?; + let headers = Headers::new(); + headers.set("Authorization", &format!("Bearer {}", token.trim()))?; + let mut init = RequestInit::new(); + init.with_method(Method::Post).with_headers(headers); + let request = Request::new_with_init( + &format!("http://do/rate_limiter/{faucet_info}/{wallet_addr}/refund"), + &init, + )?; + let status = stub + .fetch_with_request(request) + .await + .map_err(ServerFnError::new)? + .status_code(); + if !(200..300).contains(&status) { + log::warn!("rate limit refund DO returned HTTP {status} (faucet={faucet_info})"); + } + Ok(()) + }) + .await +} + /// Checks if the request can proceed based on the rate limit for the given faucet. pub async fn check_rate_limit( faucet_info: FaucetInfo, diff --git a/src/faucet/server_api.rs b/src/faucet/server_api.rs index 0d39dba..f330e66 100644 --- a/src/faucet/server_api.rs +++ b/src/faucet/server_api.rs @@ -23,8 +23,8 @@ use alloy::{sol, sol_types::SolCall}; #[cfg(feature = "ssr")] use super::server::{ - check_rate_limit, read_faucet_secret, secret_key, sign_with_eth_secret_key, - sign_with_secret_key, + check_rate_limit, read_faucet_secret, refund_rate_limit_by_key, secret_key, + sign_with_eth_secret_key, sign_with_secret_key, }; #[cfg(feature = "ssr")] @@ -439,14 +439,20 @@ async fn handle_native_claim( ) .await { - Ok(LotusJson(smsg)) => { - let cid = rpc.mpool_push(smsg).await.map_err(ServerFnError::new)?; - let tx_hash = rpc + Ok(LotusJson(smsg)) => match rpc.mpool_push(smsg).await.map_err(ServerFnError::new) { + Ok(cid) => rpc .eth_get_transaction_hash_by_cid(cid) .await - .map_err(ServerFnError::new)?; - Ok(tx_hash) - } + .map_err(ServerFnError::new), + Err(e) => { + let _ = refund_rate_limit_by_key( + faucet_info, + AnyAddress::Filecoin(LotusJson(id_address)), + ) + .await; + Err(e) + } + }, Err(err) => Err(handle_faucet_error(err)), } } @@ -469,13 +475,17 @@ async fn handle_erc20_claim( let gas_price = rpc.gas_price().await.map_err(ServerFnError::new)?; match signed_erc20_transfer(eth_to, nonce, gas_price, faucet_info).await { - Ok(signed) => { - let tx_hash = rpc - .send_eth_transaction_signed(&signed) - .await - .map_err(ServerFnError::new)?; - Ok(tx_hash) - } + Ok(signed) => match rpc + .send_eth_transaction_signed(&signed) + .await + .map_err(ServerFnError::new) + { + Ok(tx_hash) => Ok(tx_hash), + Err(e) => { + let _ = refund_rate_limit_by_key(faucet_info, AnyAddress::Ethereum(eth_to)).await; + Err(e) + } + }, Err(err) => Err(handle_faucet_error(err)), } } diff --git a/src/utils/drip_amount.rs b/src/utils/drip_amount.rs index 2cd44fc..4b2e830 100644 --- a/src/utils/drip_amount.rs +++ b/src/utils/drip_amount.rs @@ -1,4 +1,5 @@ -use fvm_shared::{bigint::Zero, econ::TokenAmount, sector::StoragePower}; +use fvm_shared::{econ::TokenAmount, sector::StoragePower}; +use num_traits::Zero as _; use serde::{Deserialize, Serialize}; use std::ops::{Add, AddAssign, Mul}; @@ -27,6 +28,19 @@ impl DripAmount { TokenType::Datacap => DripAmount::Storage(StoragePower::zero()), } } + + /// Subtract `rhs` from self, saturating at the appropriate zero representation. + pub fn saturating_sub(&self, rhs: &DripAmount) -> DripAmount { + match (self, rhs) { + (DripAmount::Token(a), DripAmount::Token(b)) => { + DripAmount::Token(if *a <= *b { TokenAmount::zero() } else { a - b }) + } + (DripAmount::Storage(a), DripAmount::Storage(b)) => { + DripAmount::Storage(if a <= b { StoragePower::zero() } else { a - b }) + } + _ => unreachable!("DripAmount variant mismatch"), + } + } } impl Add<&DripAmount> for &DripAmount {