Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: L1 tx batcher #20

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ chrono = { version = "0.4.38", default-features = false, features = ["now"] }
colored = "2.1.0"
concurrent-map = "5.0.37"
config = { version = "0.14.0", features = ["toml"], default-features = false }
kanal = "0.1.0-pre8"
parking_lot = "0.12.3"
rand = "0.8.5"
serde = { version = "1.0.214", features = ["derive"] }
Expand Down
153 changes: 153 additions & 0 deletions src/batcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::{collections::VecDeque, sync::Arc, time::Duration};

use bdk_wallet::bitcoin::{self, Amount};
Zk2u marked this conversation as resolved.
Show resolved Hide resolved
use chrono::Utc;
use kanal::{unbounded_async, AsyncSender, SendError};
use parking_lot::{RwLock, RwLockWriteGuard};
use terrors::OneOf;
use tokio::{
select, spawn,
task::{spawn_blocking, JoinHandle},
time::interval,
};
use tracing::{error, info, info_span, Instrument};

use crate::l1::{fee_rate, L1Wallet, Persister, ESPLORA_CLIENT};

pub enum PayoutRequest {
L1(L1PayoutRequest),
}

pub struct L1PayoutRequest {
pub address: bitcoin::Address,
Zk2u marked this conversation as resolved.
Show resolved Hide resolved
pub amount: Amount,
}

pub struct Batcher {
task: Option<JoinHandle<()>>,
payout_sender: Option<AsyncSender<PayoutRequest>>,
cfg: BatcherConfig,
}

#[derive(Debug)]
pub struct BatcherNotStarted;

#[derive(Debug)]
#[allow(dead_code)]
pub struct BatcherNotAvailable(SendError);

#[derive(Debug, Clone)]
pub struct BatcherConfig {
pub period: Duration,
pub max_per_tx: usize,
pub max_in_flight: usize,
}

impl Batcher {
/// Creates a new `Batcher`.
/// You should call `Batcher::start` after this to start the batcher task,
/// otherwise the batcher won't do anything.
Comment on lines +47 to +49
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been doclinked but it's fine

pub fn new(cfg: BatcherConfig) -> Self {
Self {
task: None,
payout_sender: None,
cfg,
}
}
Zk2u marked this conversation as resolved.
Show resolved Hide resolved

pub fn start(&mut self, l1_wallet: Arc<RwLock<L1Wallet>>) {
let (tx, rx) = unbounded_async();

let cfg = self.cfg.clone();

let span = info_span!("batcher");
let batcher_task = spawn(async move {
let mut batch_interval = interval(cfg.period);
let mut l1_payout_queue: VecDeque<L1PayoutRequest> = VecDeque::new();

loop {
select! {
// biased to ensure that even if we have incoming requests, they don't block
// each batch from being built when it's scheduled
biased;
Zk2u marked this conversation as resolved.
Show resolved Hide resolved
instant = batch_interval.tick() => {
if l1_payout_queue.is_empty() {
continue
}
let span = info_span!("batch processing", batch = ?instant);
let _guard = span.enter();

let mut l1w = l1_wallet.write();

let mut psbt = l1w.build_tx();
psbt.fee_rate(fee_rate());
let num_to_deque = cfg.max_per_tx.min(l1_payout_queue.len());
let mut total_sent = Amount::ZERO;
for req in l1_payout_queue.drain(..num_to_deque) {
psbt.add_recipient(req.address.script_pubkey(), req.amount);
total_sent += req.amount;
}
let mut psbt = match psbt.finish() {
Ok(psbt) => psbt,
Err(e) => {
error!("failed finalizing tx: {e:?}");
continue;
}
};

let l1w = RwLockWriteGuard::downgrade(l1w);

l1w.sign(&mut psbt, Default::default())
.expect("signing should not fail");
let tx = psbt.extract_tx().expect("fully signed psbt");

let l1_wallet = l1_wallet.clone();
let span = info_span!("broadcast l1 tx", batch = ?instant);
spawn(async move {
if let Err(e) = ESPLORA_CLIENT.broadcast(&tx).await {
error!("error broadcasting tx: {e:?}");
}
info!("sent {total_sent} to {num_to_deque} requestors");
// triple nested spawn!
spawn_blocking(move || {
let mut l1w = l1_wallet.write();
l1w.apply_unconfirmed_txs([(tx, Utc::now().timestamp() as u64)]);
l1w.persist(&mut Persister).expect("persist should work");
})
.await
.expect("successful blocking update");
}.instrument(span));
}
req = rx.recv() => match req {
Ok(req) => match req {
PayoutRequest::L1(req) => if l1_payout_queue.len() < cfg.max_in_flight {
l1_payout_queue.push_back(req)
}
},
Err(e) => error!("error receiving PayoutRequest: {e:?}")
}
}
}
}.instrument(span));

self.task = Some(batcher_task);
self.payout_sender = Some(tx);
}

pub async fn queue_payout_request(
&self,
req: PayoutRequest,
) -> Result<(), OneOf<(BatcherNotStarted, BatcherNotAvailable)>> {
let tx = self
.payout_sender
.as_ref()
.ok_or(OneOf::new(BatcherNotStarted))?
.clone();

tx.send(req)
.await
.map_err(|e| OneOf::new(BatcherNotAvailable(e)))?;

Ok(())
}
}
10 changes: 5 additions & 5 deletions src/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ use bdk_wallet::{
rusqlite::{self, Connection},
ChangeSet, KeychainKind, PersistedWallet, Wallet, WalletPersister,
};
use parking_lot::RwLock;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};

use crate::{seed::Seed, AppState, SETTINGS};
use crate::{seed::Seed, SETTINGS};

