add db migration and tests
chenyukang committed Nov 16, 2024
1 parent 133f9d8 commit 868a1df
Showing 9 changed files with 209 additions and 76 deletions.
10 changes: 4 additions & 6 deletions src/main.rs
@@ -1,5 +1,7 @@
use ckb_chain_spec::ChainSpec;
use ckb_hash::blake2b_256;
use ckb_resource::Resource;
use core::default::Default;
use fnn::actors::RootActor;
use fnn::cch::CchMessage;
use fnn::ckb::{
@@ -13,15 +15,11 @@ use fnn::tasks::{
};
use fnn::watchtower::{WatchtowerActor, WatchtowerMessage};
use fnn::{start_cch, start_network, start_rpc, Config};

use core::default::Default;
use ractor::Actor;
use secp256k1::Secp256k1;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use ckb_hash::blake2b_256;
use ractor::Actor;
use secp256k1::Secp256k1;
use tokio::sync::{mpsc, RwLock};
use tokio::{select, signal};
use tracing::{debug, error, info, info_span, trace};
6 changes: 3 additions & 3 deletions src/migration/migrate.rs → src/migration/db_migrate.rs
@@ -4,20 +4,20 @@ use rocksdb::{ops::Open, DB};
use std::{cmp::Ordering, path::Path, sync::Arc};

/// migrate helper
pub struct Migrate {
pub struct DbMigrate {
migrations: Migrations,
db: Arc<DB>,
}

impl Migrate {
impl DbMigrate {
/// Construct new migrate
pub fn new<P: AsRef<Path>>(path: P) -> Self {
let mut migrations = Migrations::default();
migrations.add_migration(Arc::new(DefaultMigration::new()));

let db = Arc::new(DB::open_default(path).expect("Failed to open rocksdb"));

Migrate { migrations, db }
DbMigrate { migrations, db }
}

/// Check if database's version is matched with the executable binary version.
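For orientation, the typical call sequence for the renamed helper (the same one exercised later in src/store.rs and the new tests) looks roughly like this. This is only a sketch; the crate::migration::db_migrate import path is taken from the test file added below, and error handling is simplified.

```rust
use std::{cmp::Ordering, path::Path, sync::Arc};

use rocksdb::DB;

use crate::migration::db_migrate::DbMigrate;

// Sketch: stamp a fresh database, otherwise run any pending migrations.
fn open_with_migrations(path: &Path) -> Arc<DB> {
    let migrate = DbMigrate::new(path);
    if migrate.need_init() {
        // Fresh database: just record the latest migration version.
        migrate.init_db_version().expect("init db version");
        migrate.db()
    } else if migrate.check() == Ordering::Less {
        // Database is behind the binary: apply the registered migrations.
        migrate.migrate().expect("run migrations")
    } else {
        migrate.db()
    }
}
```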
28 changes: 1 addition & 27 deletions src/migration/migration.rs
@@ -22,12 +22,6 @@ pub struct Migrations {
}

impl Migrations {
pub fn new() -> Self {
Migrations {
migrations: BTreeMap::new(),
}
}

pub fn add_migration(&mut self, migration: Arc<dyn Migration>) {
self.migrations
.insert(migration.version().to_string(), migration);
@@ -50,7 +44,7 @@ impl Migrations {
String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
}
None => {
return Ordering::Equal;
return Ordering::Less;
}
};

@@ -65,26 +59,6 @@ impl Migrations {
db_version.as_str().cmp(latest_version)
}

/// Check if the migrations will consume a lot of time.
pub fn expensive(&self, db: Arc<DB>) -> bool {
let db_version = match db
.get(MIGRATION_VERSION_KEY)
.expect("get the version of database")
{
Some(version_bytes) => {
String::from_utf8(version_bytes.to_vec()).expect("version bytes to utf8")
}
None => {
return false;
}
};

let migrations = self.migrations.values();
migrations
.skip_while(|m| m.version() <= db_version.as_str())
.any(|m| m.expensive())
}

fn run_migrate(&self, mut db: Arc<DB>, v: &str) -> Result<Arc<DB>, Error> {
let mpb = Arc::new(MultiProgress::new());
let migrations: BTreeMap<_, _> = self
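The behavioural change worth noting here: Migrations::new() is gone (construction goes through Default), and a database without a version key now compares as Ordering::Less rather than Equal, so an un-stamped store is treated as needing initialisation or migration. A minimal sketch of the updated usage (module paths are assumptions based on the test file below):

```rust
use std::{cmp::Ordering, sync::Arc};

use rocksdb::DB;

use crate::migration::migration::Migrations;
use crate::migration::migrations::sample::SampleMigration;

// Returns true when the database is behind the binary, including the case
// where no version key has been written at all.
fn needs_migration(db: Arc<DB>) -> bool {
    let mut migrations = Migrations::default(); // `Migrations::new()` was removed in this commit
    migrations.add_migration(Arc::new(SampleMigration::new()));
    migrations.check(db) == Ordering::Less
}
```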
37 changes: 37 additions & 0 deletions src/migration/migrations/sample.rs
@@ -0,0 +1,37 @@
use crate::{migration::migration::Migration, Error};
use indicatif::ProgressBar;
use rocksdb::DB;
use std::sync::Arc;

const INIT_DB_VERSION: &str = "20351116135521";

pub struct SampleMigration {
version: String,
}

impl SampleMigration {
pub fn new() -> Self {
Self {
version: INIT_DB_VERSION.to_string(),
}
}
}

impl Migration for SampleMigration {
fn migrate(
&self,
db: Arc<DB>,
_pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
) -> Result<Arc<DB>, Error> {
eprintln!("SampleMigration::migrate ...........");
Ok(db)
}

fn version(&self) -> &str {
&self.version
}

fn expensive(&self) -> bool {
false
}
}
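As a quick illustration of the trait contract SampleMigration satisfies, the following sketch constructs it directly and runs its no-op migrate; the crate::migration::migrations::sample module path is an assumption, since the migrations/mod.rs declaration is not part of this diff.

```rust
use std::sync::Arc;

use indicatif::ProgressBar;
use rocksdb::{ops::Open, DB};

use crate::migration::migration::Migration;
use crate::migration::migrations::sample::SampleMigration;

fn run_sample(path: &str) {
    let db = Arc::new(DB::open_default(path).expect("open rocksdb"));
    let m = SampleMigration::new();
    assert_eq!(m.version(), "20351116135521");
    assert!(!m.expensive());
    // The progress-bar factory is unused by this sample migration;
    // any closure of the right shape works here.
    let _db = m
        .migrate(db, Arc::new(|_len: u64| ProgressBar::new(0)))
        .expect("sample migrate");
}
```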
6 changes: 3 additions & 3 deletions src/migration/mod.rs
@@ -1,7 +1,7 @@
pub const MIGRATION_VERSION_KEY: &[u8] = b"db-version";

pub mod migrate;
pub mod migration;
pub mod migrations;
pub mod db_migrate;
pub(crate) mod migration;
pub(crate) mod migrations;
#[cfg(test)]
mod tests;
1 change: 0 additions & 1 deletion src/migration/tests.rs

This file was deleted.

94 changes: 94 additions & 0 deletions src/migration/tests/migrate.rs
@@ -0,0 +1,94 @@
use crate::migration::db_migrate::DbMigrate;
use crate::migration::migration::Migration;
use crate::migration::migration::Migrations;
use crate::Error;
use indicatif::ProgressBar;
use rocksdb::DB;
use std::cmp::Ordering;
use std::sync::{Arc, RwLock};

fn gen_path() -> std::path::PathBuf {
let tmp_dir = tempfile::Builder::new()
.prefix("test_fiber_")
.tempdir()
.unwrap();
tmp_dir.as_ref().to_path_buf()
}

#[test]
fn test_default_migration() {
let migrate = DbMigrate::new(gen_path());
assert!(migrate.need_init());
assert_eq!(migrate.check(), Ordering::Less);
migrate.init_db_version().unwrap();
assert!(!migrate.need_init());
assert_eq!(migrate.check(), Ordering::Equal);
}

#[test]
fn test_run_migration() {
let run_count = Arc::new(RwLock::new(0));

pub struct DummyMigration {
version: String,
run_count: Arc<RwLock<usize>>,
}

impl DummyMigration {
pub fn new(version: &str, run_count: Arc<RwLock<usize>>) -> Self {
Self {
version: version.to_string(),
run_count,
}
}
}

impl Migration for DummyMigration {
fn migrate(
&self,
db: Arc<DB>,
_pb: Arc<dyn Fn(u64) -> ProgressBar + Send + Sync>,
) -> Result<Arc<DB>, Error> {
eprintln!("DummyMigration::migrate {} ... ", self.version);
let mut count = self.run_count.write().unwrap();
*count += 1;
Ok(db)
}

fn version(&self) -> &str {
&self.version
}

fn expensive(&self) -> bool {
false
}
}

let migrate = DbMigrate::new(gen_path());
migrate.init_db_version().unwrap();
let db = migrate.db();

let mut migrations = Migrations::default();
migrations.add_migration(Arc::new(DummyMigration::new(
"20221116135521",
run_count.clone(),
)));

migrations.add_migration(Arc::new(DummyMigration::new(
"20251116135521",
run_count.clone(),
)));
migrations.add_migration(Arc::new(DummyMigration::new(
"20251116135522",
run_count.clone(),
)));
migrations.add_migration(Arc::new(DummyMigration::new(
"20251116135523",
run_count.clone(),
)));

assert_eq!(migrations.check(db.clone()), Ordering::Less);
migrations.migrate(db.clone()).unwrap();
assert_eq!(*run_count.read().unwrap(), 3);
assert_eq!(migrations.check(db), Ordering::Equal);
}
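The run_count == 3 assertion leans on the fact that the fixed-width YYYYMMDDhhmmss stamps compare as plain strings in chronological order (Migrations::check uses db_version.as_str().cmp(latest_version)); after init_db_version the store carries the default migration's stamp, so only the dummy versions that sort after it are executed. A tiny sanity check of that ordering assumption:

```rust
#[test]
fn version_stamps_sort_chronologically() {
    // Fixed-width date stamps make lexicographic order coincide with
    // chronological order, which the migration version comparison relies on.
    assert!("20221116135521" < "20251116135521");
    assert!("20251116135522" < "20251116135523");
}
```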
1 change: 1 addition & 0 deletions src/migration/tests/mod.rs
@@ -0,0 +1 @@
mod migrate;
102 changes: 66 additions & 36 deletions src/store.rs
@@ -6,16 +6,23 @@ use crate::{
types::{Hash256, Pubkey},
},
invoice::{CkbInvoice, CkbInvoiceStatus, InvoiceError, InvoiceStore},
migration::migrate::Migrate,
migration::db_migrate::DbMigrate,
watchtower::{ChannelData, RevocationData, WatchtowerStore},
};
use ckb_jsonrpc_types::JsonBytes;
use ckb_types::packed::{OutPoint, Script};
use ckb_types::prelude::Entity;
use rocksdb::{prelude::*, DBIterator, Direction, IteratorMode, WriteBatch, DB};
use serde_json;
use std::{cmp::Ordering, path::Path, sync::Arc};
use std::io::Write;
use std::{
cmp::Ordering,
io::{stdin, stdout},
path::Path,
sync::Arc,
};
use tentacle::secio::PeerId;
use tracing::{error, info};

#[derive(Clone)]
pub struct Store {
@@ -24,8 +31,7 @@ pub struct Store {

impl Store {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
//let db = Arc::new(DB::open_default(path).expect("Failed to open rocksdb"));
let db = open_or_create_db(path).expect("Failed to open rocksdb");
let db = Self::open_or_create_db(path).expect("Failed to open rocksdb");
Self { db }
}

@@ -60,47 +66,71 @@ impl Store {
wb: WriteBatch::default(),
}
}
}

/// Open or create a rocksdb
fn open_or_create_db<P: AsRef<Path>>(path: P) -> Result<Arc<DB>, String> {
let migrate = Migrate::new(path.as_ref());
if !migrate.need_init() {
match migrate.check() {
Ordering::Greater => {
eprintln!(
"The database was created by a higher version fiber executable binary \n\
/// Open or create a rocksdb
fn open_or_create_db<P: AsRef<Path>>(path: P) -> Result<Arc<DB>, String> {
let migrate = DbMigrate::new(path.as_ref());
if !migrate.need_init() {
match migrate.check() {
Ordering::Greater => {
eprintln!(
"The database was created by a higher version fiber executable binary \n\
and cannot be opened by the current binary.\n\
Please download the latest fiber executable binary."
);
return Err("incompatible database, need to upgrade bin".to_string());
}
Ordering::Equal => {
eprintln!("no need to migrate, everything is OK ...");
return Ok(migrate.db());
}
Ordering::Less => {
let path_buf = path.as_ref().to_path_buf();
eprintln!(
"For optimal performance, CKB recommends migrating your data into a new format.\n\
If you prefer to stick with the older version, \n\
it's important to note that they may have unfixed vulnerabilities.\n\
Before migrating, we strongly recommend backuping your data directory.\n\
To migrate, run `\"fiber\" migrate -C \"{}\"` and confirm by typing \"YES\".",
path_buf.display()
);
return Err("need to migrate".to_string());
return Err("incompatible database, need to upgrade fiber binary".to_string());
}
Ordering::Equal => {
eprintln!("no need to migrate, everything is OK ...");
return Ok(migrate.db());
}
Ordering::Less => {
let path_buf = path.as_ref().to_path_buf();
let input = prompt(format!("\
\n\
Fiber needs to run some database migrations.\n\
\n\
Once the migration has started, the data will no longer be compatible with older versions,\n\
so we strongly recommend you back up the old data {} before migrating.\n\
\n\
If the migration fails, try deleting all data and syncing from scratch.\n\
\nIf you want to migrate the data, please input YES; otherwise, the current process will exit.\n\
> ", path_buf.display()).as_str());

if input.trim().to_lowercase() != "yes" {
error!("Migration was declined since the user didn't confirm.");
return Err("need to migrate".to_string());
}
info!("now begin to migrate db ...");
let db = migrate.migrate().expect("failed to migrate db");
info!("db migrated successfully ...");
Ok(db)
}
}
} else {
eprintln!("now begin to init db version ...");
migrate
.init_db_version()
.expect("failed to init db version");
Ok(migrate.db())
}
} else {
eprintln!("now begin to init db version ...");
migrate
.init_db_version()
.expect("failed to init db version");
Ok(migrate.db())
}
}

pub fn prompt(msg: &str) -> String {
let stdout = stdout();
let mut stdout = stdout.lock();
let stdin = stdin();

write!(stdout, "{msg}").unwrap();
stdout.flush().unwrap();

let mut input = String::new();
let _ = stdin.read_line(&mut input);

input
}

pub struct Batch {
db: Arc<DB>,
wb: WriteBatch,
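The new prompt helper is a plain blocking read of one stdin line, so it can be reused for other confirmations. A small hedged sketch of such reuse; the fnn::store path and the confirm helper are assumptions, since the crate's module declarations are not part of this diff.

```rust
use fnn::store::prompt;

// Ask the user to confirm a destructive step, mirroring how
// `open_or_create_db` gates the migration on a literal "YES"/"yes" answer.
fn confirm(action: &str) -> bool {
    let answer = prompt(format!("{action}\nType YES to continue:\n> ").as_str());
    answer.trim().to_lowercase() == "yes"
}
```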
