diff --git a/Cargo.lock b/Cargo.lock index 7ec137f..d401c53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -606,6 +606,7 @@ dependencies = [ "colored", "concurrent-map", "config", + "kanal", "parking_lot", "rand", "serde", @@ -2318,6 +2319,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "keccak" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 19836d1..b217ea2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ chrono = { version = "0.4.38", default-features = false, features = ["now"] } colored = "2.1.0" concurrent-map = "5.0.37" config = { version = "0.14.0", features = ["toml"], default-features = false } +kanal = "0.1.0-pre8" parking_lot = "0.12.3" rand = "0.8.5" serde = { version = "1.0.214", features = ["derive"] } diff --git a/src/batcher.rs b/src/batcher.rs new file mode 100644 index 0000000..57fd04b --- /dev/null +++ b/src/batcher.rs @@ -0,0 +1,153 @@ +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +use bdk_wallet::bitcoin::{self, Amount}; +use chrono::Utc; +use kanal::{unbounded_async, AsyncSender, SendError}; +use parking_lot::{RwLock, RwLockWriteGuard}; +use terrors::OneOf; +use tokio::{ + select, spawn, + task::{spawn_blocking, JoinHandle}, + time::interval, +}; +use tracing::{error, info, info_span, Instrument}; + +use crate::l1::{fee_rate, L1Wallet, Persister, ESPLORA_CLIENT}; + +pub enum PayoutRequest { + L1(L1PayoutRequest), +} + +pub struct L1PayoutRequest { + pub address: bitcoin::Address, + pub amount: Amount, +} + +pub struct Batcher { + task: Option>, + payout_sender: Option>, + cfg: BatcherConfig, +} + +#[derive(Debug)] +pub struct BatcherNotStarted; + +#[derive(Debug)] +#[allow(dead_code)] +pub struct BatcherNotAvailable(SendError); + +#[derive(Debug, Clone)] +pub struct BatcherConfig { + pub period: Duration, + pub max_per_tx: usize, + pub max_in_flight: usize, +} + +impl Batcher { + /// Creates a new `Batcher`. + /// You should call `Batcher::start` after this to start the batcher task, + /// otherwise the batcher won't do anything. + pub fn new(cfg: BatcherConfig) -> Self { + Self { + task: None, + payout_sender: None, + cfg, + } + } + + pub fn start(&mut self, l1_wallet: Arc>) { + let (tx, rx) = unbounded_async(); + + let cfg = self.cfg.clone(); + + let span = info_span!("batcher"); + let batcher_task = spawn(async move { + let mut batch_interval = interval(cfg.period); + let mut l1_payout_queue: VecDeque = VecDeque::new(); + + loop { + select! { + // biased to ensure that even if we have incoming requests, they don't block + // each batch from being built when it's scheduled + biased; + instant = batch_interval.tick() => { + if l1_payout_queue.is_empty() { + continue + } + let span = info_span!("batch processing", batch = ?instant); + let _guard = span.enter(); + + let mut l1w = l1_wallet.write(); + + let mut psbt = l1w.build_tx(); + psbt.fee_rate(fee_rate()); + let num_to_deque = cfg.max_per_tx.min(l1_payout_queue.len()); + let mut total_sent = Amount::ZERO; + for req in l1_payout_queue.drain(..num_to_deque) { + psbt.add_recipient(req.address.script_pubkey(), req.amount); + total_sent += req.amount; + } + let mut psbt = match psbt.finish() { + Ok(psbt) => psbt, + Err(e) => { + error!("failed finalizing tx: {e:?}"); + continue; + } + }; + + let l1w = RwLockWriteGuard::downgrade(l1w); + + l1w.sign(&mut psbt, Default::default()) + .expect("signing should not fail"); + let tx = psbt.extract_tx().expect("fully signed psbt"); + + let l1_wallet = l1_wallet.clone(); + let span = info_span!("broadcast l1 tx", batch = ?instant); + spawn(async move { + if let Err(e) = ESPLORA_CLIENT.broadcast(&tx).await { + error!("error broadcasting tx: {e:?}"); + } + info!("sent {total_sent} to {num_to_deque} requestors"); + // triple nested spawn! + spawn_blocking(move || { + let mut l1w = l1_wallet.write(); + l1w.apply_unconfirmed_txs([(tx, Utc::now().timestamp() as u64)]); + l1w.persist(&mut Persister).expect("persist should work"); + }) + .await + .expect("successful blocking update"); + }.instrument(span)); + } + req = rx.recv() => match req { + Ok(req) => match req { + PayoutRequest::L1(req) => if l1_payout_queue.len() < cfg.max_in_flight { + l1_payout_queue.push_back(req) + } + }, + Err(e) => error!("error receiving PayoutRequest: {e:?}") + } + } + } + }.instrument(span)); + + self.task = Some(batcher_task); + self.payout_sender = Some(tx); + } + + pub async fn queue_payout_request( + &self, + req: PayoutRequest, + ) -> Result<(), OneOf<(BatcherNotStarted, BatcherNotAvailable)>> { + let tx = self + .payout_sender + .as_ref() + .ok_or(OneOf::new(BatcherNotStarted))? + .clone(); + + tx.send(req) + .await + .map_err(|e| OneOf::new(BatcherNotAvailable(e)))?; + + Ok(()) + } +} diff --git a/src/l1.rs b/src/l1.rs index 8981751..5b67e11 100644 --- a/src/l1.rs +++ b/src/l1.rs @@ -20,10 +20,11 @@ use bdk_wallet::{ rusqlite::{self, Connection}, ChangeSet, KeychainKind, PersistedWallet, Wallet, WalletPersister, }; +use parking_lot::RwLock; use tokio::time::sleep; use tracing::{debug, error, info, warn}; -use crate::{seed::Seed, AppState, SETTINGS}; +use crate::{seed::Seed, SETTINGS}; /// Live updating fee rate in sat/kwu static FEE_RATE: AtomicU64 = AtomicU64::new(250); @@ -141,11 +142,10 @@ impl L1Wallet { /// Spawns a tokio task that scans the chain for the wallet's outputs /// every 30 secs. - pub fn spawn_syncer(state: Arc) { + pub fn spawn_syncer(l1_wallet: Arc>) { tokio::spawn(async move { loop { - let req = state - .l1_wallet + let req = l1_wallet .read() .start_full_scan() .inspect({ @@ -169,7 +169,7 @@ impl L1Wallet { { // in a separate block otherwise compiler gets upset that we're holding // this over the await point - let mut l1w = state.l1_wallet.write(); + let mut l1w = l1_wallet.write(); l1w.apply_update(update) .expect("should be able to connect to db"); l1w.persist(&mut Persister).expect("persist should work"); diff --git a/src/main.rs b/src/main.rs index 4edec15..553873c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ //! A simple faucet server that uses [`axum`] and [`bdk_wallet`] //! to generate and dispense bitcoin. +mod batcher; pub mod hex; pub mod l1; pub mod l2; @@ -28,26 +29,27 @@ use axum::{ Json, Router, }; use axum_client_ip::SecureClientIp; +use batcher::{Batcher, L1PayoutRequest, PayoutRequest}; use bdk_wallet::{ bitcoin::{address::NetworkUnchecked, Address as L1Address}, KeychainKind, }; -use chrono::Utc; use hex::Hex; -use l1::{fee_rate, L1Wallet, Persister, ESPLORA_CLIENT}; +use l1::{L1Wallet, Persister}; use l2::L2Wallet; -use parking_lot::{RwLock, RwLockWriteGuard}; +use parking_lot::RwLock; use pow::{Challenge, Nonce, Solution}; use seed::SavableSeed; use serde::{Deserialize, Serialize}; use settings::SETTINGS; -use tokio::{net::TcpListener, task::spawn_blocking}; +use tokio::net::TcpListener; use tracing::{error, info}; use tracing_subscriber::EnvFilter; pub struct AppState { - l1_wallet: RwLock, + l1_wallet: Arc>, l2_wallet: L2Wallet, + batcher: Batcher, } pub static CRATE_NAME: LazyLock = @@ -78,14 +80,18 @@ async fn main() { l1::spawn_fee_rate_task(); let l2_wallet = L2Wallet::new(&seed).expect("l2 wallet creation to succeed"); + let l1_wallet = Arc::new(RwLock::new(l1_wallet)); + let mut batcher = Batcher::new(SETTINGS.batcher.clone()); + batcher.start(l1_wallet.clone()); + + L1Wallet::spawn_syncer(l1_wallet.clone()); let state = Arc::new(AppState { - l1_wallet: l1_wallet.into(), + l1_wallet, l2_wallet, + batcher, }); - L1Wallet::spawn_syncer(state.clone()); - let app = Router::new() .route("/pow_challenge", get(get_pow_challenge)) .route("/claim_l1/:solution/:address", get(claim_l1)) @@ -127,7 +133,7 @@ async fn claim_l1( SecureClientIp(ip): SecureClientIp, Path((solution, address)): Path<(Hex, L1Address)>, State(state): State>, -) -> Result { +) -> Result<(), (StatusCode, String)> { let IpAddr::V4(ip) = ip else { return Err(( StatusCode::BAD_REQUEST, @@ -147,58 +153,16 @@ async fn claim_l1( ) })?; - let psbt = { - let mut l1w = state.l1_wallet.write(); - let balance = l1w.balance(); - if balance.trusted_spendable() < SETTINGS.sats_per_claim { - return Err(( - StatusCode::SERVICE_UNAVAILABLE, - "not enough bitcoin in the faucet".to_owned(), - )); - } - let mut psbt = l1w - .build_tx() - .fee_rate(fee_rate()) - .add_recipient(address.script_pubkey(), SETTINGS.sats_per_claim) - .clone() - .finish() - .expect("transaction to be constructed"); - let l1w = RwLockWriteGuard::downgrade(l1w); - l1w.sign(&mut psbt, Default::default()) - .expect("signing should not fail"); - psbt - }; - - let tx = psbt.extract_tx().map_err(|e| { - error!("error extracting tx: {e:?}"); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "error extracting tx".to_owned(), - ) - })?; - - if let Err(e) = ESPLORA_CLIENT.broadcast(&tx).await { - error!("error broadcasting tx: {e:?}"); - return Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "error broadcasting".to_owned(), - )); - } - - let txid = tx.compute_txid(); - - let state = state.clone(); - spawn_blocking(move || { - let mut l1w = state.l1_wallet.write(); - l1w.apply_unconfirmed_txs([(tx, Utc::now().timestamp() as u64)]); - l1w.persist(&mut Persister).expect("persist should work"); - }) - .await - .expect("successful blocking update"); - - info!("l1 claim to {address} via tx {}", txid); + state + .batcher + .queue_payout_request(PayoutRequest::L1(L1PayoutRequest { + address, + amount: SETTINGS.sats_per_claim, + })) + .await + .expect("successful queuing"); - Ok(txid.to_string()) + Ok(()) } async fn claim_l2( diff --git a/src/settings.rs b/src/settings.rs index 2069743..96a34cd 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -3,6 +3,7 @@ use std::{ path::PathBuf, str::FromStr, sync::LazyLock, + time::Duration, }; use axum_client_ip::SecureClientIpSource; @@ -10,7 +11,7 @@ use bdk_wallet::bitcoin::{Amount, Network}; use config::Config; use serde::{Deserialize, Serialize}; -use crate::CRATE_NAME; +use crate::{batcher::BatcherConfig, CRATE_NAME}; pub static SETTINGS: LazyLock = LazyLock::new(|| { let args = std::env::args().collect::>(); @@ -49,9 +50,12 @@ pub struct InternalSettings { pub l2_http_endpoint: String, pub sats_per_claim: Amount, pub pow_difficulty: u8, + pub batcher_period: Option, + pub batcher_max_per_batch: Option, + pub batcher_max_in_flight: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug)] /// Settings struct filled with either config values or /// opinionated defaults pub struct Settings { @@ -65,6 +69,7 @@ pub struct Settings { pub l2_http_endpoint: String, pub sats_per_claim: Amount, pub pow_difficulty: u8, + pub batcher: BatcherConfig, } // on L2, we represent 1 btc as 1 "eth" on the rollup @@ -93,6 +98,11 @@ impl TryFrom for Settings { l2_http_endpoint: internal.l2_http_endpoint, sats_per_claim: internal.sats_per_claim, pow_difficulty: internal.pow_difficulty, + batcher: BatcherConfig { + period: Duration::from_secs(internal.batcher_period.unwrap_or(30)), + max_per_tx: internal.batcher_max_per_batch.unwrap_or(250), + max_in_flight: internal.batcher_max_in_flight.unwrap_or(2500), + }, }) } }