Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add snapshot WAL op #25755

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ rand.workspace = true
secrecy.workspace = true
serde_json.workspace = true
sha2.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
Expand Down
39 changes: 36 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
write_buffer::{
check_mem_and_force_snapshot_loop, persisted_files::PersistedFiles, WriteBufferImpl,
WriteBufferImplArgs,
},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
Expand All @@ -37,7 +40,7 @@ use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{num::NonZeroUsize, sync::Arc};
use std::{path::Path, str::FromStr};
use std::{path::Path, str::FromStr, time::Duration};
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -287,6 +290,16 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
#[clap(
long = "force-snapshot-mem-threshold",
env = "INFLUXDB3_FORCE_SNAPSHOT_MEM_THRESHOLD",
default_value = "70%",
action
)]
pub force_snapshot_mem_threshold: MemorySize,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -441,7 +454,7 @@ pub async fn command(config: Config) -> Result<()> {

let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
config.host_identifier_prefix,
&config.host_identifier_prefix,
));
let wal_config = WalConfig {
gen1_duration: config.gen1_duration,
Expand Down Expand Up @@ -485,6 +498,13 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;

background_buffer_checker(
config.force_snapshot_mem_threshold.bytes(),
&write_buffer_impl,
)
.await;

debug!("setting up telemetry store");
let telemetry_store = setup_telemetry_store(
&config.object_store_config,
catalog.instance_id(),
Expand Down Expand Up @@ -538,6 +558,19 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

/// Starts the background memory watcher that forces a WAL snapshot when the
/// in-memory buffer crosses `mem_threshold_bytes`.
///
/// Note: this awaits `check_mem_and_force_snapshot_loop` directly, so it only
/// returns when that loop does; the caller decides whether to spawn it.
async fn background_buffer_checker(
    mem_threshold_bytes: usize,
    write_buffer_impl: &Arc<WriteBufferImpl>,
) {
    debug!(mem_threshold_bytes, "setting up background buffer checker");
    // Check memory usage on a fixed cadence; 10s is the polling interval,
    // not related to the WAL flush interval.
    let check_interval = Duration::from_secs(10);
    let buffer = Arc::clone(write_buffer_impl);
    check_mem_and_force_snapshot_loop(buffer, mem_threshold_bytes, check_interval).await;
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
Expand Down
2 changes: 2 additions & 0 deletions influxdb3_cache/src/last_cache/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ impl LastCacheProvider {
}
}
WalOp::Catalog(_) => (),
WalOp::ForcedSnapshot(_) => (),
WalOp::Snapshot(_) => (),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions influxdb3_wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ tokio.workspace = true

[lints]
workspace = true

[dev-dependencies]
test-log.workspace = true
6 changes: 3 additions & 3 deletions influxdb3_wal/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub fn wal_contents(
max_timestamp_ns,
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
ops: ops.into_iter().collect(),
snapshot: None,
}
}

Expand All @@ -29,13 +28,14 @@ pub fn wal_contents_with_snapshot(
ops: impl IntoIterator<Item = WalOp>,
snapshot: SnapshotDetails,
) -> WalContents {
let mut wal_ops: Vec<WalOp> = ops.into_iter().collect();
wal_ops.push(WalOp::Snapshot(snapshot));
WalContents {
persist_timestamp_ms: 0,
min_timestamp_ns,
max_timestamp_ns,
wal_file_number: WalFileSequenceNumber::new(wal_file_number),
ops: ops.into_iter().collect(),
snapshot: Some(snapshot),
ops: wal_ops,
}
}

Expand Down
133 changes: 97 additions & 36 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod object_store;
pub mod serialize;
mod snapshot_tracker;

use crate::snapshot_tracker::SnapshotInfo;
use async_trait::async_trait;
use data_types::Timestamp;
use hashbrown::HashMap;
Expand All @@ -17,7 +16,6 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
use observability_deps::tracing::error;
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -78,16 +76,25 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// permit to release when done. The caller is responsible for cleaning up the wal.
async fn flush_buffer(
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)>;
) -> Option<(oneshot::Receiver<SnapshotDetails>, OwnedSemaphorePermit)>;

