Skip to content

Commit

Permalink
feat: use SnapshotOp
Browse files Browse the repository at this point in the history
  • Loading branch information
praveen-influx committed Jan 7, 2025
1 parent 6619e9e commit 280e40e
Show file tree
Hide file tree
Showing 11 changed files with 441 additions and 328 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions influxdb3_cache/src/last_cache/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ impl LastCacheProvider {
}
}
WalOp::Catalog(_) => (),
WalOp::ForcedSnapshot(_) => (),
WalOp::Snapshot(_) => (),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions influxdb3_wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ tokio.workspace = true

[lints]
workspace = true

[dev-dependencies]
test-log.workspace = true
6 changes: 3 additions & 3 deletions influxdb3_wal/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub fn wal_contents(
max_timestamp_ns,
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
ops: ops.into_iter().collect(),
snapshot: None,
}
}

Expand All @@ -29,13 +28,14 @@ pub fn wal_contents_with_snapshot(
ops: impl IntoIterator<Item = WalOp>,
snapshot: SnapshotDetails,
) -> WalContents {
let mut wal_ops: Vec<WalOp> = ops.into_iter().collect();
wal_ops.push(WalOp::Snapshot(snapshot));
WalContents {
persist_timestamp_ms: 0,
min_timestamp_ns,
max_timestamp_ns,
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
ops: ops.into_iter().collect(),
snapshot: Some(snapshot),
ops: wal_ops,
}
}

Expand Down
114 changes: 77 additions & 37 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod object_store;
pub mod serialize;
mod snapshot_tracker;

use crate::snapshot_tracker::SnapshotInfo;
use async_trait::async_trait;
use data_types::Timestamp;
use hashbrown::HashMap;
Expand Down Expand Up @@ -77,16 +76,25 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// permit to release when done. The caller is responsible for cleaning up the wal.
async fn flush_buffer(
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)>;
) -> Option<(oneshot::Receiver<SnapshotDetails>, OwnedSemaphorePermit)>;

/// Similar to `flush_buffer`, but allows the snapshot to be taken immediately rather than
/// waiting for the query buffer to reach a certain capacity. Ideally no external interface
/// would be needed to force-flush the buffer. However, the decision to snapshot immediately
/// depends on the query buffer's capacity, which is not visible to the snapshot tracker that
/// usually decides whether to snapshot. Bubbling up the query buffer's size, or even just
/// signalling that the buffer is full, would require shared state, and flushing the buffer
/// is on the hot path. This additional method is a compromise: the decision to force the
/// snapshot is made externally to `WalObjectStore` (which implements this trait), which
/// then calls this method.
async fn force_flush_buffer(
&self,
) -> Option<(oneshot::Receiver<SnapshotDetails>, OwnedSemaphorePermit)>;

/// Removes any snapshot wal files
async fn cleanup_snapshot(
&self,
snapshot_details: SnapshotInfo,
snapshot_details: SnapshotDetails,
snapshot_permit: OwnedSemaphorePermit,
);

Expand All @@ -96,37 +104,29 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// Returns the last persisted wal file sequence number
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;

/// Returns the snapshot info, if force snapshot is set it avoids checking
/// certain cases and returns snapshot info leaving only the last wal period
async fn snapshot_info_and_permit(
&self,
force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)>;
/// Get snapshot details based on conditions
async fn get_snapshot_details(&self, force_snapshot: bool) -> Option<SnapshotDetails>;

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);

async fn flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
let maybe_snapshot = self.flush_buffer().await;
if let Some((snapshot_complete, snapshot_info, permit)) = maybe_snapshot {
self.cleanup_after_snapshot(snapshot_complete, snapshot_info, permit)
.await;
if let Some((snapshot_complete, permit)) = maybe_snapshot {
self.cleanup_after_snapshot(snapshot_complete, permit).await;
}
}

async fn cleanup_after_snapshot(
self: Arc<Self>,
snapshot_complete: oneshot::Receiver<SnapshotDetails>,
snapshot_info: SnapshotInfo,
permit: OwnedSemaphorePermit,
) {
// handle snapshot cleanup outside of the flush loop
let arcd_wal = Arc::clone(&self);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

arcd_wal.cleanup_snapshot(snapshot_info, permit).await;
arcd_wal.cleanup_snapshot(snapshot_details, permit).await;
});
}
}
Expand All @@ -136,22 +136,26 @@ pub trait Wal: Debug + Send + Sync + 'static {
#[async_trait]
pub trait WalFileNotifier: Debug + Send + Sync + 'static {
/// Notify the handler that a new WAL file has been persisted with the given contents.
fn notify(&self, write: WalContents);

