diff --git a/Cargo.toml b/Cargo.toml
index bf4a35ba..feb15a98 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -35,8 +35,11 @@ required-features = ["failpoints"]
 byteorder = "1.2"
 crc32fast = "1.2"
 crossbeam = "0.8"
+crossbeam-channel = "0.5.8"
+either = "1.8.1"
 fail = "0.5"
 fs2 = "0.4"
+futures = "0.3.28"
 hashbrown = "0.14"
 hex = "0.4"
 if_chain = "1.0"
@@ -70,6 +73,7 @@ rand = "0.8"
 rand_distr = "0.4"
 tempfile = "3.6"
 toml = "0.8"
+md-5 = "0.10.5"
 
 [features]
 default = ["internals", "scripting"]
diff --git a/src/config.rs b/src/config.rs
index 4283737f..c4cacf99 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -40,6 +40,8 @@ pub struct Config {
     /// Default: None
     pub spill_dir: Option<String>,
 
+    pub second_dir: Option<String>,
+
     /// How to deal with file corruption during recovery.
     ///
     /// Default: "tolerate-tail-corruption".
@@ -110,7 +112,7 @@ pub struct Config {
     /// Whether to prepare log files for recycling when start.
     /// If `true`, batch empty log files will be prepared for recycling when
     /// starting engine.
-    /// Only available for `enable-log-reycle` is true.
+    /// Only available when `enable-log-recycle` is true.
     ///
     /// Default: false
     pub prefill_for_recycle: bool,
@@ -129,6 +131,7 @@ impl Default for Config {
         let mut cfg = Config {
             dir: "".to_owned(),
             spill_dir: None,
+            second_dir: None,
             recovery_mode: RecoveryMode::TolerateTailCorruption,
             recovery_read_block_size: ReadableSize::kb(16),
             recovery_threads: 4,
diff --git a/src/engine.rs b/src/engine.rs
index 0d055296..d8f21b41 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -74,12 +74,13 @@ where
         Self::open_with(cfg, file_system, vec![])
     }
 
-    pub fn open_with(
+    fn open_with(
         mut cfg: Config,
         file_system: Arc<F>,
         mut listeners: Vec<Arc<dyn EventListener>>,
     ) -> Result<Engine<F, FilePipeLog<F>>> {
         cfg.sanitize()?;
+        file_system.bootstrap()?;
         listeners.push(Arc::new(PurgeHook::default()) as Arc<dyn EventListener>);
 
         let start = Instant::now();
@@ -626,7 +627,7 @@ where
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
-    use crate::env::{ObfuscatedFileSystem, Permission};
+    use crate::env::{HedgedFileSystem, ObfuscatedFileSystem, Permission};
     use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt};
     use crate::log_batch::AtomicGroupBuilder;
     use crate::pipe_log::Version;
@@ -2629,6 +2630,284 @@ pub(crate) mod tests {
         assert!(engine.raft_groups().is_empty());
     }
 
+    fn number_of_files(p: &Path) -> usize {
+        let mut r = 0;
+        std::fs::read_dir(p).unwrap().for_each(|e| {
+            if e.unwrap()
+                .path()
+                .file_name()
+                .unwrap()
+                .to_str()
+                .unwrap()
+                .starts_with("000")
+            {
+                r += 1;
+            }
+        });
+        r
+    }
+
+    use md5::{Digest, Md5};
+    use std::{fs, io};
+
+    fn calculate_hash(path: &Path) -> [u8; 16] {
+        let mut hasher = Md5::new();
+
+        // Sort entries so the hash doesn't depend on directory iteration order.
+        let mut paths: Vec<_> = std::fs::read_dir(path)
+            .unwrap()
+            .map(|e| e.unwrap().path())
+            .collect();
+        paths.sort();
+        for p in paths {
+            let file_name = p.file_name().unwrap().to_str().unwrap();
+            // Only hash log files and reserved files; skip everything else.
+            if FileId::parse_file_name(file_name).is_none()
+                && parse_reserved_file_name(file_name).is_none()
+            {
+                continue;
+            }
+            let mut file = fs::File::open(&p).unwrap();
+            io::copy(&mut file, &mut hasher).unwrap();
+        }
+        hasher.finalize().into()
+    }
+
+    use std::io::Write;
+
+    #[test]
+    fn test_start_engine_with_second_disk() {
+        let dir = tempfile::Builder::new()
+            .prefix("test_start_engine_with_second_disk_default")
+            .tempdir()
+            .unwrap();
+        let sec_dir = tempfile::Builder::new()
+            .prefix("test_start_engine_with_second_disk_second")
+            .tempdir()
+            .unwrap();
+
+        let file_system = Arc::new(HedgedFileSystem::new(
+            Arc::new(DefaultFileSystem {}),
+            dir.path().to_path_buf(),
+            sec_dir.path().to_path_buf(),
+        ));
+        let entry_data =
vec![b'x'; 512]; + + // Preparations for multi-dirs. + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + enable_log_recycle: false, + target_file_size: ReadableSize(1), + ..Default::default() + }; + + // Step 1: write data into the main directory. + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + for rid in 1..=10 { + engine.append(rid, 1, 10, Some(&entry_data)); + } + drop(engine); + + // Restart the engine with recycle and prefill. Test reusing files from both + // dirs. + let cfg_2 = Config { + enable_log_recycle: true, + prefill_for_recycle: true, + purge_threshold: ReadableSize(40), + ..cfg + }; + let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system).unwrap(); + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + for rid in 1..=10 { + assert_eq!(engine.first_index(rid).unwrap(), 1); + engine.clean(rid); + } + engine.purge_manager.must_rewrite_append_queue(None, None); + let file_count = number_of_files(dir.path()); + assert_eq!(number_of_files(sec_dir.path()), file_count); + assert!(file_count > engine.file_count(None)); + // Append data, recycled files are reused. + for rid in 1..=30 { + engine.append(rid, 20, 30, Some(&entry_data)); + } + // No new file is created. + let file_count1 = number_of_files(dir.path()); + assert_eq!(file_count, file_count1); + assert_eq!(number_of_files(sec_dir.path()), file_count1); + } + + #[test] + fn test_start_engine_with_abnormal_second_disk() { + let dir = tempfile::Builder::new() + .prefix("test_start_engine_with_abnormal_second_disk_default") + .tempdir() + .unwrap(); + let sec_dir = tempfile::Builder::new() + .prefix("test_start_engine_with_abnormal_second_disk_second") + .tempdir() + .unwrap(); + + let file_system = Arc::new(HedgedFileSystem::new( + Arc::new(DefaultFileSystem {}), + dir.path().to_path_buf(), + sec_dir.path().to_path_buf(), + )); + let entry_data = vec![b'x'; 512]; + + // Preparations for multi-dirs. + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + enable_log_recycle: true, + prefill_for_recycle: true, + target_file_size: ReadableSize(1), + purge_threshold: ReadableSize(40), + ..Default::default() + }; + + // Step 1: write data into the main directory. + let engine = + RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + for rid in 1..=10 { + engine.append(rid, 1, 10, Some(&entry_data)); + } + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + for rid in 1..=10 { + assert_eq!(engine.first_index(rid).unwrap(), 1); + engine.clean(rid); + } + engine.purge_manager.must_rewrite_append_queue(None, None); + let file_count = number_of_files(dir.path()); + assert_eq!(number_of_files(sec_dir.path()), file_count); + assert!(file_count > engine.file_count(None)); + // Append data, recycled files are reused. + for rid in 1..=30 { + engine.append(rid, 20, 30, Some(&entry_data)); + } + // No new file is created. 
+        let file_count1 = number_of_files(dir.path());
+        assert_eq!(file_count, file_count1);
+        assert_eq!(number_of_files(sec_dir.path()), file_count1);
+        drop(engine);
+
+        // abnormal case - Empty second dir
+        {
+            std::fs::remove_dir_all(sec_dir.path()).unwrap();
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // All files in first dir are copied to second dir
+            assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // abnormal case - Missing some append files in second dir
+        {
+            let mut file_count = 0;
+            for e in std::fs::read_dir(sec_dir.path()).unwrap() {
+                let p = e.unwrap().path();
+                let file_name = p.file_name().unwrap().to_str().unwrap();
+                if let Some(FileId {
+                    queue: LogQueue::Append,
+                    seq: _,
+                }) = FileId::parse_file_name(file_name)
+                {
+                    if file_count % 2 == 0 {
+                        std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
+                    }
+                    file_count += 1;
+                }
+            }
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // Missing append files are copied
+            assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // abnormal case - Missing some rewrite files in second dir
+        {
+            let mut file_count = 0;
+            for e in std::fs::read_dir(sec_dir.path()).unwrap() {
+                let p = e.unwrap().path();
+                let file_name = p.file_name().unwrap().to_str().unwrap();
+                if let Some(FileId {
+                    queue: LogQueue::Rewrite,
+                    seq: _,
+                }) = FileId::parse_file_name(file_name)
+                {
+                    if file_count % 2 == 0 {
+                        std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
+                    }
+                    file_count += 1;
+                }
+            }
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // Missing rewrite files are copied
+            assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // abnormal case - Missing some reserve files in second dir
+        {
+            let mut file_count = 0;
+            for e in std::fs::read_dir(sec_dir.path()).unwrap() {
+                let p = e.unwrap().path();
+                let file_name = p.file_name().unwrap().to_str().unwrap();
+                if FileId::parse_file_name(file_name).is_none() {
+                    if file_count % 2 == 0 {
+                        std::fs::remove_file(sec_dir.path().join(file_name)).unwrap();
+                    }
+                    file_count += 1;
+                }
+            }
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // Missing reserve files are copied
+            assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // abnormal case - Have some extra files in second dir
+        {
+            // Snapshot the entries first so copying doesn't mutate the
+            // directory while it's being iterated.
+            let names: Vec<_> = std::fs::read_dir(sec_dir.path())
+                .unwrap()
+                .map(|e| e.unwrap().file_name().into_string().unwrap())
+                .collect();
+            for (i, file_name) in names.iter().enumerate() {
+                if i % 2 == 0 {
+                    std::fs::copy(
+                        sec_dir.path().join(file_name),
+                        sec_dir.path().join(file_name.to_owned() + "tmp"),
+                    )
+                    .unwrap();
+                }
+            }
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // Extra files are untouched.
+            assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // TODO: handle the error
+        // abnormal case - One file is corrupted
+        {
+            let mut file_count = 0;
+            for e in std::fs::read_dir(sec_dir.path()).unwrap() {
+                let p = e.unwrap().path();
+                let file_name = p.file_name().unwrap().to_str().unwrap();
+                if file_count % 2 == 0 {
+                    let mut f = std::fs::OpenOptions::new()
+                        .write(true)
+                        .open(sec_dir.path().join(file_name))
+                        .unwrap();
+                    f.write_all(b"corrupted").unwrap();
+                }
+                file_count += 1;
+            }
+            let _engine =
+                RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap();
+            // Corrupted files are untouched.
+            assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path()));
+            assert_eq!(calculate_hash(sec_dir.path()), calculate_hash(dir.path()));
+        }
+        // abnormal case - One file in main dir is corrupted and one file in second dir
+        // is corrupted
+        {}
+        // abnormal case - Missing latest rewrite file in main dir and missing one log
+        // file in second dir
+        {}
+    }
+
     #[test]
     fn test_start_engine_with_multi_dirs() {
         let dir = tempfile::Builder::new()
@@ -2639,22 +2918,7 @@ pub(crate) mod tests {
             .prefix("test_start_engine_with_multi_dirs_spill")
             .tempdir()
             .unwrap();
-        fn number_of_files(p: &Path) -> usize {
-            let mut r = 0;
-            std::fs::read_dir(p).unwrap().for_each(|e| {
-                if e.unwrap()
-                    .path()
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with("000")
-                {
-                    r += 1;
-                }
-            });
-            r
-        }
+
         let file_system = Arc::new(DeleteMonitoredFileSystem::new());
         let entry_data = vec![b'x'; 512];
diff --git a/src/env/default.rs b/src/env/default.rs
index 44f4fa18..2c06338c 100644
--- a/src/env/default.rs
+++ b/src/env/default.rs
@@ -93,6 +93,7 @@ impl WriteExt for LogFile {
     }
 }
 
+#[derive(Clone)]
 pub struct DefaultFileSystem;
 
 impl FileSystem for DefaultFileSystem {
diff --git a/src/env/hedged/mod.rs b/src/env/hedged/mod.rs
new file mode 100644
index 00000000..e6b693cd
--- /dev/null
+++ b/src/env/hedged/mod.rs
@@ -0,0 +1,671 @@
+// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
+
+use crossbeam::channel::unbounded;
+use crossbeam::channel::Receiver;
+use log::info;
+use std::fs;
+use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::thread;
+use std::thread::JoinHandle;
+
+use crate::env::log_fd::LogFd;
+use crate::env::DefaultFileSystem;
+use crate::env::{FileSystem, Handle, Permission, WriteExt};
+use futures::executor::block_on;
+use futures::{join, select};
+
+mod recover;
+mod runner;
+mod sender;
+mod task;
+mod util;
+
+use runner::TaskRunner;
+use sender::HedgedSender;
+use task::{
+    empty_callback, paired_future_callback, Callback, FutureHandle, SeqTask, Task, TaskRes,
+};
+use util::replace_path;
+
+pub use sender::State;
+
+/// In a cloud environment, cloud disk IO may get stuck for a while due to
+/// cloud vendor infrastructure issues, which can affect foreground latency
+/// dramatically. Raft log apply mostly doesn't sync, so it isn't a problem,
+/// but raft log append is synced every time. To alleviate that, we can write
+/// the raft log to two different cloud disks. If either one of them is
+/// synced, the raft log append is considered finished. Thus, when IO on one
+/// cloud disk is stuck, the other one can still work and the foreground
+/// write flow is not affected.
+///
+/// Under the hood, the file system manages two directories on different
+/// cloud disks. All operations are serialized as tasks tagged with a
+/// monotonically increasing sequence number and put into a channel per disk.
+/// Each task is sent to both disks, and the caller waits until either one of
+/// the channels consumes the task. With that, if one disk's IO is slow for a
+/// long time, the other can still serve the operations without any delay. If
+/// two state machines apply the same changes in the same order from the same
+/// initial state, their final states must be the same. So once the slow disk
+/// comes back to normal, it can catch up by replaying the operations
+/// accumulated in its channel, and the contents of the two disks become
+/// identical again.
+///
+/// In the raft-engine write thread model, only one thread writes the WAL at
+/// any point in time, so not supporting concurrent WAL writes is not a big
+/// deal. But rewrite (GC) runs concurrently with WAL writes. Serializing GC
+/// writes with WAL writes could hurt performance considerably, since all
+/// write IO would then be performed by one thread. For performance, rewrite
+/// files are treated specially: rewrite operations wait on both disks,
+/// because rewrite is a background job that doesn't affect foreground
+/// latency.
+///
+/// For reads, again for performance, read operations are not serialized
+/// through the disks' channels. A read simply goes to the disk that has
+/// handled the task with the larger sequence number.
+///
+/// But if one disk blocks for a long time or goes down, the operations
+/// accumulating in its channel would exhaust the memory and cause OOM. To
+/// avoid that, once a channel piles up past a threshold, that disk is
+/// abandoned: a `Pause` task is sent to its channel and no more tasks are
+/// sent to it, while operations on rewrite files are not paused and still
+/// wait on both disks. So it is possible that one disk has written a rewrite
+/// entry while the original entry is not written yet. That is fine, because
+/// rewrite reads are performed on the faster disk, which has the original
+/// entry.
+///
+/// Later on, when the blocked disk comes back to normal, it eventually
+/// consumes its channel up to the `Pause` task, which indicates the disk has
+/// recovered. The slow disk then requests a snapshot from the faster disk,
+/// letting the normal disk copy all the append files over, and new changes
+/// resume being sent to the slow disk's channel. With that, writes to the
+/// new mutable append file made during the copy won't be lost, and the two
+/// disks are eventually synced.
+///
+/// As raft log file numbers increase monotonically, after a restart we can
+/// check the latest append file number (excluding rewrite files) to decide
+/// which disk has the latest data. If the largest file numbers are the same,
+/// we can't simply use the file size to decide due to file recycling.
+/// Instead, scan the two files to find the largest offset of the latest
+/// valid record in each, and regard the disk with the larger offset as the
+/// latest. The latest disk is responsible for syncing the missing append
+/// files to the other one.
+///
+/// It relies on some raft-engine invariants:
+/// 1. Raft log files are append-only.
+/// 2. A raft log file is read-only once it's sealed.
+/// Here is the TLA+ proof https://github.com/pingcap/tla-plus/pull/41 + +// TODO: handle specially on config change(upgrade and downgrade) +// TODO: fallback to one disk, if the other disk is down for a long time and +// close to full +// TODO: consider encryption + +// All operations of file system trait are sent to the channels of both disks +// through `HedgedFileSystem`. +pub struct HedgedFileSystem { + base: Arc, + + path1: PathBuf, + path2: PathBuf, + + sender: HedgedSender, + + seqno1: Arc, + seqno2: Arc, + + thread1: Option>, + thread2: Option>, +} + +impl HedgedFileSystem { + pub fn new(base: Arc, path1: PathBuf, path2: PathBuf) -> Self { + let (tx1, rx1) = unbounded::<(SeqTask, Callback)>(); + let (tx2, rx2) = unbounded::<(SeqTask, Callback)>(); + let sender = HedgedSender::new(tx1, tx2); + + let seqno1 = Arc::new(AtomicU64::new(0)); + let seqno2 = Arc::new(AtomicU64::new(0)); + let runner1 = TaskRunner::new( + 1, + path1.clone(), + base.clone(), + rx1, + sender.clone(), + seqno1.clone(), + ); + let runner2 = TaskRunner::new( + 2, + path2.clone(), + base.clone(), + rx2, + sender.clone(), + seqno2.clone(), + ); + let thread1 = runner1.spawn(); + let thread2 = runner2.spawn(); + Self { + base, + path1, + path2, + sender, + seqno1, + seqno2, + thread1: Some(thread1), + thread2: Some(thread2), + } + } + + pub fn state(&self) -> State { + self.sender.state() + } + + async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult { + let (cb1, mut f1) = paired_future_callback(); + let (cb2, mut f2) = paired_future_callback(); + self.sender.send(task1.clone(), task2.clone(), cb1, cb2); + + let resolve = |res: TaskRes| -> (LogFd, bool) { + match res { + TaskRes::Create { fd, is_for_rewrite } => (fd, is_for_rewrite), + TaskRes::Open { fd, is_for_rewrite } => (fd, is_for_rewrite), + _ => unreachable!(), + } + }; + select! { + res1 = f1 => res1.unwrap().map(|res| { + let (fd, is_for_rewrite) = resolve(res); + HedgedHandle::new( + self.base.clone(), + is_for_rewrite, + self.sender.clone(), + FutureHandle::new_owned(fd), + FutureHandle::new(f2, task2), + self.seqno1.clone(), + self.seqno2.clone(), + ) + }), + res2 = f2 => res2.unwrap().map(|res| { + let (fd, is_for_rewrite) = resolve(res); + HedgedHandle::new( + self.base.clone(), + is_for_rewrite, + self.sender.clone(), + FutureHandle::new(f1, task1), + FutureHandle::new_owned(fd) , + self.seqno1.clone(), + self.seqno2.clone(), + ) + }), + } + } + + async fn wait(&self, task1: Task, task2: Task) -> IoResult<()> { + let (cb1, mut f1) = paired_future_callback(); + let (cb2, mut f2) = paired_future_callback(); + self.sender.send(task1, task2, cb1, cb2); + + select! 
{ + res1 = f1 => res1.unwrap().map(|_| ()), + res2 = f2 => res2.unwrap().map(|_| ()), + } + } +} + +impl Drop for HedgedFileSystem { + fn drop(&mut self) { + block_on(self.wait(Task::Stop, Task::Stop)).unwrap(); + + let t1 = self.thread1.take().unwrap(); + let t2 = self.thread2.take().unwrap(); + let mut times = 0; + loop { + if t1.is_finished() && t2.is_finished() { + t1.join().unwrap(); + t2.join().unwrap(); + break; + } + times += 1; + if times > 100 { + // wait 1s + // one disk may be blocked for a long time, + // to avoid block shutdown process for a long time, do not join the threads + // here, only need at least to ensure one thread is exited + if t1.is_finished() || t2.is_finished() { + if t1.is_finished() { + t1.join().unwrap(); + } else { + t2.join().unwrap(); + } + break; + } + } + std::thread::sleep(std::time::Duration::from_millis(10)); + } + } +} + +impl FileSystem for HedgedFileSystem { + type Handle = HedgedHandle; + type Reader = HedgedReader; + type Writer = HedgedWriter; + + fn bootstrap(&self) -> IoResult<()> { + // catch up diff + if !self.path1.exists() { + info!("Create raft log directory: {}", self.path1.display()); + fs::create_dir(&self.path1).unwrap(); + } + if !self.path2.exists() { + info!("Create raft log directory: {}", self.path2.display()); + fs::create_dir(&self.path2).unwrap(); + } + let files1 = recover::get_files(&self.path1)?; + let files2 = recover::get_files(&self.path2)?; + + let count1 = recover::get_latest_valid_seq(&self.base, &files1)?; + let count2 = recover::get_latest_valid_seq(&self.base, &files2)?; + + match count1.cmp(&count2) { + std::cmp::Ordering::Equal => { + // still need to catch up, but only diff + recover::catch_up_diff(&self.base, files1, files2, false)?; + return Ok(()); + } + std::cmp::Ordering::Less => { + recover::catch_up_diff(&self.base, files2, files1, false)?; + } + std::cmp::Ordering::Greater => { + recover::catch_up_diff(&self.base, files1, files2, false)?; + } + } + Ok(()) + } + + fn create>(&self, path: P) -> IoResult { + block_on(self.wait_handle( + Task::Create(path.as_ref().to_path_buf()), + Task::Create(replace_path( + path.as_ref(), + self.path1.as_ref(), + self.path2.as_ref(), + )), + )) + } + + fn open>(&self, path: P, perm: Permission) -> IoResult { + block_on(self.wait_handle( + Task::Open { + path: path.as_ref().to_path_buf(), + perm, + }, + Task::Open { + path: replace_path(path.as_ref(), self.path1.as_ref(), self.path2.as_ref()), + perm, + }, + )) + } + + fn delete>(&self, path: P) -> IoResult<()> { + block_on(self.wait( + Task::Delete(path.as_ref().to_path_buf()), + Task::Delete(replace_path( + path.as_ref(), + self.path1.as_ref(), + self.path2.as_ref(), + )), + )) + } + + fn rename>(&self, src_path: P, dst_path: P) -> IoResult<()> { + block_on(self.wait( + Task::Rename { + src_path: src_path.as_ref().to_path_buf(), + dst_path: dst_path.as_ref().to_path_buf(), + }, + Task::Rename { + src_path: replace_path(src_path.as_ref(), self.path1.as_ref(), self.path2.as_ref()), + dst_path: replace_path(dst_path.as_ref(), self.path1.as_ref(), self.path2.as_ref()), + }, + )) + } + + fn new_reader(&self, handle: Arc) -> IoResult { + Ok(HedgedReader::new(handle)) + } + + fn new_writer(&self, handle: Arc) -> IoResult { + Ok(HedgedWriter::new(handle)) + } +} + +// HedgedHandle wraps two handles, and send the same operation to both disk's +// file handles +pub struct HedgedHandle { + base: Arc, + + sender: HedgedSender, + + // The two file handles for each disk + handle1: Arc, + handle2: Arc, + + // The sequence number of 
the latest handled task for each disk + seqno1: Arc, + seqno2: Arc, + + // For rewrite file, all the operations should wait both disks finished + rewrite_file: bool, + // The two threads for handling operations on rewrite files which is separated from the disk + // channel thread for performance consideration + thread1: Option>, + thread2: Option>, +} + +impl HedgedHandle { + fn new( + base: Arc, + rewrite_file: bool, + mut sender: HedgedSender, + handle1: FutureHandle, + handle2: FutureHandle, + mut seqno1: Arc, + mut seqno2: Arc, + ) -> Self { + let mut thread1 = None; + let mut thread2 = None; + if rewrite_file { + // use two separated threads for both wait + let (tx1, rx1) = unbounded::<(SeqTask, Callback)>(); + let (tx2, rx2) = unbounded::<(SeqTask, Callback)>(); + // replace the seqno with self owned, then in `read` the seqno from two disks + // should be always the same. It's just to reuse the logic without adding + // special check in `read` + seqno1 = Arc::new(AtomicU64::new(0)); + seqno2 = Arc::new(AtomicU64::new(0)); + let poll = |(rx, seqno, fs): ( + Receiver<(SeqTask, Callback)>, + Arc, + Arc, + )| { + for (task, cb) in rx { + if let Task::Stop = task.inner { + break; + } + assert!(!matches!(task.inner, Task::Pause | Task::Snapshot)); + let res = task.handle_process(&fs); + seqno.fetch_add(1, Ordering::Relaxed); + cb(res).unwrap(); + } + }; + let args1 = (rx1, seqno1.clone(), base.clone()); + thread1 = Some(thread::spawn(move || { + poll(args1); + })); + let args2 = (rx2, seqno2.clone(), base.clone()); + thread2 = Some(thread::spawn(move || { + poll(args2); + })); + sender = HedgedSender::new(tx1, tx2); + } + + Self { + base, + rewrite_file, + sender, + handle1: Arc::new(handle1), + handle2: Arc::new(handle2), + seqno1, + seqno2, + thread1, + thread2, + } + } + + fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult { + // Raft engine promises that the offset would be read only after the write is + // finished and memtable is updated. And the hedged file system promises that + // the write is done when either one of the disk finishes the write. Here the + // read data must be present in at least one of the disks. So choose the disk of + // largest seqno to read. + // + // Safety: the get for these two seqno is not necessary to be atomic. + // What if the seqno2 is updated after getting seqno1? It's fine, let's say + // - T1 denotes the time of getting seqno1, the actual seqno for disk1 and disk2 + // is S1, S2 + // - T2 denotes the time of getting seqno2, the actual seqno for disk1 and disk2 + // is S1', S2' + // Assume disk2 is just slightly slower than disk1, here is a possible case: + // - T1: S1 = 10, S2 = 9 + // - T2: S1'= 12, S2'= 11 + // Then, what we get would be seq1=10, seq2=11, and the read would be performed + // on disk2. But disk2 is slower than disk1. The data may not be written yet. + // Would the read on a slower disk is safe? + // Yes, it's safe because at T1 we know the data can be read at least with a + // seqno of S1, then at T2, S2' > S1, so the data must be already written in the + // disk2, even if it's the slow disk. + let seq1 = self.seqno1.load(Ordering::Relaxed); + let seq2 = self.seqno2.load(Ordering::Relaxed); + match seq1.cmp(&seq2) { + std::cmp::Ordering::Equal => { + // TODO: read simultaneously from both disks and return the faster one + if let Some(fd) = self.handle1.try_get(&self.base)? { + fd.read(offset, buf) + } else if let Some(fd) = self.handle2.try_get(&self.base)? 
{ + fd.read(offset, buf) + } else { + panic!("Both fd1 and fd2 are None"); + } + } + std::cmp::Ordering::Greater => { + self.handle1.try_get(&self.base)?.unwrap().read(offset, buf) + } + std::cmp::Ordering::Less => { + self.handle2.try_get(&self.base)?.unwrap().read(offset, buf) + } + } + } + + fn write(&self, offset: usize, content: &[u8]) -> IoResult { + block_on(self.wait( + Task::Write { + handle: self.handle1.clone(), + offset, + bytes: content.to_vec(), + }, + Task::Write { + handle: self.handle2.clone(), + offset, + bytes: content.to_vec(), + }, + )) + .map(|res| { + if let TaskRes::Write(size) = res { + size + } else { + unreachable!() + } + }) + } + + fn allocate(&self, offset: usize, size: usize) -> IoResult<()> { + block_on(self.wait( + Task::Allocate { + handle: self.handle1.clone(), + offset, + size, + }, + Task::Allocate { + handle: self.handle2.clone(), + offset, + size, + }, + )) + .map(|_| ()) + } + + async fn wait_one(&self, task1: Task, task2: Task) -> IoResult { + let (cb1, mut f1) = paired_future_callback(); + let (cb2, mut f2) = paired_future_callback(); + self.sender.send(task1, task2, cb1, cb2); + + select! { + res1 = f1 => res1.unwrap(), + res2 = f2 => res2.unwrap(), + } + } + + async fn wait_both(&self, task1: Task, task2: Task) -> IoResult { + let (cb1, f1) = paired_future_callback(); + let (cb2, f2) = paired_future_callback(); + self.sender.send(task1, task2, cb1, cb2); + + let (res1, res2) = join!(f1, f2); + match (res1.unwrap(), res2.unwrap()) { + (res @ Ok(_), Ok(_)) => res, + (Err(e), Err(_)) => Err(e), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + } + } + + async fn wait(&self, task1: Task, task2: Task) -> IoResult { + if self.rewrite_file { + self.wait_both(task1, task2).await + } else { + self.wait_one(task1, task2).await + } + } +} + +impl Handle for HedgedHandle { + fn truncate(&self, offset: usize) -> IoResult<()> { + block_on(self.wait( + Task::Truncate { + handle: self.handle1.clone(), + offset, + }, + Task::Truncate { + handle: self.handle2.clone(), + offset, + }, + )) + .map(|_| ()) + } + + fn file_size(&self) -> IoResult { + block_on(self.wait( + Task::FileSize(self.handle1.clone()), + Task::FileSize(self.handle2.clone()), + )) + .map(|res| { + if let TaskRes::FileSize(size) = res { + size + } else { + unreachable!() + } + }) + } + + fn sync(&self) -> IoResult<()> { + block_on(self.wait( + Task::Sync(self.handle1.clone()), + Task::Sync(self.handle2.clone()), + )) + .map(|_| ()) + } +} + +impl Drop for HedgedHandle { + fn drop(&mut self) { + if self.rewrite_file { + self.sender + .send(Task::Stop, Task::Stop, empty_callback(), empty_callback()); + self.thread1.take().unwrap().join().unwrap(); + self.thread2.take().unwrap().join().unwrap(); + } + } +} + +pub struct HedgedWriter { + inner: Arc, + offset: usize, +} + +impl HedgedWriter { + pub fn new(handle: Arc) -> Self { + Self { + inner: handle, + offset: 0, + } + } +} + +impl Write for HedgedWriter { + fn write(&mut self, buf: &[u8]) -> IoResult { + let len = self.inner.write(self.offset, buf)?; + self.offset += len; + Ok(len) + } + + fn flush(&mut self) -> IoResult<()> { + Ok(()) + } +} + +impl WriteExt for HedgedWriter { + fn truncate(&mut self, offset: usize) -> IoResult<()> { + self.inner.truncate(offset)?; + self.offset = offset; + Ok(()) + } + + fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> { + self.inner.allocate(offset, size) + } +} + +impl Seek for HedgedWriter { + fn seek(&mut self, pos: SeekFrom) -> IoResult { + match pos { + SeekFrom::Start(offset) => 
self.offset = offset as usize, + SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize, + SeekFrom::End(i) => self.offset = (self.inner.file_size()? as i64 + i) as usize, + } + Ok(self.offset as u64) + } +} + +pub struct HedgedReader { + inner: Arc, + offset: usize, +} + +impl HedgedReader { + pub fn new(handle: Arc) -> Self { + Self { + inner: handle, + offset: 0, + } + } +} + +impl Seek for HedgedReader { + fn seek(&mut self, pos: SeekFrom) -> IoResult { + match pos { + SeekFrom::Start(offset) => self.offset = offset as usize, + SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize, + SeekFrom::End(i) => self.offset = (self.inner.file_size()? as i64 + i) as usize, + } + Ok(self.offset as u64) + } +} + +impl Read for HedgedReader { + fn read(&mut self, buf: &mut [u8]) -> IoResult { + let len = self.inner.read(self.offset, buf)?; + self.offset += len; + Ok(len) + } +} diff --git a/src/env/hedged/recover.rs b/src/env/hedged/recover.rs new file mode 100644 index 00000000..78751de2 --- /dev/null +++ b/src/env/hedged/recover.rs @@ -0,0 +1,255 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use crate::env::default::LogFile; +use crate::file_pipe_log::log_file::build_file_reader; +use crate::file_pipe_log::pipe_builder::FileName; +use crate::file_pipe_log::reader::LogItemBatchFileReader; +use crate::file_pipe_log::FileNameExt; +use crate::internals::parse_reserved_file_name; +use crate::internals::FileId; +use crate::internals::LogQueue; +use crate::Error; +use std::fs; +use std::io::Result as IoResult; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::env::log_fd::LogFd; +use crate::env::DefaultFileSystem; +use crate::env::{FileSystem, Permission}; + +use super::util::replace_path; + +#[derive(Default)] +pub(crate) struct Files { + pub prefix: PathBuf, + pub append_files: Vec, + pub rewrite_files: Vec, + pub reserved_files: Vec, +} + +pub(crate) enum SeqFile { + Path(FileName), + Handle((FileName, Arc)), +} + +impl SeqFile { + pub fn seq(&self) -> u64 { + match self { + SeqFile::Path(f) => f.seq, + SeqFile::Handle((f, _)) => f.seq, + } + } + + pub fn path(&self) -> &PathBuf { + match self { + SeqFile::Path(f) => &f.path, + SeqFile::Handle((f, _)) => &f.path, + } + } + + pub fn remove(&self) -> IoResult<()> { + match self { + SeqFile::Path(f) => fs::remove_file(&f.path), + SeqFile::Handle((f, _)) => fs::remove_file(&f.path), + } + } + + pub fn copy(&self, file_system: &DefaultFileSystem, to: &PathBuf) -> IoResult { + match self { + SeqFile::Path(f) => fs::copy(&f.path, to.as_path()), + SeqFile::Handle((_, fd)) => { + let mut reader = LogFile::new(fd.clone()); + let mut writer = LogFile::new(Arc::new(file_system.create(to)?)); + std::io::copy(&mut reader, &mut writer) + } + } + } + + pub fn into_handle(mut self, file_system: &DefaultFileSystem) -> Self { + if let SeqFile::Path(f) = self { + let fd = Arc::new(file_system.open(&f.path, Permission::ReadOnly).unwrap()); + self = SeqFile::Handle((f, fd)); + } + self + } +} + +pub(crate) fn catch_up_diff( + file_system: &Arc, + mut from_files: Files, + mut to_files: Files, + skip_rewrite: bool, +) -> IoResult<()> { + from_files + .append_files + .sort_by(|a, b| a.seq().cmp(&b.seq())); + to_files.append_files.sort_by(|a, b| a.seq().cmp(&b.seq())); + from_files + .rewrite_files + .sort_by(|a, b| a.seq().cmp(&b.seq())); + to_files.rewrite_files.sort_by(|a, b| a.seq().cmp(&b.seq())); + from_files + .reserved_files + .sort_by(|a, b| a.seq().cmp(&b.seq())); + to_files + 
.reserved_files + .sort_by(|a, b| a.seq().cmp(&b.seq())); + + let check_files = |from: &Vec, to: &Vec| -> IoResult<()> { + let last_from_seq = from.last().map(|f| f.seq()).unwrap_or(0); + + let mut iter1 = from.iter().peekable(); + let mut iter2 = to.iter().peekable(); + // compare files of from and to, if the file in from is not in to, copy it to + // to, and if the file in to is not in from, delete it + loop { + match (iter1.peek(), iter2.peek()) { + (None, None) => break, + (Some(f1), None) => { + let to = replace_path( + f1.path().as_ref(), + from_files.prefix.as_ref(), + to_files.prefix.as_ref(), + ); + f1.copy(file_system, &to)?; + iter1.next(); + } + (None, Some(f2)) => { + f2.remove()?; + iter2.next(); + } + (Some(f1), Some(f2)) => { + match f1.seq().cmp(&f2.seq()) { + std::cmp::Ordering::Equal => { + // check file size is not enough, treat the last files differently + // considering the recycle, always copy the last file + // TODO: only copy diff part + if f1.seq() == last_from_seq { + let to = replace_path( + f1.path().as_ref(), + from_files.prefix.as_ref(), + to_files.prefix.as_ref(), + ); + f1.copy(file_system, &to)?; + } + iter1.next(); + iter2.next(); + } + std::cmp::Ordering::Less => { + let to = replace_path( + f1.path().as_ref(), + from_files.prefix.as_ref(), + to_files.prefix.as_ref(), + ); + f1.copy(file_system, &to)?; + iter1.next(); + } + std::cmp::Ordering::Greater => { + f2.remove()?; + iter2.next(); + } + } + } + } + } + Ok(()) + }; + + check_files(&from_files.append_files, &to_files.append_files)?; + if !skip_rewrite { + check_files(&from_files.rewrite_files, &to_files.rewrite_files)?; + } + check_files(&from_files.reserved_files, &to_files.reserved_files)?; + Ok(()) +} + +pub(crate) fn get_files(path: &PathBuf) -> IoResult { + assert!(path.exists()); + + let mut files = Files { + prefix: path.clone(), + ..Default::default() + }; + + fs::read_dir(path) + .unwrap() + .try_for_each(|e| -> IoResult<()> { + let dir_entry = e?; + let p = dir_entry.path(); + if !p.is_file() { + return Ok(()); + } + let file_name = p.file_name().unwrap().to_str().unwrap(); + match FileId::parse_file_name(file_name) { + Some(FileId { + queue: LogQueue::Append, + seq, + }) => files.append_files.push(SeqFile::Path(FileName { + seq, + path: p, + path_id: 0, + })), + Some(FileId { + queue: LogQueue::Rewrite, + seq, + }) => files.rewrite_files.push(SeqFile::Path(FileName { + seq, + path: p, + path_id: 0, + })), + _ => { + if let Some(seq) = parse_reserved_file_name(file_name) { + files.reserved_files.push(SeqFile::Path(FileName { + seq, + path: p, + path_id: 0, + })) + } + } + } + Ok(()) + }) + .unwrap(); + + Ok(files) +} + +pub(crate) fn get_latest_valid_seq( + file_system: &Arc, + files: &Files, +) -> IoResult { + let mut count = 0; + if let Some(f) = files.append_files.last() { + let recovery_read_block_size = 1024; + let mut reader = LogItemBatchFileReader::new(recovery_read_block_size); + let handle = Arc::new(file_system.open(&f.path(), Permission::ReadOnly)?); + let file_reader = build_file_reader(file_system.as_ref(), handle)?; + match reader.open( + FileId { + queue: LogQueue::Append, + seq: f.seq(), + }, + file_reader, + ) { + Err(e) => match e { + Error::Io(err) => return Err(err), + _ => return Ok(0), + }, + Ok(_) => { + // Do nothing + } + } + loop { + match reader.next() { + Ok(Some(_)) => { + count += 1; + } + Ok(None) => break, + Err(_) => break, + } + } + } + + Ok(count) +} diff --git a/src/env/hedged/runner.rs b/src/env/hedged/runner.rs new file mode 100644 index 
00000000..031051ef --- /dev/null +++ b/src/env/hedged/runner.rs @@ -0,0 +1,127 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use crossbeam::channel::Receiver; +use fail::fail_point; +use std::io::Result as IoResult; +use std::path::PathBuf; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; +use std::thread::JoinHandle; + +use crate::env::DefaultFileSystem; +use futures::executor::block_on; + +use super::recover; +use super::sender::HedgedSender; +use super::task::paired_future_callback; +use super::task::{Callback, SeqTask, Task, TaskRes}; + +// TaskRunner is a thread runner that handles the disk IO tasks. It would poll +// the channel until receiving a `Stop` task. +pub(crate) struct TaskRunner { + id: u8, + path: PathBuf, + fs: Arc, + rx: Receiver<(SeqTask, Callback)>, + sender: HedgedSender, + seqno: Arc, +} + +impl TaskRunner { + pub fn new( + id: u8, + path: PathBuf, + fs: Arc, + rx: Receiver<(SeqTask, Callback)>, + sender: HedgedSender, + seqno: Arc, + ) -> Self { + Self { + id, + path, + fs, + rx, + sender, + seqno, + } + } + + pub fn spawn(self) -> JoinHandle<()> { + let id = self.id; + thread::Builder::new() + .name(format!("raft-engine-disk{}", id)) + .spawn(move || { + if let Err(e) = self.poll() { + panic!("disk {} failed: {:?}", id, e); + } + }) + .unwrap() + } + + fn poll(self) -> IoResult<()> { + let mut last_seq = 0; + let mut snap_seq = None; + for (task, cb) in self.rx { + if let Task::Stop = task.inner { + cb(Ok(TaskRes::Stop))?; + break; + } + if let Task::Pause = task.inner { + // Encountering `Pause`, indicate the disk may not slow anymore + let (cb, f) = paired_future_callback(); + self.sender.send_snapshot(cb); + let to_files = recover::get_files(&self.path)?; + let from_files = block_on(f).unwrap().map(|res| { + if let TaskRes::Snapshot((seq, files)) = res { + snap_seq = Some(seq); + files + } else { + unreachable!() + } + })?; + + // Snapshot doesn't include the file size, so it would copy more data than + // the data seen at the time of snapshot. But it's okay, as the data is + // written with specific offset, so the data written + // of no necessity will be overwritten by the latter writes. + // Exclude rewrite files because rewrite files are always synced. + recover::catch_up_diff(&self.fs, from_files, to_files, true)?; + + self.sender.finish_snapshot(); + self.seqno.store(snap_seq.unwrap(), Ordering::Relaxed); + last_seq = snap_seq.unwrap(); + continue; + } + if self.id == 1 { + fail_point!("hedged::task_runner::thread1"); + } + let seq = task.seq; + assert_ne!(seq, 0); + if let Some(snap) = snap_seq.as_ref() { + // the change already included in the snapshot + if seq < *snap { + continue; + } else if seq == *snap { + unreachable!(); + } else if seq == *snap + 1 { + snap_seq = None; + } else { + panic!("seqno {} is larger than snapshot seqno {}", seq, *snap); + } + } + + assert_eq!(last_seq + 1, seq); + last_seq = seq; + let res = task.process(&self.fs, &self.path); + // seqno should be updated before the write callback is called, otherwise one + // read may be performed right after the write is finished. Then the read may be + // performed on the other disk not having the data because the seqno for this + // disk is not updated yet. 
+            self.seqno.store(seq, Ordering::Relaxed);
+            cb(res)?;
+        }
+        Ok(())
+    }
+}
diff --git a/src/env/hedged/sender.rs b/src/env/hedged/sender.rs
new file mode 100644
index 00000000..75c90134
--- /dev/null
+++ b/src/env/hedged/sender.rs
@@ -0,0 +1,140 @@
+// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
+
+use crossbeam::channel::Sender;
+use fail::fail_point;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+use super::task::{empty_callback, Callback, SeqTask, Task};
+
+// Say the average write is 100KB; then a full channel of 10000 tasks buffers
+// about 1GB of log data.
+const PAUSE_THRESHOLD: usize = 10000;
+
+fn get_pause_threshold() -> usize {
+    fail_point!("hedged::pause_threshold", |s| s
+        .unwrap()
+        .parse::<usize>()
+        .unwrap());
+    PAUSE_THRESHOLD
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum State {
+    Normal,
+    Paused1, /* When the length of the channel of disk1 reaches the
+              * threshold, a `Pause` task is sent and no later task will be
+              * sent to disk1 */
+    Paused2, // no more task will be sent to disk2
+    Recovering,
+}
+
+// Make sure the task is sent to the two disks' channels atomically;
+// otherwise the ordering of tasks in the two channels would not be the same.
+#[derive(Clone)]
+pub(crate) struct HedgedSender(Arc<Mutex<HedgedSenderInner>>);
+
+struct HedgedSenderInner {
+    disk1: Sender<(SeqTask, Callback)>,
+    disk2: Sender<(SeqTask, Callback)>,
+    seq: u64,
+    state: State,
+}
+
+impl HedgedSender {
+    pub fn new(disk1: Sender<(SeqTask, Callback)>, disk2: Sender<(SeqTask, Callback)>) -> Self {
+        Self(Arc::new(Mutex::new(HedgedSenderInner {
+            disk1,
+            disk2,
+            seq: 0,
+            state: State::Normal,
+        })))
+    }
+
+    pub fn state(&self) -> State {
+        self.0.lock().unwrap().state.clone()
+    }
+
+    pub fn send(&self, task1: Task, task2: Task, cb1: Callback, cb2: Callback) {
+        if matches!(task1, Task::Pause | Task::Snapshot) {
+            unreachable!();
+        }
+
+        let mut inner = self.0.lock().unwrap();
+        inner.seq += 1;
+        let task1 = SeqTask {
+            inner: task1,
+            seq: inner.seq,
+        };
+        let task2 = SeqTask {
+            inner: task2,
+            seq: inner.seq,
+        };
+        if matches!(inner.state, State::Normal) {
+            let check1 = inner.disk1.len() > get_pause_threshold();
+            let check2 = inner.disk2.len() > get_pause_threshold();
+            match (check1, check2) {
+                (true, true) => {
+                    panic!("Both channels of disk1 and disk2 are full")
+                }
+                (true, false) => {
+                    inner.state = State::Paused1;
+                    inner
+                        .disk1
+                        .send((
+                            SeqTask {
+                                inner: Task::Pause,
+                                seq: 0,
+                            },
+                            empty_callback(),
+                        ))
+                        .unwrap();
+                }
+                (false, true) => {
+                    inner.state = State::Paused2;
+                    inner
+                        .disk2
+                        .send((
+                            SeqTask {
+                                inner: Task::Pause,
+                                seq: 0,
+                            },
+                            empty_callback(),
+                        ))
+                        .unwrap();
+                }
+                _ => {}
+            }
+        }
+        if !matches!(inner.state, State::Paused1) {
+            inner.disk1.send((task1, cb1)).unwrap();
+        }
+        if !matches!(inner.state, State::Paused2) {
+            inner.disk2.send((task2, cb2)).unwrap();
+        }
+    }
+
+    pub fn send_snapshot(&self, cb: Callback) {
+        let mut inner = self.0.lock().unwrap();
+        inner.seq += 1;
+        let task = SeqTask {
+            inner: Task::Snapshot,
+            seq: inner.seq,
+        };
+        match inner.state {
+            State::Paused1 => {
+                inner.disk2.send((task, cb)).unwrap();
+            }
+            State::Paused2 => {
+                inner.disk1.send((task, cb)).unwrap();
+            }
+            _ => unreachable!(),
+        }
+        inner.state = State::Recovering;
+    }
+
+    pub fn finish_snapshot(&self) {
+        let mut inner = self.0.lock().unwrap();
+        inner.state = State::Normal;
+    }
+}
diff --git a/src/env/hedged/task.rs b/src/env/hedged/task.rs
new file mode 100644
index 00000000..206f2e44
--- /dev/null
+++ b/src/env/hedged/task.rs
@@ -0,0 +1,256 @@
+// 
Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use std::cell::UnsafeCell; +use std::io::Result as IoResult; +use std::path::PathBuf; +use std::sync::Arc; + +use crate::env::log_fd::LogFd; +use crate::env::DefaultFileSystem; +use crate::env::{FileSystem, Handle, Permission}; +use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; + +use super::recover::{self, Files}; + +use either::Either; + +pub(crate) type Callback = Box) -> IoResult + Send>; + +pub(crate) fn empty_callback() -> Callback { + Box::new(|_| Ok(TaskRes::Noop)) +} + +pub(crate) fn paired_future_callback() -> (Callback, oneshot::Receiver>) { + let (tx, future) = oneshot::channel(); + let callback = Box::new(move |result| -> IoResult { + if let Err(result) = tx.send(result) { + return result; + } + Ok(TaskRes::Noop) + }); + (callback, future) +} + +pub(crate) struct SeqTask { + pub inner: Task, + pub seq: u64, +} + +#[derive(Clone)] +pub(crate) enum Task { + Create(PathBuf), + Open { + path: PathBuf, + perm: Permission, + }, + Delete(PathBuf), + Rename { + src_path: PathBuf, + dst_path: PathBuf, + }, + Truncate { + handle: Arc, + offset: usize, + }, + FileSize(Arc), + Sync(Arc), + Write { + handle: Arc, + offset: usize, + bytes: Vec, + }, + Allocate { + handle: Arc, + offset: usize, + size: usize, + }, + Pause, + Snapshot, + Stop, +} + +impl SeqTask { + pub fn process(self, file_system: &DefaultFileSystem, path: &PathBuf) -> IoResult { + match self.inner { + Task::Create(path) => file_system.create(&path).map(|h| TaskRes::Create { + fd: h, + is_for_rewrite: path.extension().map_or(false, |ext| ext == "rewrite"), + }), + Task::Open { path, perm } => file_system.open(&path, perm).map(|h| TaskRes::Open { + fd: h, + is_for_rewrite: path.extension().map_or(false, |ext| ext == "rewrite"), + }), + Task::Delete(path) => file_system.delete(path).map(|_| TaskRes::Delete), + Task::Rename { src_path, dst_path } => file_system + .rename(src_path, dst_path) + .map(|_| TaskRes::Rename), + Task::Snapshot => { + let mut files = recover::get_files(&path)?; + files.append_files = files + .append_files + .into_iter() + .map(|f| f.into_handle(file_system)) + .collect(); + // exclude rewrite files, as they are always synced + files.reserved_files = files + .reserved_files + .into_iter() + .map(|f| f.into_handle(file_system)) + .collect(); + Ok(TaskRes::Snapshot((self.seq, files))) + } + Task::Stop | Task::Pause => unreachable!(), + _ => self.handle_process(file_system), + } + } + + pub fn handle_process(self, file_system: &DefaultFileSystem) -> IoResult { + match self.inner { + Task::Truncate { handle, offset } => handle + .get(file_system)? + .truncate(offset) + .map(|_| TaskRes::Truncate), + Task::FileSize(handle) => handle + .get(file_system)? + .file_size() + .map(|s| TaskRes::FileSize(s)), + Task::Sync(handle) => handle.get(file_system)?.sync().map(|_| TaskRes::Sync), + Task::Write { + handle, + offset, + bytes, + } => handle + .get(file_system)? + .write(offset, &bytes) + .map(|s| TaskRes::Write(s)), + Task::Allocate { + handle, + offset, + size, + } => handle + .get(file_system)? + .allocate(offset, size) + .map(|_| TaskRes::Allocate), + _ => unreachable!(), + } + } +} + +pub(crate) enum TaskRes { + Noop, + Create { fd: LogFd, is_for_rewrite: bool }, + Open { fd: LogFd, is_for_rewrite: bool }, + Delete, + Rename, + Truncate, + FileSize(usize), + Sync, + Write(usize), + Allocate, + Snapshot((u64, Files)), + Stop, +} + +// A helper struct to get the fd from callback in the future. 
As on the creation +// of `HedgedHandle`, only one of the fd is retrieved at the time, so we can +// need to check the receiver to get the fd for the later get. If the fd is +// ready, we will update the inner to `Arc`, so the later get will get +// the fd directly. +pub(crate) struct FutureHandle { + inner: UnsafeCell>, Arc>>, + task: Option, +} + +unsafe impl Send for FutureHandle {} + +// To avoid using `Mutex` +// Safety: +// For write, all writes are serialized to one channel, so only one thread will +// update the inner. For read, multiple readers and one writer and may visit +// try_get() concurrently to get the fd from receiver. The receiver is `Sync`, +// so only one of them will get the fd, and update the inner to Arc. +unsafe impl Sync for FutureHandle {} + +impl FutureHandle { + pub fn new(rx: oneshot::Receiver>, task: Task) -> Self { + Self { + inner: UnsafeCell::new(Either::Left(rx)), + task: Some(task), + } + } + pub fn new_owned(h: LogFd) -> Self { + Self { + inner: UnsafeCell::new(Either::Right(Arc::new(h))), + task: None, + } + } + + pub fn get(&self, file_system: &DefaultFileSystem) -> IoResult> { + let mut set = false; + let fd = match unsafe { &mut *self.inner.get() } { + Either::Left(rx) => { + set = true; + match block_on(rx) { + // Canceled is caused by the task is dropped when in paused state, + // so we should retry the task now + Err(Canceled) => self.retry_canceled(file_system)?, + Ok(res) => match res? { + TaskRes::Open { fd, .. } => Arc::new(fd), + TaskRes::Create { fd, .. } => Arc::new(fd), + _ => unreachable!(), + }, + } + } + Either::Right(w) => w.clone(), + }; + if set { + unsafe { + *self.inner.get() = Either::Right(fd.clone()); + } + } + Ok(fd) + } + + pub fn try_get(&self, file_system: &DefaultFileSystem) -> IoResult>> { + let mut set = false; + let fd = match unsafe { &mut *self.inner.get() } { + Either::Left(rx) => { + set = true; + match rx.try_recv() { + // Canceled is caused by the task is dropped when in paused state, + // so we should retry the task now + Err(Canceled) => self.retry_canceled(file_system)?, + Ok(None) => return Ok(None), + Ok(Some(res)) => match res? { + TaskRes::Open { fd, .. } => Arc::new(fd), + TaskRes::Create { fd, .. 
} => Arc::new(fd), + _ => unreachable!(), + }, + } + } + Either::Right(w) => w.clone(), + }; + if set { + unsafe { + *self.inner.get() = Either::Right(fd.clone()); + } + } + Ok(Some(fd)) + } + + fn retry_canceled(&self, file_system: &DefaultFileSystem) -> IoResult> { + Ok(match self.task.as_ref().unwrap() { + Task::Create(path) => { + // has been already created, so just open + let fd = file_system.open(path, Permission::ReadWrite)?; + Arc::new(fd) + } + Task::Open { path, perm } => { + let fd = file_system.open(path, *perm)?; + Arc::new(fd) + } + _ => unreachable!(), + }) + } +} diff --git a/src/env/hedged/util.rs b/src/env/hedged/util.rs new file mode 100644 index 00000000..f5f5b9f1 --- /dev/null +++ b/src/env/hedged/util.rs @@ -0,0 +1,9 @@ +use std::path::{Path, PathBuf}; + +pub fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf { + if let Ok(file) = path.strip_prefix(from) { + to.to_path_buf().join(file) + } else { + panic!("Invalid path: {:?}", path); + } +} diff --git a/src/env/mod.rs b/src/env/mod.rs index 06de495a..6f9a88a8 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -5,12 +5,16 @@ use std::path::Path; use std::sync::Arc; mod default; +mod hedged; mod log_fd; mod obfuscated; pub use default::DefaultFileSystem; +pub use hedged::HedgedFileSystem; pub use obfuscated::ObfuscatedFileSystem; +pub use hedged::State; + #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum Permission { ReadOnly, @@ -23,6 +27,10 @@ pub trait FileSystem: Send + Sync { type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; + fn bootstrap(&self) -> Result<()> { + Ok(()) + } + fn create>(&self, path: P) -> Result; fn open>(&self, path: P, perm: Permission) -> Result; diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index 8ba92592..572e8235 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -30,7 +30,7 @@ pub(super) fn build_file_writer( handle: Arc, format: LogFileFormat, force_reset: bool, -) -> Result> { +) -> IoResult> { let writer = system.new_writer(handle.clone())?; LogFileWriter::open(handle, writer, format, force_reset) } @@ -49,7 +49,7 @@ impl LogFileWriter { writer: F::Writer, format: LogFileFormat, force_reset: bool, - ) -> Result { + ) -> IoResult { let file_size = handle.file_size()?; let mut f = Self { handle, @@ -130,10 +130,10 @@ impl LogFileWriter { } /// Build a file reader. -pub(super) fn build_file_reader( +pub(crate) fn build_file_reader( system: &F, handle: Arc, -) -> Result> { +) -> IoResult> { let reader = system.new_reader(handle.clone())?; Ok(LogFileReader::open(handle, reader)) } diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 64042e01..3bad1446 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -5,10 +5,10 @@ //! [`PipeLog`]: crate::pipe_log::PipeLog mod format; -mod log_file; -mod pipe; -mod pipe_builder; -mod reader; +pub mod log_file; +pub mod pipe; +pub mod pipe_builder; +pub mod reader; pub use format::{parse_reserved_file_name, FileNameExt}; pub use pipe::DualPipes as FilePipeLog; @@ -31,6 +31,7 @@ pub mod debug { use super::format::{FileNameExt, LogFileFormat}; use super::log_file::{LogFileReader, LogFileWriter}; use super::reader::LogItemBatchFileReader; + use std::io::Result as IoResult; /// Opens a log file for write. When `create` is true, the specified file /// will be created first if not exists. 
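
To make the directory mirroring concrete, here is a standalone sketch of the path mapping used throughout the hedged file system. The function body mirrors `replace_path` from `src/env/hedged/util.rs` above; the sample paths are made up for illustration:

use std::path::{Path, PathBuf};

// Swap the directory prefix, keeping the relative file name
// (mirror of src/env/hedged/util.rs).
fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf {
    if let Ok(file) = path.strip_prefix(from) {
        to.to_path_buf().join(file)
    } else {
        panic!("Invalid path: {:?}", path);
    }
}

fn main() {
    // Every task addressed to the primary dir is re-addressed to the
    // secondary dir before being queued on the second disk's channel.
    let mapped = replace_path(
        Path::new("/disk1/raft/0000000000000001.raftlog"),
        Path::new("/disk1/raft"),
        Path::new("/disk2/raft"),
    );
    assert_eq!(mapped, PathBuf::from("/disk2/raft/0000000000000001.raftlog"));
}
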
@@ -40,7 +41,7 @@ pub mod debug { path: &Path, format: LogFileFormat, create: bool, - ) -> Result> { + ) -> IoResult> { let fd = if create { file_system.create(path)? } else { @@ -54,7 +55,7 @@ pub mod debug { pub fn build_file_reader( file_system: &F, path: &Path, - ) -> Result> { + ) -> IoResult> { let fd = Arc::new(file_system.open(path, Permission::ReadOnly)?); super::log_file::build_file_reader(file_system, fd) } diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs index 25ca433c..ca0c0a8a 100644 --- a/src/file_pipe_log/pipe_builder.rs +++ b/src/file_pipe_log/pipe_builder.rs @@ -349,8 +349,8 @@ impl DualPipesBuilder { let file_system = self.file_system.clone(); // As the `recover_queue` would update the `LogFileFormat` of each log file - // in `apend_files` and `rewrite_files`, we re-design the implementation on - // `recover_queue` to make it compatiable to concurrent processing + // in `append_files` and `rewrite_files`, we re-design the implementation on + // `recover_queue` to make it compatible to concurrent processing // with ThreadPool. let (append, rewrite) = pool.join( || { @@ -630,5 +630,5 @@ pub(super) fn lock_dir>(dir: P) -> Result { pub(crate) struct FileName { pub seq: FileSeq, pub path: PathBuf, - path_id: PathId, + pub path_id: PathId, } diff --git a/src/file_pipe_log/reader.rs b/src/file_pipe_log/reader.rs index 106ba72f..d19d9900 100644 --- a/src/file_pipe_log/reader.rs +++ b/src/file_pipe_log/reader.rs @@ -10,7 +10,7 @@ use super::format::{is_zero_padded, LogFileFormat}; use super::log_file::LogFileReader; /// A reusable reader over [`LogItemBatch`]s in a log file. -pub(super) struct LogItemBatchFileReader { +pub(crate) struct LogItemBatchFileReader { file_id: Option, format: Option, pub(crate) reader: Option>, diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index 8baa4835..06e02154 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -1030,7 +1030,7 @@ mod tests { // test_extend_ref let mut v = VecDeque::new_in(allocator.clone()); v.push_back(1); - v.extend(&[2, 3, 4]); + v.extend([2, 3, 4]); assert_eq!(v.len(), 4); assert_eq!(v[0], 1); diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index 8d74b70b..b463d9e3 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -1,5 +1,7 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use raft_engine::env::{DefaultFileSystem, HedgedFileSystem}; +use std::path::Path; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Barrier}; use std::time::Duration; @@ -1186,3 +1188,150 @@ fn test_build_engine_with_recycling_and_multi_dirs() { ); } } + +fn number_of_files(p: &Path) -> usize { + let mut r = 0; + std::fs::read_dir(p).unwrap().for_each(|e| { + if e.unwrap() + .path() + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with("000") + { + r += 1; + } + }); + r +} + +#[test] +fn test_start_engine_with_slow_second_disk_recover() { + let dir = tempfile::Builder::new() + .prefix("test_start_engine_with_slow_second_disk_default") + .tempdir() + .unwrap(); + let sec_dir = tempfile::Builder::new() + .prefix("test_start_engine_with_slow_second_disk_second") + .tempdir() + .unwrap(); + + let file_system = Arc::new(HedgedFileSystem::new( + Arc::new(DefaultFileSystem {}), + dir.path().to_path_buf(), + sec_dir.path().to_path_buf(), + )); + let entry_data = vec![b'x'; 512]; + + // Preparations for multi-dirs. 
+ let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + enable_log_recycle: false, + target_file_size: ReadableSize(1), + ..Default::default() + }; + + fail::cfg("hedged::pause_threshold", "return(10)").unwrap(); + // Step 1: write data into the main directory. + let engine = Engine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + fail::cfg("hedged::task_runner::thread1", "pause").unwrap(); + for rid in 1..=20 { + append(&engine, rid, 1, 10, Some(&entry_data)); + } + for rid in 1..=20 { + assert_eq!(engine.first_index(rid).unwrap(), 1); + } + assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + fail::remove("hedged::task_runner::thread1"); + let mut times = 0; + loop { + if number_of_files(sec_dir.path()) == number_of_files(dir.path()) { + break; + } + if times > 50 { + panic!("rewrite queue is not finished"); + } + times += 1; + std::thread::sleep(Duration::from_millis(10)); + } + assert_eq!(file_system.state(), env::State::Normal); + drop(file_system); + drop(engine); + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); +} + +#[test] +fn test_start_engine_with_slow_second_disk() { + let dir = tempfile::Builder::new() + .prefix("test_start_engine_with_slow_second_disk_default") + .tempdir() + .unwrap(); + let sec_dir = tempfile::Builder::new() + .prefix("test_start_engine_with_slow_second_disk_second") + .tempdir() + .unwrap(); + + let file_system = Arc::new(HedgedFileSystem::new( + Arc::new(DefaultFileSystem {}), + dir.path().to_path_buf(), + sec_dir.path().to_path_buf(), + )); + let entry_data = vec![b'x'; 512]; + + // Preparations for multi-dirs. + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + enable_log_recycle: false, + target_file_size: ReadableSize(1), + ..Default::default() + }; + + // Step 1: write data into the main directory. + let engine = Engine::open_with_file_system(cfg.clone(), file_system).unwrap(); + fail::cfg("hedged::task_runner::thread1", "pause").unwrap(); + for rid in 1..=10 { + append(&engine, rid, 1, 10, Some(&entry_data)); + } + for rid in 1..=10 { + assert_eq!(engine.first_index(rid).unwrap(), 1); + } + assert_ne!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + fail::remove("hedged::task_runner::thread1"); + drop(engine); + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + + // Restart the engine with recycle and prefill. Test reusing files from both + // dirs. + let cfg_2 = Config { + enable_log_recycle: true, + prefill_for_recycle: true, + purge_threshold: ReadableSize(40), + ..cfg + }; + let file_system = Arc::new(HedgedFileSystem::new( + Arc::new(DefaultFileSystem {}), + dir.path().to_path_buf(), + sec_dir.path().to_path_buf(), + )); + let engine = Engine::open_with_file_system(cfg_2, file_system).unwrap(); + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + for rid in 1..=10 { + assert_eq!(engine.first_index(rid).unwrap(), 1); + let mut log_batch = LogBatch::default(); + log_batch.add_command(rid, Command::Clean); + engine.write(&mut log_batch, true).unwrap(); + } + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + engine.purge_manager().must_rewrite_append_queue(None, None); + assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); + let file_count = number_of_files(dir.path()); + // Append data, recycled files are reused. + for rid in 1..=30 { + append(&engine, rid, 20, 30, Some(&entry_data)); + } + // No new file is created. 
+ let file_count1 = number_of_files(dir.path()); + assert_eq!(file_count, file_count1); + assert_eq!(number_of_files(sec_dir.path()), file_count1); +}
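
For reference, a minimal usage sketch of the hedged setup these tests exercise, assuming two directories on different disks (the paths are hypothetical and error handling is elided):

use std::sync::Arc;

use raft_engine::env::{DefaultFileSystem, HedgedFileSystem};
use raft_engine::{Config, Engine};

fn main() {
    // Mirror every file operation to both directories; an append is
    // considered durable once either disk has synced it.
    let fs = Arc::new(HedgedFileSystem::new(
        Arc::new(DefaultFileSystem {}),
        "/disk1/raft".into(),
        "/disk2/raft".into(),
    ));
    let cfg = Config {
        dir: "/disk1/raft".to_owned(),
        ..Default::default()
    };
    // Opening runs bootstrap(), which creates missing directories and copies
    // whatever diff is needed so the two directories converge first.
    let engine = Engine::open_with_file_system(cfg, fs).unwrap();
    // ... append and read as usual; dropping the engine sends Task::Stop to
    // both per-disk runner threads.
    drop(engine);
}
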