/// Similar to `flush_buffer`, but performs the snapshot immediately instead of
/// waiting for the query buffer to reach a certain capacity. Ideally no external
/// interface would be needed to force-flush the buffer; however, the decision to
/// snapshot immediately depends on the query buffer's capacity, which is not
/// visible to the snapshot tracker (the component that normally decides whether
/// to snapshot). Bubbling up the query buffer's size — or even a "buffer is
/// full" signal — would require shared state, and buffer flushing is on the hot
/// path. This additional method is therefore a compromise: the decision to force
/// the snapshot is made externally and this method is invoked on
/// `WalObjectStore` (which implements this trait).
async fn force_flush_buffer(
&self,
) -> Option<(oneshot::Receiver<SnapshotDetails>, OwnedSemaphorePermit)>;

/// Removes any snapshot wal files
async fn cleanup_snapshot(
&self,
snapshot_details: SnapshotInfo,
snapshot_details: SnapshotDetails,
snapshot_permit: OwnedSemaphorePermit,
);

Expand All @@ -97,25 +104,58 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// Returns the last persisted wal file sequence number
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;

/// Get snapshot details based on conditions
async fn get_snapshot_details(&self, force_snapshot: bool) -> Option<SnapshotDetails>;

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);

/// Flushes the WAL buffer and, when the flush triggered a snapshot,
/// schedules the post-snapshot cleanup of the covered WAL files.
async fn flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
    match self.flush_buffer().await {
        // A snapshot was started: hand the completion receiver and the
        // held permit over to the cleanup path.
        Some((snapshot_complete, permit)) => {
            self.cleanup_after_snapshot(snapshot_complete, permit).await
        }
        // No snapshot this round; nothing further to do.
        None => {}
    }
}

/// Waits for an in-flight snapshot to complete (on a separate task) and then
/// removes the WAL files the snapshot covered, releasing the permit when done.
async fn cleanup_after_snapshot(
    self: Arc<Self>,
    snapshot_complete: oneshot::Receiver<SnapshotDetails>,
    permit: OwnedSemaphorePermit,
) {
    // Run cleanup on its own task so the caller (the flush loop) is not
    // blocked while the snapshot finishes.
    let wal = Arc::clone(&self);
    tokio::spawn(async move {
        // NOTE(review): this panics the spawned task if the snapshot sender
        // is dropped — presumably only possible during shutdown; confirm.
        let snapshot_details = snapshot_complete.await.expect("snapshot failed");
        wal.cleanup_snapshot(snapshot_details, permit).await;
    });
}
}

/// When the WAL persists a file with buffered ops, the contents are sent to this
/// notifier so that the data can be loaded into the in memory buffer and caches.
#[async_trait]
pub trait WalFileNotifier: Debug + Send + Sync + 'static {
/// Notify the handler that a new WAL file has been persisted with the given contents.
fn notify(&self, write: WalContents);

/// Notify the handler that a new WAL file has been persisted with the given contents and tell
/// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
async fn notify_and_snapshot(
async fn notify(
&self,
write: WalContents,
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

do_snapshot: bool,
) -> Option<oneshot::Receiver<SnapshotDetails>>;

// /// Notify the handler that a new WAL file has been persisted with the given contents and tell
// /// it to snapshot the data. The returned receiver will be signalled when the snapshot is complete.
// async fn notify_and_snapshot(
// &self,
// write: WalContents,
// snapshot_details: SnapshotDetails,
// ) -> oneshot::Receiver<SnapshotDetails>;

// /// Snapshot only, currently used to force the snapshot
// async fn snapshot(
// &self,
// snapshot_details: SnapshotDetails,
// ) -> oneshot::Receiver<SnapshotDetails>;
//
fn as_any(&self) -> &dyn Any;
}

