From e0af06ed1c6e4442826338d7ea3e405885596b00 Mon Sep 17 00:00:00 2001 From: Chen22 Date: Sat, 29 Jul 2023 18:32:06 -0700 Subject: [PATCH 1/2] add recover rollup command --- src/node/src/command.rs | 104 ++++++++++++++++++++++++--- src/node/src/node_test_base.rs | 9 ++- src/node/src/recover.rs | 54 ++++++++++++-- src/storage/src/mutation_store.rs | 113 ++++++++++++++++++++++++------ 4 files changed, 242 insertions(+), 38 deletions(-) diff --git a/src/node/src/command.rs b/src/node/src/command.rs index 5f6c2274..a380051f 100644 --- a/src/node/src/command.rs +++ b/src/node/src/command.rs @@ -16,7 +16,7 @@ // use crate::indexer_impl::IndexerNodeImpl; -use crate::recover::{Recover, RecoverConfig}; +use crate::recover::{Recover, RecoverConfig, RecoverType}; use crate::rollup_executor::RollupExecutorConfig; use crate::storage_node_light_impl::{StorageNodeV2Config, StorageNodeV2Impl}; use crate::system_impl::SystemImpl; @@ -35,7 +35,7 @@ use db3_storage::db_store_v2::{DBStoreV2, DBStoreV2Config}; use db3_storage::doc_store::DocStoreConfig; use db3_storage::key_store::KeyStore; use db3_storage::key_store::KeyStoreConfig; -use db3_storage::mutation_store::MutationStoreConfig; +use db3_storage::mutation_store::{MutationStore, MutationStoreConfig}; use db3_storage::state_store::{StateStore, StateStoreConfig}; use db3_storage::system_store::{SystemRole, SystemStore, SystemStoreConfig}; use ethers::prelude::LocalWallet; @@ -183,6 +183,35 @@ pub enum RecoverCommand { verbose: bool, }, // TODO: support recover rollup + #[clap(name = "rollup")] + Rollup { + /// The database path for mutation + #[clap(long, default_value = "./mutation_db")] + mutation_db_path: String, + /// The database path for state + #[clap(long, default_value = "./state_db")] + state_db_path: String, + /// The database path for doc db + #[clap(long, default_value = "./doc_db")] + doc_db_path: String, + #[clap(short, long, default_value = "./rollup_meta_db")] + meta_db_path: String, + #[clap(short, long, default_value = "./keys")] + key_root_path: String, + #[clap(short, long, default_value = "./recover_rollup_temp")] + recover_temp_path: String, + #[clap( + short, + long, + default_value = "0x0000000000000000000000000000000000000000" + )] + admin_addr: String, + /// this is just for upgrade the node + #[clap(long, default_value = "100000")] + doc_id_start: i64, + #[clap(short, long)] + verbose: bool, + }, } impl DB3Command { fn build_wallet(key_root_path: &str) -> std::result::Result { @@ -382,6 +411,41 @@ impl DB3Command { info!("exit standalone indexer") } DB3Command::Recover { cmd } => match cmd { + RecoverCommand::Rollup { + mutation_db_path, + state_db_path, + doc_db_path, + meta_db_path, + key_root_path, + recover_temp_path, + admin_addr, + doc_id_start, + verbose, + } => { + let log_level = if verbose { + LevelFilter::DEBUG + } else { + LevelFilter::INFO + }; + + tracing_subscriber::fmt().with_max_level(log_level).init(); + info!("{ABOUT}"); + let recover = Self::create_recover( + mutation_db_path, + meta_db_path, + state_db_path, + doc_db_path, + key_root_path, + recover_temp_path, + admin_addr, + doc_id_start, + RecoverType::Rollup, + ) + .await; + info!("start recovering index node"); + recover.recover_stat().unwrap(); + recover.recover_from_ar().await.unwrap(); + } RecoverCommand::Index { meta_db_path, state_db_path, @@ -401,6 +465,7 @@ impl DB3Command { tracing_subscriber::fmt().with_max_level(log_level).init(); info!("{ABOUT}"); let recover = Self::create_recover( + "".to_string(), meta_db_path, state_db_path, doc_db_path, @@ -408,7 +473,7 @@ impl DB3Command { recover_temp_path, admin_addr, doc_id_start, - SystemRole::DataIndexNode, + RecoverType::Index, ) .await; info!("start recovering index node"); @@ -418,6 +483,7 @@ impl DB3Command { } } async fn create_recover( + mutation_db_path: String, meta_db_path: String, state_db_path: String, doc_db_path: String, @@ -425,7 +491,7 @@ impl DB3Command { recover_temp_path: String, _admin_addr: String, doc_id_start: i64, - role: SystemRole, + recover_type: RecoverType, ) -> Recover { let system_store_config = SystemStoreConfig { key_root_path: key_root_path.to_string(), @@ -445,6 +511,10 @@ impl DB3Command { in_memory_db_handle_limit: 16, }; + let enable_doc_store = match recover_type { + RecoverType::Index => true, + RecoverType::Rollup => false, + }; let db_store_config = DBStoreV2Config { db_path: meta_db_path.to_string(), db_store_cf_name: "db_store_cf".to_string(), @@ -454,20 +524,38 @@ impl DB3Command { doc_owner_store_cf_name: "doc_owner_store_cf".to_string(), db_owner_store_cf_name: "db_owner_cf".to_string(), scan_max_limit: 1000, - enable_doc_store: true, + enable_doc_store, doc_store_conf, doc_start_id: doc_id_start, }; let db_store = DBStoreV2::new(db_store_config.clone()).unwrap(); + + let storage = match recover_type { + RecoverType::Rollup => { + let mutation_store_config = MutationStoreConfig { + db_path: mutation_db_path.to_string(), + block_store_cf_name: "block_store_cf".to_string(), + tx_store_cf_name: "tx_store_cf".to_string(), + rollup_store_cf_name: "rollup_store_cf".to_string(), + gc_cf_name: "gc_store_cf".to_string(), + message_max_buffer: 4 * 1024, + scan_max_limit: 50, + block_state_cf_name: "block_state_cf".to_string(), + }; + let store = MutationStore::new(mutation_store_config).unwrap(); + Some(Arc::new(store)) + } + RecoverType::Index => None, + }; + std::fs::create_dir_all(recover_temp_path.as_str()).unwrap(); let recover_config = RecoverConfig { key_root_path: key_root_path.to_string(), temp_data_path: recover_temp_path.to_string(), - enable_mutation_recover: false, - role, + recover_type, }; - Recover::new(recover_config, db_store, system_store) + Recover::new(recover_config, db_store, system_store, storage) .await .unwrap() } diff --git a/src/node/src/node_test_base.rs b/src/node/src/node_test_base.rs index b5d97743..0c0420c1 100644 --- a/src/node/src/node_test_base.rs +++ b/src/node/src/node_test_base.rs @@ -17,7 +17,7 @@ #[cfg(test)] pub mod tests { - use crate::recover::{Recover, RecoverConfig}; + use crate::recover::{Recover, RecoverConfig, RecoverType}; use crate::rollup_executor::{RollupExecutor, RollupExecutorConfig}; use db3_crypto::db3_address::DB3Address; use db3_error::Result; @@ -106,16 +106,14 @@ pub mod tests { let recover_index_config = RecoverConfig { key_root_path: key_root_path.to_string(), temp_data_path: format!("{real_path}/recover_index_temp_data"), - enable_mutation_recover: true, - role: SystemRole::DataIndexNode, + recover_type: RecoverType::Index, }; if let Err(_e) = std::fs::create_dir_all(recover_index_config.temp_data_path.as_str()) { } let recover_rollup_config = RecoverConfig { key_root_path: key_root_path.to_string(), temp_data_path: format!("{real_path}/recover_rollup_temp_data"), - enable_mutation_recover: true, - role: SystemRole::DataRollupNode, + recover_type: RecoverType::Rollup, }; if let Err(_e) = std::fs::create_dir_all(recover_rollup_config.temp_data_path.as_str()) { @@ -193,6 +191,7 @@ pub mod tests { recover_rollup_config, db_store.clone(), system_store.clone(), + None, ) .await?; Ok((rollup_executor, rollup_recover)) diff --git a/src/node/src/recover.rs b/src/node/src/recover.rs index cf032dc2..f5ba294a 100644 --- a/src/node/src/recover.rs +++ b/src/node/src/recover.rs @@ -17,29 +17,38 @@ use crate::ar_toolbox::ArToolBox; use crate::mutation_utils::MutationUtil; +use bytes::BytesMut; +use db3_crypto::id::TxId; use db3_error::{DB3Error, Result}; -use db3_proto::db3_mutation_v2_proto::MutationAction; +use db3_proto::db3_mutation_v2_proto::{MutationAction, MutationBody, MutationHeader}; use db3_storage::ar_fs::{ArFileSystem, ArFileSystemConfig}; use db3_storage::db_store_v2::DBStoreV2; use db3_storage::meta_store_client::MetaStoreClient; +use db3_storage::mutation_store::MutationStore; use db3_storage::system_store::{SystemRole, SystemStore}; use ethers::prelude::Signer; +use prost::Message; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tracing::{debug, info}; +#[derive(Clone)] +pub enum RecoverType { + Index, + Rollup, +} #[derive(Clone)] pub struct RecoverConfig { pub key_root_path: String, pub temp_data_path: String, - pub enable_mutation_recover: bool, - pub role: SystemRole, + pub recover_type: RecoverType, } pub struct Recover { pub config: RecoverConfig, pub ar_toolbox: Arc, pub meta_store: Arc, pub db_store: Arc, + pub storage: Option>, network_id: Arc, } @@ -48,8 +57,13 @@ impl Recover { config: RecoverConfig, db_store: DBStoreV2, system_store: Arc, + storage: Option>, ) -> Result { - let system_config = match system_store.get_config(&config.role) { + let role = match config.recover_type { + RecoverType::Index => SystemRole::DataIndexNode, + RecoverType::Rollup => SystemRole::DataRollupNode, + }; + let system_config = match system_store.get_config(&role) { Ok(Some(system_config)) => system_config, Ok(None) => { return Err(DB3Error::StoreEventError( @@ -87,6 +101,7 @@ impl Recover { ar_toolbox, meta_store, db_store: Arc::new(db_store), + storage, network_id, }) } @@ -95,6 +110,14 @@ impl Recover { Ok(()) } + pub fn recover_stat(&self) -> Result<()> { + self.db_store.recover_db_state()?; + if let Some(s) = &self.storage { + s.recover()?; + } + Ok(()) + } + pub async fn recover_from_ar(&self) -> Result<()> { info!("start recover from arweave"); let last_block = self.db_store.recover_block_state()?; @@ -134,6 +157,13 @@ impl Recover { Ok(from_block) } + pub fn is_recover_rollup(&self) -> bool { + match self.config.recover_type { + RecoverType::Rollup => true, + _ => false, + } + } + /// recover from arweave tx async fn recover_from_arweave_tx(&self, tx: &str, version: Option) -> Result<()> { debug!("recover_from_arweave_tx: {}, version {:?}", tx, version); @@ -160,6 +190,22 @@ impl Recover { order.clone(), &doc_ids_map, )?; + + if self.is_recover_rollup() { + if let Some(s) = &self.storage { + s.update_mutation_stat( + &body.payload, + body.signature.as_str(), + doc_ids.as_str(), + &address, + nonce, + *block, + *order, + self.network_id.load(Ordering::Relaxed), + action, + )?; + } + } } } diff --git a/src/storage/src/mutation_store.rs b/src/storage/src/mutation_store.rs index 5f0c9f03..26141680 100644 --- a/src/storage/src/mutation_store.rs +++ b/src/storage/src/mutation_store.rs @@ -15,7 +15,7 @@ // limitations under the License. // -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use db3_base::times; use db3_crypto::db3_address::DB3Address; use db3_crypto::id::TxId; @@ -480,6 +480,17 @@ impl MutationStore { } } + fn set_block_state(&self, block: u64, order: u32) -> Result<()> { + match self.block_state.lock() { + Ok(mut state) => { + state.block = block; + state.order = order; + Ok(()) + } + Err(e) => Err(DB3Error::WriteStoreError(format!("{e}"))), + } + } + pub fn gc_range_mutation(&self, block_start: u64, block_end: u64) -> Result<()> { if block_start >= block_end { return Err(DB3Error::ReadStoreError("invalid block range".to_string())); @@ -593,10 +604,10 @@ impl MutationStore { Ok((hex_id, block, order)) } - pub fn add_mutation( + fn encode_mutation_header( &self, - payload: &[u8], - signature: &str, + hex_id: &str, + mutation_body_size: u32, doc_ids_map: &str, sender: &DB3Address, nonce: u64, @@ -604,28 +615,14 @@ impl MutationStore { order: u32, network: u64, action: MutationAction, - ) -> Result<(String, u64, u32)> { - let tx_id = TxId::from((payload, signature.as_bytes())); - let hex_id = tx_id.to_hex(); - let mut encoded_id: Vec = Vec::new(); - encoded_id.extend_from_slice(&block.to_be_bytes()); - encoded_id.extend_from_slice(&order.to_be_bytes()); - let mutation_body = MutationBody { - payload: payload.to_vec(), - signature: signature.to_string(), - }; - let mut buf = BytesMut::with_capacity(self.config.message_max_buffer); - mutation_body - .encode(&mut buf) - .map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?; - let buf = buf.freeze(); + ) -> Result { let mutation_header = MutationHeader { block_id: block, order_id: order, sender: sender.as_ref().to_vec(), time: times::get_current_time_in_secs(), id: hex_id.to_string(), - size: buf.len() as u32, + size: mutation_body_size, nonce, network, action: action.into(), @@ -635,7 +632,49 @@ impl MutationStore { mutation_header .encode(&mut header_buf) .map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?; - let header_buf = header_buf.freeze(); + Ok(header_buf.freeze()) + } + + fn encode_mutation_body(&self, payload: &[u8], signature: &str) -> Result { + let mutation_body = MutationBody { + payload: payload.to_vec(), + signature: signature.to_string(), + }; + let mut buf = BytesMut::with_capacity(self.config.message_max_buffer); + mutation_body + .encode(&mut buf) + .map_err(|e| DB3Error::WriteStoreError(format!("{e}")))?; + Ok(buf.freeze()) + } + pub fn add_mutation( + &self, + payload: &[u8], + signature: &str, + doc_ids_map: &str, + sender: &DB3Address, + nonce: u64, + block: u64, + order: u32, + network: u64, + action: MutationAction, + ) -> Result<(String, u64, u32)> { + let tx_id = TxId::from((payload, signature.as_bytes())); + let hex_id = tx_id.to_hex(); + let mut encoded_id: Vec = Vec::new(); + encoded_id.extend_from_slice(&block.to_be_bytes()); + encoded_id.extend_from_slice(&order.to_be_bytes()); + let buf = self.encode_mutation_body(payload, signature)?; + let header_buf = self.encode_mutation_header( + hex_id.as_str(), + buf.len() as u32, + doc_ids_map, + sender, + nonce, + block, + order, + network, + action, + )?; let tx_cf_handle = self .se .cf_handle(self.config.tx_store_cf_name.as_str()) @@ -657,6 +696,38 @@ impl MutationStore { .fetch_add((buf.len() + header_buf.len()) as u64, Ordering::Relaxed); Ok((hex_id, block, order)) } + + pub fn update_mutation_stat( + &self, + payload: &[u8], + signature: &str, + doc_ids_map: &str, + sender: &DB3Address, + nonce: u64, + block: u64, + order: u32, + network: u64, + action: MutationAction, + ) -> Result<()> { + let tx_id = TxId::from((payload, signature.as_bytes())); + let hex_id = tx_id.to_hex(); + let buf = self.encode_mutation_body(payload, signature)?; + let header_buf = self.encode_mutation_header( + hex_id.as_str(), + buf.len() as u32, + doc_ids_map, + sender, + nonce, + block, + order, + network, + action, + )?; + self.mutation_count.fetch_add(1, Ordering::Relaxed); + self.total_mutation_bytes + .fetch_add((buf.len() + header_buf.len()) as u64, Ordering::Relaxed); + self.set_block_state(block, order) + } } #[cfg(test)] From 3ca48504faf2966827bf3b1568fc0432d02cfc83 Mon Sep 17 00:00:00 2001 From: Chen22 Date: Sat, 29 Jul 2023 19:07:12 -0700 Subject: [PATCH 2/2] combine ut to increase test stability --- src/node/src/recover.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src/node/src/recover.rs b/src/node/src/recover.rs index f5ba294a..41390dac 100644 --- a/src/node/src/recover.rs +++ b/src/node/src/recover.rs @@ -252,25 +252,6 @@ mod tests { use std::thread::sleep; use tempdir::TempDir; - #[tokio::test] - async fn test_get_latest_arweave_tx() { - sleep(std::time::Duration::from_secs(1)); - let tmp_dir_path = TempDir::new("test_get_latest_arweave_tx").expect("create temp dir"); - match NodeTestBase::setup_for_smoke_test(&tmp_dir_path).await { - Ok((rollup_executor, recover)) => { - let result = rollup_executor.process().await; - assert_eq!(true, result.is_ok(), "{:?}", result); - let result = recover.get_latest_arweave_tx().await; - assert_eq!(true, result.is_ok(), "{:?}", result); - let tx = result.unwrap(); - assert!(!tx.is_empty()); - } - Err(e) => { - assert!(false, "{e}"); - } - } - } - #[tokio::test] async fn test_fetch_arware_tx_from_block() { sleep(std::time::Duration::from_secs(3));