/// Live updating fee rate in sat/kwu
static FEE_RATE: AtomicU64 = AtomicU64::new(250);
Expand Down Expand Up @@ -141,11 +142,10 @@ impl L1Wallet {

/// Spawns a tokio task that scans the chain for the wallet's outputs
/// every 30 secs.
pub fn spawn_syncer(state: Arc<AppState>) {
pub fn spawn_syncer(l1_wallet: Arc<RwLock<L1Wallet>>) {
Zk2u marked this conversation as resolved.
Show resolved Hide resolved
tokio::spawn(async move {
loop {
let req = state
.l1_wallet
let req = l1_wallet
.read()
.start_full_scan()
.inspect({
Expand All @@ -169,7 +169,7 @@ impl L1Wallet {
{
// in a separate block otherwise compiler gets upset that we're holding
// this over the await point
let mut l1w = state.l1_wallet.write();
let mut l1w = l1_wallet.write();
l1w.apply_update(update)
.expect("should be able to connect to db");
l1w.persist(&mut Persister).expect("persist should work");
Expand Down
84 changes: 24 additions & 60 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! A simple faucet server that uses [`axum`] and [`bdk_wallet`]
//! to generate and dispense bitcoin.

mod batcher;
pub mod hex;
pub mod l1;
pub mod l2;
Expand Down Expand Up @@ -28,26 +29,27 @@ use axum::{
Json, Router,
};
use axum_client_ip::SecureClientIp;
use batcher::{Batcher, L1PayoutRequest, PayoutRequest};
use bdk_wallet::{
bitcoin::{address::NetworkUnchecked, Address as L1Address},
KeychainKind,
};
use chrono::Utc;
use hex::Hex;
use l1::{fee_rate, L1Wallet, Persister, ESPLORA_CLIENT};
use l1::{L1Wallet, Persister};
use l2::L2Wallet;
use parking_lot::{RwLock, RwLockWriteGuard};
use parking_lot::RwLock;
use pow::{Challenge, Nonce, Solution};
use seed::SavableSeed;
use serde::{Deserialize, Serialize};
use settings::SETTINGS;
use tokio::{net::TcpListener, task::spawn_blocking};
use tokio::net::TcpListener;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

pub struct AppState {
l1_wallet: RwLock<L1Wallet>,
l1_wallet: Arc<RwLock<L1Wallet>>,
l2_wallet: L2Wallet,
batcher: Batcher,
}

pub static CRATE_NAME: LazyLock<String> =
Expand Down Expand Up @@ -78,14 +80,18 @@ async fn main() {
l1::spawn_fee_rate_task();

let l2_wallet = L2Wallet::new(&seed).expect("l2 wallet creation to succeed");
let l1_wallet = Arc::new(RwLock::new(l1_wallet));
let mut batcher = Batcher::new(SETTINGS.batcher.clone());
batcher.start(l1_wallet.clone());

L1Wallet::spawn_syncer(l1_wallet.clone());

let state = Arc::new(AppState {
l1_wallet: l1_wallet.into(),
l1_wallet,
l2_wallet,
batcher,
});

L1Wallet::spawn_syncer(state.clone());

let app = Router::new()
.route("/pow_challenge", get(get_pow_challenge))
.route("/claim_l1/:solution/:address", get(claim_l1))
Expand Down Expand Up @@ -127,7 +133,7 @@ async fn claim_l1(
SecureClientIp(ip): SecureClientIp,
Path((solution, address)): Path<(Hex<Solution>, L1Address<NetworkUnchecked>)>,
State(state): State<Arc<AppState>>,
) -> Result<String, (StatusCode, String)> {
) -> Result<(), (StatusCode, String)> {
let IpAddr::V4(ip) = ip else {
return Err((
StatusCode::BAD_REQUEST,
Expand All @@ -147,58 +153,16 @@ async fn claim_l1(
)
})?;

let psbt = {
let mut l1w = state.l1_wallet.write();
let balance = l1w.balance();
if balance.trusted_spendable() < SETTINGS.sats_per_claim {
return Err((
StatusCode::SERVICE_UNAVAILABLE,
"not enough bitcoin in the faucet".to_owned(),
));
}
let mut psbt = l1w
.build_tx()
.fee_rate(fee_rate())
.add_recipient(address.script_pubkey(), SETTINGS.sats_per_claim)
.clone()
.finish()
.expect("transaction to be constructed");
let l1w = RwLockWriteGuard::downgrade(l1w);
l1w.sign(&mut psbt, Default::default())
.expect("signing should not fail");
psbt
};

let tx = psbt.extract_tx().map_err(|e| {
error!("error extracting tx: {e:?}");
(
StatusCode::INTERNAL_SERVER_ERROR,
"error extracting tx".to_owned(),
)
})?;

if let Err(e) = ESPLORA_CLIENT.broadcast(&tx).await {
error!("error broadcasting tx: {e:?}");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"error broadcasting".to_owned(),
));
}

let txid = tx.compute_txid();

let state = state.clone();
spawn_blocking(move || {
let mut l1w = state.l1_wallet.write();
l1w.apply_unconfirmed_txs([(tx, Utc::now().timestamp() as u64)]);
l1w.persist(&mut Persister).expect("persist should work");
})
.await
.expect("successful blocking update");

info!("l1 claim to {address} via tx {}", txid);
state
.batcher
.queue_payout_request(PayoutRequest::L1(L1PayoutRequest {
address,
amount: SETTINGS.sats_per_claim,
}))
.await
.expect("successful queuing");

Ok(txid.to_string())
Ok(())
}

async fn claim_l2(
Expand Down
Loading