Skip to content

Commit b4ce419

Browse files
authoredApr 16, 2025
Merge pull request #84 from buffrr/sync-cleanups
Refactor sync checks
2 parents 697c180 + 5f12d2b commit b4ce419

File tree

9 files changed

+158
-43
lines changed

9 files changed

+158
-43
lines changed
 

‎client/src/client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use spaces_protocol::{
1414
validate::{TxChangeSet, UpdateKind, Validator},
1515
Bytes, Covenant, FullSpaceOut, RevokeReason, SpaceOut,
1616
};
17-
use spaces_wallet::bitcoin::Transaction;
17+
use spaces_wallet::bitcoin::{Network, Transaction};
1818

1919
use crate::{
2020
source::BitcoinRpcError,
@@ -27,7 +27,7 @@ pub trait BlockSource {
2727
fn get_median_time(&self) -> Result<u64, BitcoinRpcError>;
2828
fn in_mempool(&self, txid: &Txid, height: u32) -> Result<bool, BitcoinRpcError>;
2929
fn get_block_count(&self) -> Result<u64, BitcoinRpcError>;
30-
fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError>;
30+
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError>;
3131
}
3232

3333
#[derive(Debug, Clone)]

‎client/src/config.rs

+4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ pub struct Args {
7979
/// Skip maintaining historical root anchors
8080
#[arg(long, env = "SPACED_SKIP_ANCHORS", default_value = "false")]
8181
skip_anchors: bool,
82+
/// The specified Bitcoin RPC is a light client
83+
#[arg(long, env = "SPACED_BITCOIN_RPC_LIGHT", default_value = "false")]
84+
bitcoin_rpc_light: bool,
8285
}
8386

8487
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, ValueEnum, Serialize, Deserialize)]
@@ -164,6 +167,7 @@ impl Args {
164167
let rpc = BitcoinRpc::new(
165168
&args.bitcoin_rpc_url.expect("bitcoin rpc url"),
166169
bitcoin_rpc_auth,
170+
!args.bitcoin_rpc_light
167171
);
168172

169173
let genesis = Spaced::genesis(&rpc, args.chain).await?;

‎client/src/format.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,10 @@ pub fn print_list_unspent(utxos: Vec<WalletOutput>, format: Format) {
152152
pub fn print_server_info(info: ServerInfo, format: Format) {
153153
match format {
154154
Format::Text => {
155-
println!("CHAIN: {}", info.chain);
156-
println!(" Height {}", info.tip.height);
157-
println!(" Hash {}", info.tip.hash);
155+
println!("Network: {}", info.network);
156+
println!("Height {}", info.tip.height);
157+
println!("Hash {}", info.tip.hash);
158+
println!("Progress {:.2}%", info.progress * 100.0);
158159
}
159160
Format::Json => {
160161
println!("{}", serde_json::to_string_pretty(&info).unwrap());

‎client/src/rpc.rs

+51-14
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,16 @@ pub(crate) type Responder<T> = oneshot::Sender<T>;
6666

6767
#[derive(Debug, Clone, Serialize, Deserialize)]
6868
pub struct ServerInfo {
69-
pub chain: ExtendedNetwork,
69+
pub network: String,
7070
pub tip: ChainAnchor,
71+
pub chain: ChainInfo,
72+
pub progress: f32,
73+
}
74+
75+
#[derive(Debug, Clone, Serialize, Deserialize)]
76+
pub struct ChainInfo {
77+
blocks: u32,
78+
headers: u32,
7179
}
7280

7381
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -99,8 +107,8 @@ pub enum ChainStateCommand {
99107
txs: Vec<String>,
100108
resp: Responder<anyhow::Result<Vec<Option<TxChangeSet>>>>,
101109
},
102-
GetTip {
103-
resp: Responder<anyhow::Result<ChainAnchor>>,
110+
GetServerInfo {
111+
resp: Responder<anyhow::Result<ServerInfo>>,
104112
},
105113
GetSpace {
106114
hash: SpaceKey,
@@ -723,13 +731,12 @@ impl RpcServerImpl {
723731
#[async_trait]
724732
impl RpcServer for RpcServerImpl {
725733
async fn get_server_info(&self) -> Result<ServerInfo, ErrorObjectOwned> {
726-
let chain = self.wallet_manager.network;
727-
let tip = self
734+
let info = self
728735
.store
729-
.get_tip()
736+
.get_server_info()
730737
.await
731738
.map_err(|error| ErrorObjectOwned::owned(-1, error.to_string(), None::<String>))?;
732-
Ok(ServerInfo { chain, tip })
739+
Ok(info)
733740
}
734741

735742
async fn get_space(
@@ -1083,7 +1090,7 @@ impl AsyncChainState {
10831090
rpc,
10841091
chain_state,
10851092
)
1086-
.await?;
1093+
.await?;
10871094

10881095
Ok(block
10891096
.block_meta
@@ -1092,6 +1099,7 @@ impl AsyncChainState {
10921099
.find(|tx| &tx.changeset.txid == txid))
10931100
}
10941101

1102+
10951103
async fn get_indexed_block(
10961104
index: &mut Option<LiveSnapshot>,
10971105
height_or_hash: HeightOrHash,
@@ -1173,9 +1181,9 @@ impl AsyncChainState {
11731181
let result = emulator.apply_package(tip.height + 1, txs);
11741182
let _ = resp.send(result);
11751183
}
1176-
ChainStateCommand::GetTip { resp } => {
1184+
ChainStateCommand::GetServerInfo { resp } => {
11771185
let tip = chain_state.tip.read().expect("read meta").clone();
1178-
_ = resp.send(Ok(tip))
1186+
_ = resp.send(get_server_info(client, rpc, tip).await)
11791187
}
11801188
ChainStateCommand::GetSpace { hash, resp } => {
11811189
let result = chain_state.get_space_info(&hash);
@@ -1204,7 +1212,7 @@ impl AsyncChainState {
12041212
rpc,
12051213
chain_state,
12061214
)
1207-
.await;
1215+
.await;
12081216
let _ = resp.send(res);
12091217
}
12101218
ChainStateCommand::GetTxMeta { txid, resp } => {
@@ -1266,7 +1274,7 @@ impl AsyncChainState {
12661274
File::open(anchors_path)
12671275
.or_else(|e| Err(anyhow!("Could not open anchors file: {}", e)))?,
12681276
)
1269-
.or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?;
1277+
.or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?;
12701278
return Ok(anchors);
12711279
}
12721280

@@ -1498,9 +1506,9 @@ impl AsyncChainState {
14981506
resp_rx.await?
14991507
}
15001508

1501-
pub async fn get_tip(&self) -> anyhow::Result<ChainAnchor> {
1509+
pub async fn get_server_info(&self) -> anyhow::Result<ServerInfo> {
15021510
let (resp, resp_rx) = oneshot::channel();
1503-
self.sender.send(ChainStateCommand::GetTip { resp }).await?;
1511+
self.sender.send(ChainStateCommand::GetServerInfo { resp }).await?;
15041512
resp_rx.await?
15051513
}
15061514

@@ -1561,3 +1569,32 @@ fn get_space_key(space_or_hash: &str) -> Result<SpaceKey, ErrorObjectOwned> {
15611569

15621570
Ok(SpaceKey::from(hash))
15631571
}
1572+
1573+
1574+
async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainAnchor) -> anyhow::Result<ServerInfo> {
1575+
#[derive(Deserialize)]
1576+
struct Info {
1577+
pub chain: String,
1578+
pub headers: u32,
1579+
pub blocks: u32,
1580+
}
1581+
1582+
let info: Info = rpc
1583+
.send_json(client, &rpc.get_blockchain_info())
1584+
.await
1585+
.map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?;
1586+
1587+
Ok(ServerInfo {
1588+
network: info.chain,
1589+
tip,
1590+
chain: ChainInfo {
1591+
blocks: info.blocks,
1592+
headers: info.headers,
1593+
},
1594+
progress: if info.headers != 0 && info.headers >= tip.height {
1595+
tip.height as f32 / info.headers as f32
1596+
} else {
1597+
0.0
1598+
},
1599+
})
1600+
}

‎client/src/source.rs

+57-15
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ use std::{
1212
use base64::Engine;
1313
use bitcoin::{Block, BlockHash, Txid};
1414
use hex::FromHexError;
15-
use log::error;
15+
use log::{error, warn};
1616
use reqwest::StatusCode;
1717
use serde::{de::DeserializeOwned, Deserialize, Serialize};
1818
use serde_json::Value;
1919
use spaces_protocol::constants::ChainAnchor;
2020
use spaces_wallet::{bitcoin, bitcoin::Transaction};
2121
use threadpool::ThreadPool;
2222
use tokio::time::Instant;
23-
23+
use spaces_protocol::bitcoin::Network;
2424
use crate::{client::BlockSource, std_wait};
2525

2626
const BITCOIN_RPC_IN_WARMUP: i32 = -28; // Client still warming up
@@ -34,9 +34,11 @@ pub struct BitcoinRpc {
3434
id: Arc<AtomicU64>,
3535
auth_token: Option<String>,
3636
url: String,
37+
legacy: bool
3738
}
3839

3940
pub struct BlockFetcher {
41+
chain: Network,
4042
src: BitcoinBlockSource,
4143
job_id: Arc<AtomicUsize>,
4244
sender: std::sync::mpsc::SyncSender<BlockEvent>,
@@ -121,18 +123,23 @@ trait ErrorForRpcBlocking {
121123
}
122124

123125
impl BitcoinRpc {
124-
pub fn new(url: &str, auth: BitcoinRpcAuth) -> Self {
126+
pub fn new(url: &str, auth: BitcoinRpcAuth, legacy: bool) -> Self {
125127
Self {
126128
id: Default::default(),
127129
auth_token: auth.to_token(),
128130
url: url.to_string(),
131+
legacy,
129132
}
130133
}
131134

132-
pub fn make_request(&self, method: &str, params: serde_json::Value) -> BitcoinRpcRequest {
135+
pub fn make_request(&self, method: &str, params: Value) -> BitcoinRpcRequest {
133136
let id = self.id.fetch_add(1, Ordering::Relaxed);
134137
let body = serde_json::json!({
135-
"jsonrpc": "1.0",
138+
"jsonrpc": if self.legacy {
139+
"1.0"
140+
} else {
141+
"2.0"
142+
},
136143
"id": id.to_string(),
137144
"method": method,
138145
"params": params,
@@ -381,12 +388,14 @@ impl BitcoinRpcAuth {
381388

382389
impl BlockFetcher {
383390
pub fn new(
391+
chain: Network,
384392
src: BitcoinBlockSource,
385393
num_workers: usize,
386394
) -> (Self, std::sync::mpsc::Receiver<BlockEvent>) {
387395
let (tx, rx) = std::sync::mpsc::sync_channel(12);
388396
(
389397
Self {
398+
chain,
390399
src,
391400
job_id: Arc::new(AtomicUsize::new(0)),
392401
sender: tx,
@@ -401,10 +410,15 @@ impl BlockFetcher {
401410
}
402411

403412
fn should_sync(
413+
expected_chain: Network,
404414
source: &BitcoinBlockSource,
405415
start: ChainAnchor,
406416
) -> Result<Option<ChainAnchor>, BlockFetchError> {
407-
let tip = source.get_best_chain()?;
417+
let tip = match source.get_best_chain(Some(start.height), expected_chain)? {
418+
Some(tip) => tip,
419+
None => return Ok(None),
420+
};
421+
408422
if start.height > tip.height {
409423
return Err(BlockFetchError::BlockMismatch);
410424
}
@@ -437,6 +451,7 @@ impl BlockFetcher {
437451
let current_task = self.job_id.clone();
438452
let task_sender = self.sender.clone();
439453
let num_workers = self.num_workers;
454+
let chain = self.chain;
440455

441456
_ = std::thread::spawn(move || {
442457
let mut last_check = Instant::now() - Duration::from_secs(2);
@@ -451,7 +466,7 @@ impl BlockFetcher {
451466
}
452467
last_check = Instant::now();
453468

454-
let tip = match BlockFetcher::should_sync(&task_src, checkpoint) {
469+
let tip = match BlockFetcher::should_sync(chain, &task_src, checkpoint) {
455470
Ok(t) => t,
456471
Err(e) => {
457472
_ = task_sender.send(BlockEvent::Error(e));
@@ -872,21 +887,48 @@ impl BlockSource for BitcoinBlockSource {
872887
.send_json_blocking(&self.client, &self.rpc.get_block_count())?)
873888
}
874889

875-
fn get_best_chain(&self) -> Result<ChainAnchor, BitcoinRpcError> {
890+
fn get_best_chain(&self, tip: Option<u32>, expected_chain: Network) -> Result<Option<ChainAnchor>, BitcoinRpcError> {
876891
#[derive(Deserialize)]
877892
struct Info {
878-
#[serde(rename = "blocks")]
879-
height: u64,
893+
pub chain: String,
894+
pub blocks: u32,
895+
pub headers: u32,
880896
#[serde(rename = "bestblockhash")]
881-
hash: BlockHash,
897+
pub best_block_hash: BlockHash,
882898
}
883899
let info: Info = self
884900
.rpc
885901
.send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?;
886902

887-
Ok(ChainAnchor {
888-
hash: info.hash,
889-
height: info.height as _,
890-
})
903+
let expected_chain = match expected_chain {
904+
Network::Bitcoin => "main",
905+
Network::Regtest => "regtest",
906+
_ => "test"
907+
};
908+
if info.chain != expected_chain {
909+
warn!("Invalid chain from connected rpc node - expected {}, got {}", expected_chain, info.chain);
910+
return Ok(None);
911+
}
912+
913+
let synced = info.headers == info.blocks;
914+
let best_chain = if !synced {
915+
let block_hash = self.get_block_hash(info.blocks)?;
916+
ChainAnchor {
917+
hash: block_hash,
918+
height: info.blocks,
919+
}
920+
} else {
921+
ChainAnchor {
922+
hash: info.best_block_hash,
923+
height: info.headers,
924+
}
925+
};
926+
927+
// If the source is still syncing, and we have a higher tip, wait.
928+
if !synced && tip.is_some_and(|tip| tip > info.blocks) {
929+
return Ok(None);
930+
}
931+
932+
Ok(Some(best_chain))
891933
}
892934
}

‎client/src/sync.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,11 @@ impl Spaced {
189189
start_block.hash, start_block.height
190190
);
191191

192-
let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers);
192+
let (fetcher, receiver) = BlockFetcher::new(
193+
self.network.fallback_network(),
194+
source.clone(),
195+
self.num_workers,
196+
);
193197
fetcher.start(start_block);
194198

195199
let mut shutdown_signal = shutdown.subscribe();

0 commit comments

Comments
 (0)