From 868a1dfc1f02540ec40da10ca3f549eb190e80b4 Mon Sep 17 00:00:00 2001 From: yukang Date: Sat, 16 Nov 2024 09:57:36 +0800 Subject: [PATCH] add db migration and tests --- src/main.rs | 10 +- src/migration/{migrate.rs => db_migrate.rs} | 6 +- src/migration/migration.rs | 28 +----- src/migration/migrations/sample.rs | 37 +++++++ src/migration/mod.rs | 6 +- src/migration/tests.rs | 1 - src/migration/tests/migrate.rs | 94 ++++++++++++++++++ src/migration/tests/mod.rs | 1 + src/store.rs | 102 +++++++++++++------- 9 files changed, 209 insertions(+), 76 deletions(-) rename src/migration/{migrate.rs => db_migrate.rs} (95%) create mode 100644 src/migration/migrations/sample.rs delete mode 100644 src/migration/tests.rs create mode 100644 src/migration/tests/migrate.rs create mode 100644 src/migration/tests/mod.rs diff --git a/src/main.rs b/src/main.rs index c34d8f04..926d1304 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use ckb_chain_spec::ChainSpec; +use ckb_hash::blake2b_256; use ckb_resource::Resource; +use core::default::Default; use fnn::actors::RootActor; use fnn::cch::CchMessage; use fnn::ckb::{ @@ -13,15 +15,11 @@ use fnn::tasks::{ }; use fnn::watchtower::{WatchtowerActor, WatchtowerMessage}; use fnn::{start_cch, start_network, start_rpc, Config}; - -use core::default::Default; +use ractor::Actor; +use secp256k1::Secp256k1; use std::path::Path; use std::sync::Arc; use std::time::Duration; - -use ckb_hash::blake2b_256; -use ractor::Actor; -use secp256k1::Secp256k1; use tokio::sync::{mpsc, RwLock}; use tokio::{select, signal}; use tracing::{debug, error, info, info_span, trace}; diff --git a/src/migration/migrate.rs b/src/migration/db_migrate.rs similarity index 95% rename from src/migration/migrate.rs rename to src/migration/db_migrate.rs index 7b65371a..0448cb13 100644 --- a/src/migration/migrate.rs +++ b/src/migration/db_migrate.rs @@ -4,12 +4,12 @@ use rocksdb::{ops::Open, DB}; use std::{cmp::Ordering, path::Path, sync::Arc}; /// migrate helper -pub struct Migrate { +pub struct DbMigrate { migrations: Migrations, db: Arc, } -impl Migrate { +impl DbMigrate { /// Construct new migrate pub fn new>(path: P) -> Self { let mut migrations = Migrations::default(); @@ -17,7 +17,7 @@ impl Migrate { let db = Arc::new(DB::open_default(path).expect("Failed to open rocksdb")); - Migrate { migrations, db } + DbMigrate { migrations, db } } /// Check if database's version is matched with the executable binary version. diff --git a/src/migration/migration.rs b/src/migration/migration.rs index 29c4a300..f54f222b 100644 --- a/src/migration/migration.rs +++ b/src/migration/migration.rs @@ -22,12 +22,6 @@ pub struct Migrations { } impl Migrations { - pub fn new() -> Self { - Migrations { - migrations: BTreeMap::new(), - } - } - pub fn add_migration(&mut self, migration: Arc) { self.migrations .insert(migration.version().to_string(), migration); @@ -50,7 +44,7 @@ impl Migrations { String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8") } None => { - return Ordering::Equal; + return Ordering::Less; } }; @@ -65,26 +59,6 @@ impl Migrations { db_version.as_str().cmp(latest_version) } - /// Check if the migrations will consume a lot of time. - pub fn expensive(&self, db: Arc) -> bool { - let db_version = match db - .get(MIGRATION_VERSION_KEY) - .expect("get the version of database") - { - Some(version_bytes) => { - String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8") - } - None => { - return false; - } - }; - - let migrations = self.migrations.values(); - migrations - .skip_while(|m| m.version() <= db_version.as_str()) - .any(|m| m.expensive()) - } - fn run_migrate(&self, mut db: Arc, v: &str) -> Result, Error> { let mpb = Arc::new(MultiProgress::new()); let migrations: BTreeMap<_, _> = self diff --git a/src/migration/migrations/sample.rs b/src/migration/migrations/sample.rs new file mode 100644 index 00000000..679a519b --- /dev/null +++ b/src/migration/migrations/sample.rs @@ -0,0 +1,37 @@ +use crate::{migration::migration::Migration, Error}; +use indicatif::ProgressBar; +use rocksdb::DB; +use std::sync::Arc; + +const INIT_DB_VERSION: &str = "20351116135521"; + +pub struct SampleMigration { + version: String, +} + +impl SampleMigration { + pub fn new() -> Self { + Self { + version: INIT_DB_VERSION.to_string(), + } + } +} + +impl Migration for SampleMigration { + fn migrate( + &self, + db: Arc, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result, Error> { + eprintln!("SampleMigration::migrate ..........."); + Ok(db) + } + + fn version(&self) -> &str { + &self.version + } + + fn expensive(&self) -> bool { + false + } +} diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 4bcc27e0..1139b9e9 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -1,7 +1,7 @@ pub const MIGRATION_VERSION_KEY: &[u8] = b"db-version"; -pub mod migrate; -pub mod migration; -pub mod migrations; +pub mod db_migrate; +pub(crate) mod migration; +pub(crate) mod migrations; #[cfg(test)] mod tests; diff --git a/src/migration/tests.rs b/src/migration/tests.rs deleted file mode 100644 index 8b137891..00000000 --- a/src/migration/tests.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/migration/tests/migrate.rs b/src/migration/tests/migrate.rs new file mode 100644 index 00000000..2a4d6cc6 --- /dev/null +++ b/src/migration/tests/migrate.rs @@ -0,0 +1,94 @@ +use crate::migration::db_migrate::DbMigrate; +use crate::migration::migration::Migration; +use crate::migration::migration::Migrations; +use crate::Error; +use indicatif::ProgressBar; +use rocksdb::DB; +use std::cmp::Ordering; +use std::sync::{Arc, RwLock}; + +fn gen_path() -> std::path::PathBuf { + let tmp_dir = tempfile::Builder::new() + .prefix("test_fiber_") + .tempdir() + .unwrap(); + tmp_dir.as_ref().to_path_buf() +} + +#[test] +fn test_default_migration() { + let migrate = DbMigrate::new(gen_path()); + assert!(migrate.need_init()); + assert_eq!(migrate.check(), Ordering::Less); + migrate.init_db_version().unwrap(); + assert!(!migrate.need_init()); + assert_eq!(migrate.check(), Ordering::Equal); +} + +#[test] +fn test_run_migration() { + let run_count = Arc::new(RwLock::new(0)); + + pub struct DummyMigration { + version: String, + run_count: Arc>, + } + + impl DummyMigration { + pub fn new(version: &str, run_count: Arc>) -> Self { + Self { + version: version.to_string(), + run_count, + } + } + } + + impl Migration for DummyMigration { + fn migrate( + &self, + db: Arc, + _pb: Arc ProgressBar + Send + Sync>, + ) -> Result, Error> { + eprintln!("DummyMigration::migrate {} ... ", self.version); + let mut count = self.run_count.write().unwrap(); + *count += 1; + Ok(db) + } + + fn version(&self) -> &str { + &self.version + } + + fn expensive(&self) -> bool { + false + } + } + + let migrate = DbMigrate::new(gen_path()); + migrate.init_db_version().unwrap(); + let db = migrate.db(); + + let mut migrations = Migrations::default(); + migrations.add_migration(Arc::new(DummyMigration::new( + "20221116135521", + run_count.clone(), + ))); + + migrations.add_migration(Arc::new(DummyMigration::new( + "20251116135521", + run_count.clone(), + ))); + migrations.add_migration(Arc::new(DummyMigration::new( + "20251116135522", + run_count.clone(), + ))); + migrations.add_migration(Arc::new(DummyMigration::new( + "20251116135523", + run_count.clone(), + ))); + + assert_eq!(migrations.check(db.clone()), Ordering::Less); + migrations.migrate(db.clone()).unwrap(); + assert_eq!(*run_count.read().unwrap(), 3); + assert_eq!(migrations.check(db), Ordering::Equal); +} diff --git a/src/migration/tests/mod.rs b/src/migration/tests/mod.rs new file mode 100644 index 00000000..0a3a3373 --- /dev/null +++ b/src/migration/tests/mod.rs @@ -0,0 +1 @@ +mod migrate; diff --git a/src/store.rs b/src/store.rs index 5ca2d5e7..02708814 100644 --- a/src/store.rs +++ b/src/store.rs @@ -6,7 +6,7 @@ use crate::{ types::{Hash256, Pubkey}, }, invoice::{CkbInvoice, CkbInvoiceStatus, InvoiceError, InvoiceStore}, - migration::migrate::Migrate, + migration::db_migrate::DbMigrate, watchtower::{ChannelData, RevocationData, WatchtowerStore}, }; use ckb_jsonrpc_types::JsonBytes; @@ -14,8 +14,15 @@ use ckb_types::packed::{OutPoint, Script}; use ckb_types::prelude::Entity; use rocksdb::{prelude::*, DBIterator, Direction, IteratorMode, WriteBatch, DB}; use serde_json; -use std::{cmp::Ordering, path::Path, sync::Arc}; +use std::io::Write; +use std::{ + cmp::Ordering, + io::{stdin, stdout}, + path::Path, + sync::Arc, +}; use tentacle::secio::PeerId; +use tracing::{error, info}; #[derive(Clone)] pub struct Store { @@ -24,8 +31,7 @@ pub struct Store { impl Store { pub fn new>(path: P) -> Self { - //let db = Arc::new(DB::open_default(path).expect("Failed to open rocksdb")); - let db = open_or_create_db(path).expect("Failed to open rocksdb"); + let db = Self::open_or_create_db(path).expect("Failed to open rocksdb"); Self { db } } @@ -60,47 +66,71 @@ impl Store { wb: WriteBatch::default(), } } -} -/// Open or create a rocksdb -fn open_or_create_db>(path: P) -> Result, String> { - let migrate = Migrate::new(path.as_ref()); - if !migrate.need_init() { - match migrate.check() { - Ordering::Greater => { - eprintln!( - "The database was created by a higher version fiber executable binary \n\ + /// Open or create a rocksdb + fn open_or_create_db>(path: P) -> Result, String> { + let migrate = DbMigrate::new(path.as_ref()); + if !migrate.need_init() { + match migrate.check() { + Ordering::Greater => { + eprintln!( + "The database was created by a higher version fiber executable binary \n\ and cannot be opened by the current binary.\n\ Please download the latest fiber executable binary." - ); - return Err("incompatible database, need to upgrade bin".to_string()); - } - Ordering::Equal => { - eprintln!("no need to migrate, everything is OK ..."); - return Ok(migrate.db()); - } - Ordering::Less => { - let path_buf = path.as_ref().to_path_buf(); - eprintln!( - "For optimal performance, CKB recommends migrating your data into a new format.\n\ - If you prefer to stick with the older version, \n\ - it's important to note that they may have unfixed vulnerabilities.\n\ - Before migrating, we strongly recommend backuping your data directory.\n\ - To migrate, run `\"fiber\" migrate -C \"{}\"` and confirm by typing \"YES\".", - path_buf.display() ); - return Err("need to migrate".to_string()); + return Err("incompatible database, need to upgrade fiber binary".to_string()); + } + Ordering::Equal => { + eprintln!("no need to migrate, everything is OK ..."); + return Ok(migrate.db()); + } + Ordering::Less => { + let path_buf = path.as_ref().to_path_buf(); + let input = prompt(format!("\ + \n\ + Fiber need to run some database migrations.\n\ + \n\ + Once the migration started, the data will be no longer compatible with all older version,\n\ + so we strongly recommended you to backup the old data {} before migrating.\n\ + \n\ + If the migration failed, try to delete all data and sync from scratch.\n\ + \nIf you want to migrate the data, please input YES, otherwise, the current process will exit.\n\ + > ", path_buf.display()).as_str()); + + if input.trim().to_lowercase() != "yes" { + error!("Migration was declined since the user didn't confirm."); + return Err("need to migrate".to_string()); + } + info!("now begin to migrate db ..."); + let db = migrate.migrate().expect("failed to migrate db"); + info!("db migrated successfully ..."); + Ok(db) + } } + } else { + eprintln!("now begin to init db version ..."); + migrate + .init_db_version() + .expect("failed to init db version"); + Ok(migrate.db()) } - } else { - eprintln!("now begin to init db version ..."); - migrate - .init_db_version() - .expect("failed to init db version"); - Ok(migrate.db()) } } +pub fn prompt(msg: &str) -> String { + let stdout = stdout(); + let mut stdout = stdout.lock(); + let stdin = stdin(); + + write!(stdout, "{msg}").unwrap(); + stdout.flush().unwrap(); + + let mut input = String::new(); + let _ = stdin.read_line(&mut input); + + input +} + pub struct Batch { db: Arc, wb: WriteBatch,