Expand Down Expand Up @@ -215,6 +255,8 @@ impl Default for Gen1Duration {
pub enum WalOp {
    /// A batch of writes to apply to the buffer.
    Write(WriteBatch),
    /// A catalog change, ordered relative to other catalog batches.
    Catalog(OrderedCatalogBatch),
    /// Snapshot marker written when the snapshot was forced externally
    /// (e.g. by the memory-threshold checker) — TODO confirm with writer side.
    ForcedSnapshot(SnapshotDetails),
    /// Snapshot marker written by the regular snapshot-tracking path.
    Snapshot(SnapshotDetails),
}

impl PartialOrd for WalOp {
Expand All @@ -237,6 +279,18 @@ impl Ord for WalOp {

// For two Write ops, consider them equal
(WalOp::Write(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::Write(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Write(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::Catalog(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Catalog(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Catalog(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::ForcedSnapshot(_), WalOp::Snapshot(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Write(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Catalog(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::ForcedSnapshot(_)) => Ordering::Equal,
(WalOp::Snapshot(_), WalOp::Snapshot(_)) => Ordering::Equal,
}
}
}
Expand All @@ -246,13 +300,17 @@ impl WalOp {
match self {
WalOp::Write(w) => Some(w),
WalOp::Catalog(_) => None,
WalOp::ForcedSnapshot(_) => None,
WalOp::Snapshot(_) => None,
}
}

/// Returns the catalog batch carried by this op, or `None` for every
/// non-catalog variant.
pub fn as_catalog(&self) -> Option<&CatalogBatch> {
    if let WalOp::Catalog(ordered) = self {
        Some(&ordered.catalog)
    } else {
        None
    }
}
}
Expand Down Expand Up @@ -801,13 +859,29 @@ pub struct WalContents {
pub wal_file_number: WalFileSequenceNumber,
/// The operations contained in the WAL file
pub ops: Vec<WalOp>,
/// If present, the buffer should be snapshot after the contents of this file are loaded.
pub snapshot: Option<SnapshotDetails>,
}

impl WalContents {
pub fn is_empty(&self) -> bool {
self.ops.is_empty() && self.snapshot.is_none()
self.ops.is_empty()
}

/// Appends a regular snapshot marker (`WalOp::Snapshot`) to this file's ops.
pub fn add_snapshot_op(&mut self, snapshot_details: SnapshotDetails) {
    self.ops.push(WalOp::Snapshot(snapshot_details));
}

/// Appends a forced snapshot marker (`WalOp::ForcedSnapshot`) to this file's ops.
pub fn add_force_snapshot_op(&mut self, snapshot_details: SnapshotDetails) {
    self.ops.push(WalOp::ForcedSnapshot(snapshot_details));
}

/// Returns the details of the first snapshot op (forced or regular) found in
/// this file's ops, if any.
///
/// NOTE(review): a WAL file is expected to contain at most one snapshot op;
/// consider asserting that invariant — TODO confirm with the writer side.
pub fn find_snapshot_details(&self) -> Option<SnapshotDetails> {
    for op in &self.ops {
        match op {
            WalOp::Snapshot(details) | WalOp::ForcedSnapshot(details) => {
                return Some(*details);
            }
            _ => {}
        }
    }
    None
}
}

Expand Down Expand Up @@ -876,6 +950,8 @@ pub struct SnapshotDetails {
pub snapshot_sequence_number: SnapshotSequenceNumber,
/// All chunks with data before this time can be snapshot and persisted
pub end_time_marker: i64,
/// All wal files with a sequence number >= to this can be deleted once snapshotting is complete
pub first_wal_sequence_number: WalFileSequenceNumber,
/// All wal files with a sequence number <= to this can be deleted once snapshotting is complete
pub last_wal_sequence_number: WalFileSequenceNumber,
}
Expand All @@ -890,23 +966,8 @@ pub fn background_wal_flush<W: Wal>(

loop {
interval.tick().await;

let cleanup_after_snapshot = wal.flush_buffer().await;

// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
cleanup_after_snapshot
{
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
}
let wal = Arc::clone(&wal);
wal.flush_buffer_and_cleanup_snapshot().await;
}
})
}
Loading