Introduce double write file system #323
base: master
Signed-off-by: Connor1996 <[email protected]>
src/env/double_write.rs
Outdated
pub struct HedgedHandle<F: FileSystem> {
    disk1: Sender<(FileTask, Callback<usize>)>,
    disk2: Sender<(FileTask, Callback<usize>)>,
    counter1: Arc<AtomicU64>,
What do counters indicate?
They are the sequence numbers of each disk's channel; comparing them tells us which disk is newer.
src/env/double_write.rs
Outdated
    }
}

fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult<usize> {
Are there concurrent readers and writers when operating raft logs?
No concurrent writers, but there may be concurrent readers.
src/env/double_write.rs
Outdated
    }
}

fn write(&self, offset: usize, content: &[u8]) -> IoResult<usize> {
Is it possible that while we do a write, another thread is doing a read?
yes
According to the design doc, the two disks are only eventually consistent, meaning that at any given time their contents can differ. If the writer is writing to disk 1 while the reader reads from disk 2, it looks like the raft engine could take a wrong action based on a wrong view.
Note that the offset is specified. The reader always reads from whichever disk is newer at that point in time. If the reader can read from disk2, disk2 must already have the data at that offset. It doesn't need to care about the latest write, because it's not reading at the latest write's offset.
src/env/double_write.rs
Outdated
// TODO: read both dir at recovery, maybe no need? cause operations are to both
// disks TODO: consider encryption

impl<F: FileSystem> HedgedFileSystem<F> {
Is this filesystem thread safe?
Yes, it's thread-safe by the Sync trait. But it occurs to me that we should make the sends to the two channels atomic.
src/env/double_write.rs
Outdated
let count1 = self.counter1.load(Ordering::Relaxed);
let count2 = self.counter2.load(Ordering::Relaxed);
match count1.cmp(&count2) {
    std::cmp::Ordering::Equal => {
        if let Some(fd) = self.fd1.read().unwrap().as_ref() {
            fd.read(offset, buf)
        } else if let Some(fd) = self.fd2.read().unwrap().as_ref() {
            fd.read(offset, buf)
        } else {
            panic!("Both fd1 and fd2 are None");
        }
    }
    std::cmp::Ordering::Greater => {
        self.fd1.read().unwrap().as_ref().unwrap().read(offset, buf)
    }
    std::cmp::Ordering::Less => {
        self.fd2.read().unwrap().as_ref().unwrap().read(offset, buf)
    }
}
Looks like it's likely to create two concurrent readers, one reads from disk 1 while another reads from disk 2. Concerns?
Yes, as long as the disk has the data, then you can read it. Any concerns?
two cases:
- offset > file size
- fd is null for one of the disks
- Impossible given the raft engine logic. How can you read data that you haven't written yet?
- A null fd means the file has been purged, which in turn means no one will access the file anymore.
- Why are you so sure that offset is always <= file size? Is there any contract in the layer above the FileSystem that guarantees it? The upper layer, LogFileReader, exposes the public API below, through which an arbitrary offset, possibly larger than the file size, can be passed.
/// Polls bytes from the file. Stops only when the buffer is filled or
/// reaching the "end of file".
pub fn read_to(&mut self, offset: u64, mut buf: &mut [u8]) -> Result<usize> {
- See the comment below about the fd being non-null for one disk while null for the other.
src/env/double_write.rs
Outdated
match pos {
    SeekFrom::Start(offset) => self.offset = offset as usize,
    SeekFrom::Current(i) => self.offset = (self.offset as i64 + i) as usize,
    SeekFrom::End(i) => self.offset = (self.inner.file_size()? as i64 + i) as usize,
The file sizes on the two disks can differ, so this method can return a different result on each call. That looks like it will cause problems.
It always returns the file size of the newer disk. For a file that is being appended to, the result of successive calls must be monotonically increasing. It is no different from the non-hedging file system.
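Separately from the file-size question, the casts in the quoted seek code would wrap silently if a negative delta pointed before the start of the file. A minimal sketch of the same offset resolution with checked arithmetic follows. resolve_seek is a hypothetical helper, not part of the PR, and it takes the file size as a plain argument instead of calling inner.file_size().

```rust
use std::io::{Error, ErrorKind, Result, SeekFrom};

/// Hypothetical helper: resolves a seek request to an absolute offset.
/// `current` is the reader's present offset and `file_size` is whatever
/// size the caller observed (for example, the size on the newer disk).
pub fn resolve_seek(current: u64, file_size: u64, pos: SeekFrom) -> Result<u64> {
    let target = match pos {
        SeekFrom::Start(offset) => Some(offset),
        // checked_add_signed returns None on underflow or overflow
        // instead of wrapping around.
        SeekFrom::Current(delta) => current.checked_add_signed(delta),
        SeekFrom::End(delta) => file_size.checked_add_signed(delta),
    };
    target.ok_or_else(|| {
        Error::new(
            ErrorKind::InvalidInput,
            "seek to a negative or overflowing position",
        )
    })
}
```

Seeking past the end is still allowed here, as with std::fs::File; whether a later read at that offset is valid is the separate question raised in this thread.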
src/env/double_write.rs
Outdated
pub struct HedgedReader<F: FileSystem> {
    inner: Arc<HedgedHandle<F>>,
    offset: usize,
We don't add any concurrency protection on the offset here. Does that mean the offsets for the files on the two disks are equivalent?
I remember that in the design doc you mentioned the contents of the two disks can differ. IIUC, when raft logs have been purged on one disk but not yet on the other, the inner: Arc<HedgedHandle<F>> can point to a file in one run and to nothing in another run.
No need for concurrency protection, because it's not Sync. The Rust compiler takes care of it.
@Connor1996 could you please reply to my 2nd comment?
Yes, it's possible that one points to a file while the other points to nothing. It still doesn't matter: if the file is purged, nothing will access it anymore.
The fd may not be meaningful for the disk where the file is purged, but it is meaningful for the other disk where it is not yet purged. Is it possible that, because the counter comparison is not atomic (get counter1; get counter2; compare counter1 and counter2), the reader picks the wrong fd, which is null, to read data from?
Signed-off-by: Connor1996 <[email protected]>
src/env/double_write.rs
Outdated
pub fn bootstrap(&self) -> Result<()> {
    // catch up diff
    let files1 = self.get_files(&self.path1)?;
Is it possible that files1 is the set {f1, f2, f3} while files2 is {f2, f3, f4}?
yes
Since they can be different, how does catch_up_diff work in this case?
fn catch_up_diff(&self, fromFiles: Files, toFiles: Files) -> Result<()> {
src/env/double_write.rs
Outdated
pub fn bootstrap(&self) -> Result<()> {
    // catch up diff
    let files1 = self.get_files(&self.path1)?;
    let files2 = self.get_files(&self.path2)?;
When enable-log-recycle is enabled, IIUC, file names are reused, meaning the same file name might hold different raft log content over time. Is it possible that, for the same file name f, the file on disk1 and the one on disk2 hold different contents?
Impossible, log recycling doesn't reuse file names.
Oh, my bad, my understanding was wrong. How does log recycling work?
src/env/double_write.rs
Outdated
// choose latest to perform read
let count1 = self.counter1.load(Ordering::Relaxed);
let count2 = self.counter2.load(Ordering::Relaxed);
match count1.cmp(&count2) {
There are 3 steps here before reading a file:
- get counter1
- get counter2
- compare counter1 and counter2
The 3 steps together are not atomic. If we get a wrong result, counter1 < counter2, due to the non-atomicity, and the offset > file size for counter1, what will happen?
It doesn't matter.
Let's say disk2 is the slower one, and its file size < offset
- first get s1 = counter1, we can know that s1 >= offset >= file size,
- then get s2 = counter2,
- if s1 <= s2, we read on disk2, because s2 >= s1 >= offset >= file size, disk2 must have the data now
- if s1 > s2, we read on disk1
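The selection rule in the list above can be sketched as follows, assuming both counters only ever increase and that counter1 is loaded before counter2. Counters, Disk and pick_newer are hypothetical names for illustration. The sketch uses Acquire loads where the quoted PR code uses Relaxed, so it illustrates the argument rather than the actual implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Which disk a read should be served from.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Disk {
    One,
    Two,
}

/// Hypothetical stand-in for the per-disk sequence counters.
/// Assumption: each counter is only ever incremented.
pub struct Counters {
    pub counter1: AtomicU64,
    pub counter2: AtomicU64,
}

impl Counters {
    /// Picks the disk to read from. The two loads are not atomic as a
    /// pair, but because the counters are monotonic, a value of s2 read
    /// after s1 that satisfies s2 >= s1 proves disk 2 has applied at
    /// least everything disk 1 had applied when s1 was read.
    pub fn pick_newer(&self) -> Disk {
        let s1 = self.counter1.load(Ordering::Acquire);
        let s2 = self.counter2.load(Ordering::Acquire);
        if s1 > s2 {
            Disk::One
        } else {
            Disk::Two
        }
    }
}
```

Note that this only covers the offset argument. The concern raised elsewhere in the thread, a handle that is null on the chosen disk, needs a separate check.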
My overall impression of this PR is that we are leveraging certain undocumented invariants, e.g. sole append-only writer plus one reader instance per thread, in the upper layers to ensure the safety of concurrency in the lower-level HedgedFileSystem. Two drawbacks I can think of
Please correct me if my understanding is wrong.
Signed-off-by: Connor1996 <[email protected]>
de998d7
to
6754299
Compare
Signed-off-by: Connor1996 <[email protected]>
1d3ccc3
to
5b1d470
Compare
Signed-off-by: Connor1996 <[email protected]>
5b1d470
to
5c7e7b2
Compare
use crossbeam::channel::unbounded;
use crossbeam::channel::Receiver;
use log::info;
use std::fs;
use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write};
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use crate::env::log_fd::LogFd;
use crate::env::DefaultFileSystem;
use crate::env::{FileSystem, Handle, Permission, WriteExt};
use futures::executor::block_on;
use futures::{join, select};

mod recover;
mod runner;
mod sender;
mod task;
mod util;

use runner::TaskRunner;
use sender::HedgedSender;
use task::{
    empty_callback, paired_future_callback, Callback, FutureHandle, SeqTask, Task, TaskRes,
};
use util::replace_path;
Refer to #339, this import format should be reformatted.
More unit tests should be added for HedgedFileSystem.
if t1.is_finished() || t2.is_finished() {
    if t1.is_finished() {
        t1.join().unwrap();
    } else {
        t2.join().unwrap();
    }
    break;
}
Suggested change:
- if t1.is_finished() || t2.is_finished() {
-     if t1.is_finished() {
-         t1.join().unwrap();
-     } else {
-         t2.join().unwrap();
-     }
-     break;
- }
+ if t1.is_finished() {
+     t1.join().unwrap();
+     break;
+ } else if t2.is_finished() {
+     t2.join().unwrap();
+     break;
+ }
std::cmp::Ordering::Equal => {
    // still need to catch up, but only diff
    recover::catch_up_diff(&self.base, files1, files2, false)?;
    return Ok(());
}
std::cmp::Ordering::Less => {
    recover::catch_up_diff(&self.base, files2, files1, false)?;
}
std::cmp::Ordering::Greater => {
    recover::catch_up_diff(&self.base, files1, files2, false)?;
}
Suggested change:
- std::cmp::Ordering::Equal => {
-     // still need to catch up, but only diff
-     recover::catch_up_diff(&self.base, files1, files2, false)?;
-     return Ok(());
- }
- std::cmp::Ordering::Less => {
-     recover::catch_up_diff(&self.base, files2, files1, false)?;
- }
- std::cmp::Ordering::Greater => {
-     recover::catch_up_diff(&self.base, files1, files2, false)?;
- }
+ std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => {
+     // still need to catch up, but only diff
+     recover::catch_up_diff(&self.base, files1, files2, false)?;
+ }
+ std::cmp::Ordering::Less => {
+     recover::catch_up_diff(&self.base, files2, files1, false)?;
+ }
I also recommend importing std::cmp::Ordering and abbreviating it as Ordering::xxx.
    }
}
std::cmp::Ordering::Greater => {
    self.handle1.try_get(&self.base)?.unwrap().read(offset, buf)
FYI, should the error also be handled?
@@ -40,6 +40,8 @@ pub struct Config {
    /// Default: None
    pub spill_dir: Option<String>,

    pub second_dir: Option<String>,
maybe backup_dir or mirror_dir
    }
}

pub(crate) fn catch_up_diff(
"if the file in to is not in from, delete it"
Suggested change:
- pub(crate) fn catch_up_diff(
+ pub(crate) fn synchronize_files(
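To make the earlier question about differing file sets concrete, here is a minimal sketch of the synchronization rule quoted above ("if the file in to is not in from, delete it"), plus the assumed complementary rule that files missing from the destination are copied over. SyncPlan and plan_sync are hypothetical names, file names stand in for the PR's Files type, and the sketch only plans the work without touching the disk.

```rust
use std::collections::BTreeSet;

/// Hypothetical plan describing how to make `to` match `from`.
#[derive(Debug, PartialEq, Eq)]
pub struct SyncPlan {
    /// Present in `from` but missing in `to`: copy these over.
    pub to_copy: Vec<String>,
    /// Present in `to` but not in `from`: delete these.
    pub to_delete: Vec<String>,
}

/// Computes the plan from two sets of file names. Files present on
/// both sides are left out here; comparing their contents is a
/// separate step.
pub fn plan_sync(from: &BTreeSet<String>, to: &BTreeSet<String>) -> SyncPlan {
    SyncPlan {
        to_copy: from.difference(to).cloned().collect(),
        to_delete: to.difference(from).cloned().collect(),
    }
}
```

With from = {f2, f3, f4} and to = {f1, f2, f3}, the plan copies f4 and deletes f1, which is why the choice of direction matters so much.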
let count1 = recover::get_latest_valid_seq(&self.base, &files1)?;
let count2 = recover::get_latest_valid_seq(&self.base, &files2)?;

match count1.cmp(&count2) {
Does get_latest_valid_seq return the number of log items in the latest file? Why is it used to determine the synchronization direction?
impl HedgedFileSystem {
    pub fn new(base: Arc<DefaultFileSystem>, path1: PathBuf, path2: PathBuf) -> Self {
        let (tx1, rx1) = unbounded::<(SeqTask, Callback)>();
Should we limit the length of the channel to prevent OOM? That would act as the hard limit on what HedgedFileSystem can use.
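A minimal sketch of what a capacity-limited queue could look like. It uses the standard library's sync_channel rather than the crossbeam channel from the PR (crossbeam::channel also offers a bounded constructor), and bounded_queue and try_enqueue are hypothetical names. What to do when the queue is full (block, pause the disk, fall back) is left to the caller.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a queue that holds at most `capacity` pending tasks.
pub fn bounded_queue<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}

/// Tries to enqueue without blocking. On a full or disconnected queue
/// the task is handed back to the caller instead of being buffered,
/// so memory use stays bounded by the capacity.
pub fn try_enqueue<T>(tx: &SyncSender<T>, task: T) -> Result<(), T> {
    match tx.try_send(task) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(task)) | Err(TrySendError::Disconnected(task)) => Err(task),
    }
}
```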
    }
}

async fn wait(&self, task1: Task, task2: Task) -> IoResult<()> {
Suggested change:
- async fn wait(&self, task1: Task, task2: Task) -> IoResult<()> {
+ async fn wait_one(&self, task1: Task, task2: Task) -> IoResult<()> {
    self.sender.state()
}

async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult<HedgedHandle> {
Suggested change:
- async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult<HedgedHandle> {
+ async fn wait_one_handle(&self, task1: Task, task2: Task) -> IoResult<HedgedHandle> {
impl Drop for HedgedFileSystem {
    fn drop(&mut self) {
        block_on(self.wait(Task::Stop, Task::Stop)).unwrap();
Should it wait for both Stop tasks to finish?
// wait 1s
// one disk may be blocked for a long time,
// to avoid block shutdown process for a long time, do not join the threads
// here, only need at least to ensure one thread is exited
HedgedFileSystem is dropped but its underlying thread may still be alive, which is not ideal.
Maybe we should abort the pending tasks on the slow disk and wait for both threads to exit.
@@ -0,0 +1,9 @@
use std::path::{Path, PathBuf};

pub fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf {
Can the fn be renamed to replace_prefix? Otherwise it's not obvious that the parameter from must be a prefix of path.
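A minimal sketch of the renamed helper, assuming it is acceptable to surface a non-prefix from as an error. The body of the PR's replace_path is not shown in this thread, so this is an illustration built on the standard library's Path::strip_prefix, and the Result return type differs from the quoted signature.

```rust
use std::path::{Path, PathBuf, StripPrefixError};

/// Replaces the leading `from` component(s) of `path` with `to`.
/// Returns an error when `from` is not a prefix of `path`, which makes
/// the precondition explicit instead of implicit.
pub fn replace_prefix(
    path: &Path,
    from: &Path,
    to: &Path,
) -> Result<PathBuf, StripPrefixError> {
    let rest = path.strip_prefix(from)?;
    Ok(to.join(rest))
}
```

For example, replacing /disk1 with /disk2 in /disk1/raft/0001.raftlog yields /disk2/raft/0001.raftlog, while a path under an unrelated directory is rejected. The paths here are made up for the example.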
    check_files(&from_files.rewrite_files, &to_files.rewrite_files)?;
}
check_files(&from_files.reserved_files, &to_files.reserved_files)?;
Ok(())
We should double-check by checksum that the copied files are correct.
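One way such a check could look, as a sketch only. It hashes both streams with FNV-1a, chosen here because it fits in a few lines; it is not a strong checksum, and the thread does not say which algorithm the project would use. checksum and same_content are hypothetical names, and they accept any reader, so open files can be passed in directly.

```rust
use std::io::{Read, Result};

const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
const FNV_PRIME: u64 = 0x100000001b3;

/// Streams the reader in fixed-size chunks and folds every byte into a
/// 64-bit FNV-1a hash, so large files are never loaded whole.
pub fn checksum<R: Read>(mut reader: R) -> Result<u64> {
    let mut hash = FNV_OFFSET_BASIS;
    let mut buf = [0u8; 8192];
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break; // end of stream
        }
        for &byte in &buf[..n] {
            hash ^= u64::from(byte);
            hash = hash.wrapping_mul(FNV_PRIME);
        }
    }
    Ok(hash)
}

/// Returns true when both streams hash to the same value.
pub fn same_content<A: Read, B: Read>(a: A, b: B) -> Result<bool> {
    Ok(checksum(a)? == checksum(b)?)
}
```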
let check2 = inner.disk2.len() > get_pause_threshold();
match (check1, check2) {
    (true, true) => {
        panic!("Both channels of disk1 and disk2 are full")
When both disks are slow, maybe we should not panic but instead fall back to the single-disk approach.
    mut cfg: Config,
    file_system: Arc<F>,
    mut listeners: Vec<Arc<dyn EventListener>>,
) -> Result<Engine<F, FilePipeLog<F>>> {
    cfg.sanitize()?;
    file_system.bootstrap()?;
I'm wondering if it can fall back to the single-disk solution dynamically. The file system may need extra APIs to wait for all pending writes to finish, and then the engine can switch to a different file_system.
Maybe in another PR
Overall LGTM
let files1 = recover::get_files(&self.path1)?;
let files2 = recover::get_files(&self.path2)?;

let count1 = recover::get_latest_valid_seq(&self.base, &files1)?;
So here we only compare the last file's log count to decide which disk is newer?
What if their file counts are different and the older has more?
This PR introduces a hedged file system that double-writes every I/O operation. The HedgedFileSystem manages two directories on different cloud disks. All operations of the interface are serialized through one channel per disk, and the caller waits until the operation has been consumed from either one of the channels. With that, if one disk's I/O is slow for a long time, the other can still serve the operations without any delay. Once the slow disk returns to normal, it catches up with the operations accumulated in its channel, and the states of the two disks are in sync again.
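The core "wait for whichever disk finishes first" idea can be sketched with standard-library threads and a channel. This is a simplification under stated assumptions: the PR uses long-lived per-disk workers fed by crossbeam channels and futures, whereas this sketch spawns a thread per operation, and first_of is a hypothetical name.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Runs the same logical operation against two disks in parallel and
/// returns the result of whichever finishes first. The slower one keeps
/// running in the background and its result is discarded.
pub fn first_of<T, F1, F2>(op1: F1, op2: F2) -> T
where
    T: Send + 'static,
    F1: FnOnce() -> T + Send + 'static,
    F2: FnOnce() -> T + Send + 'static,
{
    let (tx1, rx) = channel();
    let tx2 = tx1.clone();
    // A failed send only means the caller already returned with the
    // other disk's result, so the error is ignored on purpose.
    thread::spawn(move || {
        let _ = tx1.send(op1());
    });
    thread::spawn(move || {
        let _ = tx2.send(op2());
    });
    rx.recv().expect("at least one worker should report a result")
}
```

The sketch leaves out what makes the real design hard: keeping per-disk operation order, tracking how far behind the slow disk is, and replaying its backlog so the two disks converge again.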
Close #342