/// Notify the handler that a new WAL file has been persisted with the given contents and tell
/// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
async fn notify_and_snapshot(
async fn notify(
&self,
write: WalContents,
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

/// Snapshot only, currently used to force the snapshot
async fn snapshot(
&self,
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

do_snapshot: bool,
) -> Option<oneshot::Receiver<SnapshotDetails>>;

// /// Notify the handler that a new WAL file has been persisted with the given contents and tell
// /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
// async fn notify_and_snapshot(
// &self,
// write: WalContents,
// snapshot_details: SnapshotDetails,
// ) -> oneshot::Receiver<SnapshotDetails>;

// /// Snapshot only, currently used to force the snapshot
// async fn snapshot(
// &self,
// snapshot_details: SnapshotDetails,
// ) -> oneshot::Receiver<SnapshotDetails>;
//
fn as_any(&self) -> &dyn Any;
}

Expand Down Expand Up @@ -251,6 +255,8 @@ impl Default for Gen1Duration {
pub enum WalOp {
/// Carries a `WriteBatch`; this is the only variant `WalOp::as_write` returns `Some` for.
Write(WriteBatch),
/// Carries an `OrderedCatalogBatch`; `WalOp::as_catalog` exposes its inner `catalog`.
Catalog(OrderedCatalogBatch),
/// Snapshot marker appended by `WalContents::add_force_snapshot_op`.
ForcedSnapshot(SnapshotDetails),
/// Snapshot marker appended by `WalContents::add_snapshot_op`.
Snapshot(SnapshotDetails),
}

impl PartialOrd for WalOp {
Expand All @@ -273,6 +279,18 @@ impl Ord for WalOp {

// For two Write ops, consider them equal
(WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::Write(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Write(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::Catalog(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Catalog(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Catalog(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Catalog(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Snapshot(_)) => Ordering::Equal,
}
}
}
Expand All @@ -282,13 +300,17 @@ impl WalOp {
match self {
WalOp::Write(w) => Some(w),
WalOp::Catalog(_) => None,
WalOp::ForcedSnapshot(_) => None,
WalOp::Snapshot(_) => None,
}
}

/// Returns the catalog batch carried by this op, if any.
///
/// Only [`WalOp::Catalog`] yields a value (a borrow of its `catalog` field); every other
/// variant returns `None`.
pub fn as_catalog(&self) -> Option<&CatalogBatch> {
    match self {
        WalOp::Catalog(ordered) => Some(&ordered.catalog),
        WalOp::Write(_) | WalOp::ForcedSnapshot(_) | WalOp::Snapshot(_) => None,
    }
}
}
Expand Down Expand Up @@ -837,13 +859,29 @@ pub struct WalContents {
pub wal_file_number: WalFileSequenceNumber,
/// The operations contained in the WAL file
pub ops: Vec<WalOp>,
/// If present, the buffer should be snapshot after the contents of this file are loaded.
pub snapshot: Option<SnapshotDetails>,
}

impl WalContents {
pub fn is_empty(&self) -> bool {
self.ops.is_empty() && self.snapshot.is_none()
self.ops.is_empty()
}

/// Appends a [`WalOp::Snapshot`] op carrying `snapshot_details` to the end of `ops`.
pub fn add_snapshot_op(&mut self, snapshot_details: SnapshotDetails) {
    let snapshot_op = WalOp::Snapshot(snapshot_details);
    self.ops.push(snapshot_op);
}

/// Appends a [`WalOp::ForcedSnapshot`] op carrying `snapshot_details` to the end of `ops`.
pub fn add_force_snapshot_op(&mut self, snapshot_details: SnapshotDetails) {
    let forced_op = WalOp::ForcedSnapshot(snapshot_details);
    self.ops.push(forced_op);
}

/// Returns the details of the first snapshot op found in `ops`, or `None` when the file
/// contains no snapshot op. Both [`WalOp::Snapshot`] and [`WalOp::ForcedSnapshot`] count.
pub fn find_snapshot_details(&self) -> Option<SnapshotDetails> {
    // NOTE(review): a WAL file is presumably meant to hold at most one snapshot op, but
    // nothing here asserts it; if there are several, the earliest one in `ops` wins.
    for op in &self.ops {
        match op {
            WalOp::Snapshot(details) | WalOp::ForcedSnapshot(details) => {
                return Some(*details);
            }
            WalOp::Write(_) | WalOp::Catalog(_) => {}
        }
    }
    None
}
}

Expand Down Expand Up @@ -912,6 +950,8 @@ pub struct SnapshotDetails {
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// All chunks with data before this time can be snapshot and persisted
pub end_time_marker: i64,
/// All wal files with a sequence number >= this (and <= `last_wal_sequence_number`) can be deleted once snapshotting is complete
pub first_wal_sequence_number: WalFileSequenceNumber,
/// All wal files with a sequence number <= this can be deleted once snapshotting is complete
pub last_wal_sequence_number: WalFileSequenceNumber,
}
Expand Down
Loading

0 comments on commit 280e40e

Please sign in to comment.