
Commit d24d593

feat: add concurrency limit for WAL replay
WAL replay currently loads _all_ WAL files concurrently, which can run the process out of memory (OOM). This commit adds a CLI parameter `--wal-replay-concurrency-limit` that allows the user to set a lower limit and run WAL replay again. Closes: #26481
1 parent be25c6f commit d24d593
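Example usage (a hypothetical invocation: the flag and environment variable names are taken from the diff below, and the `serve` subcommand is inferred from the changed file `influxdb3/src/commands/serve.rs`): `influxdb3 serve --wal-replay-concurrency-limit 10`, or equivalently `INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT=10 influxdb3 serve`.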

10 files changed: +142 additions, -82 deletions


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

influxdb3/src/commands/serve.rs

Lines changed: 7 additions & 0 deletions
@@ -413,6 +413,12 @@ pub struct Config {
         hide = true
     )]
     pub tcp_listener_file_path: Option<PathBuf>,
+
+    #[clap(
+        long = "wal-replay-concurrency-limit",
+        env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT"
+    )]
+    pub wal_replay_concurrency_limit: Option<usize>,
 }
 
 /// The minimum version of TLS to use for InfluxDB
@@ -692,6 +698,7 @@ pub async fn command(config: Config) -> Result<()> {
         snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
         query_file_limit: config.query_file_limit,
         shutdown: shutdown_manager.register(),
+        wal_replay_concurrency_limit: config.wal_replay_concurrency_limit,
     })
     .await
     .map_err(|e| Error::WriteBufferInit(e.into()))?;
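Note that the new field is an `Option<usize>` with no default: when neither the flag nor the environment variable is set, `wal_replay_concurrency_limit` is `None` and replay falls back to `usize::MAX`-sized batches (see `object_store.rs` below), i.e. the previous fully concurrent behavior.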

influxdb3_processing_engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -1011,6 +1011,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 10,
             query_file_limit: None,
             shutdown: shutdown.register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_server/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -887,6 +887,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 100,
             query_file_limit: None,
             shutdown: shutdown_manager.register(),
+            wal_replay_concurrency_limit: Some(1),
         },
     )
     .await

influxdb3_server/src/query_executor/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -867,6 +867,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 1,
             query_file_limit,
             shutdown: shutdown.register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_server/tests/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -259,6 +259,7 @@ impl TestService {
             snapshotted_wal_files_to_keep: 100,
             query_file_limit: None,
             shutdown: ShutdownManager::new_testing().register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ bitcode.workspace = true
 bytes.workspace = true
 byteorder.workspace = true
 crc32fast.workspace = true
+futures.workspace = true
 futures-util.workspace = true
 hashbrown.workspace = true
 humantime.workspace = true

influxdb3_wal/src/object_store.rs

Lines changed: 108 additions & 72 deletions
@@ -13,11 +13,25 @@ use iox_time::TimeProvider;
 use object_store::path::{Path, PathPart};
 use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
 use observability_deps::tracing::{debug, error, info};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{str::FromStr, sync::Arc};
 use tokio::sync::Mutex;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
 
+#[derive(Debug)]
+pub struct CreateWalObjectStoreArgs<'a> {
+    pub time_provider: Arc<dyn TimeProvider>,
+    pub object_store: Arc<dyn ObjectStore>,
+    pub node_identifier_prefix: &'a str,
+    pub file_notifier: Arc<dyn WalFileNotifier>,
+    pub config: WalConfig,
+    pub last_wal_sequence_number: Option<WalFileSequenceNumber>,
+    pub last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
+    pub snapshotted_wal_files_to_keep: u64,
+    pub shutdown: ShutdownToken,
+    pub wal_replay_concurrency_limit: Option<usize>,
+}
+
 #[derive(Debug)]
 pub struct WalObjectStore {
     object_store: Arc<dyn ObjectStore>,
@@ -36,18 +50,21 @@ impl WalObjectStore {
     /// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
     /// exist in the WAL files that haven't been cleaned up yet.
     #[allow(clippy::too_many_arguments)]
-    pub async fn new(
-        time_provider: Arc<dyn TimeProvider>,
-        object_store: Arc<dyn ObjectStore>,
-        node_identifier_prefix: impl Into<String> + Send,
-        file_notifier: Arc<dyn WalFileNotifier>,
-        config: WalConfig,
-        last_wal_sequence_number: Option<WalFileSequenceNumber>,
-        last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
-        snapshotted_wal_files_to_keep: u64,
-        shutdown: ShutdownToken,
+    pub async fn new<'a>(
+        CreateWalObjectStoreArgs {
+            time_provider,
+            object_store,
+            node_identifier_prefix,
+            file_notifier,
+            config,
+            last_wal_sequence_number,
+            last_snapshot_sequence_number,
+            snapshotted_wal_files_to_keep,
+            shutdown,
+            wal_replay_concurrency_limit,
+        }: CreateWalObjectStoreArgs<'a>,
     ) -> Result<Arc<Self>, crate::Error> {
-        let node_identifier = node_identifier_prefix.into();
+        let node_identifier = node_identifier_prefix.to_string();
         let all_wal_file_paths =
             load_all_wal_file_paths(Arc::clone(&object_store), node_identifier.clone()).await?;
         let flush_interval = config.flush_interval;
@@ -64,8 +81,12 @@
             shutdown.clone_cancellation_token(),
         );
 
-        wal.replay(last_wal_sequence_number, &all_wal_file_paths)
-            .await?;
+        wal.replay(
+            last_wal_sequence_number,
+            &all_wal_file_paths,
+            wal_replay_concurrency_limit,
+        )
+        .await?;
         let wal = Arc::new(wal);
         background_wal_flush(Arc::clone(&wal), flush_interval, shutdown);
 
@@ -128,8 +149,10 @@
         &self,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
         all_wal_file_paths: &[Path],
+        concurrency_limit: Option<usize>,
     ) -> crate::Result<()> {
-        debug!("replaying");
+        let replay_start = Instant::now();
+        info!("replaying WAL files");
         let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths);
 
         let last_snapshot_sequence_number = {
@@ -148,72 +171,84 @@
             Ok(verify_file_type_and_deserialize(file_bytes)?)
         }
 
-        let mut replay_tasks = Vec::new();
-        for path in paths {
-            let object_store = Arc::clone(&self.object_store);
-            replay_tasks.push(tokio::spawn(get_contents(object_store, path)));
-        }
-
-        for wal_contents in replay_tasks {
-            let wal_contents = wal_contents.await??;
+        // Load N files concurrently and then replay them immediately before loading the next batch
+        // of N files. Since replaying has to happen _in order_ only loading the files part is
+        // concurrent, replaying the WAL file itself is done sequentially based on the original
+        // order (i.e paths, which is already sorted)
+        for batched in paths.chunks(concurrency_limit.unwrap_or(usize::MAX)) {
+            let batched_start = Instant::now();
+            let mut results = Vec::with_capacity(batched.len());
+            for path in batched {
+                let object_store = Arc::clone(&self.object_store);
+                results.push(tokio::spawn(get_contents(object_store, path.clone())));
+            }
 
-            // add this to the snapshot tracker, so we know what to clear out later if the replay
-            // was a wal file that had a snapshot
-            self.flush_buffer
-                .lock()
-                .await
-                .replay_wal_period(WalPeriod::new(
-                    wal_contents.wal_file_number,
-                    Timestamp::new(wal_contents.min_timestamp_ns),
-                    Timestamp::new(wal_contents.max_timestamp_ns),
-                ));
-
-            info!(
-                n_ops = %wal_contents.ops.len(),
-                min_timestamp_ns = %wal_contents.min_timestamp_ns,
-                max_timestamp_ns = %wal_contents.max_timestamp_ns,
-                wal_file_number = %wal_contents.wal_file_number,
-                snapshot_details = ?wal_contents.snapshot,
-                "replaying WAL file"
-            );
+            for wal_contents in results {
+                let wal_contents = wal_contents.await??;
+                info!(
+                    n_ops = %wal_contents.ops.len(),
+                    min_timestamp_ns = %wal_contents.min_timestamp_ns,
+                    max_timestamp_ns = %wal_contents.max_timestamp_ns,
+                    wal_file_number = %wal_contents.wal_file_number,
+                    snapshot_details = ?wal_contents.snapshot,
+                    "replaying WAL file with details"
+                );
 
-            match wal_contents.snapshot {
-                // This branch uses so much time
-                None => self.file_notifier.notify(Arc::new(wal_contents)).await,
-                Some(snapshot_details) => {
-                    let snapshot_info = {
-                        let mut buffer = self.flush_buffer.lock().await;
-
-                        match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
-                            None => None,
-                            Some(info) => {
-                                let semaphore = Arc::clone(&buffer.snapshot_semaphore);
-                                let permit = semaphore.acquire_owned().await.unwrap();
-                                Some((info, permit))
+                // add this to the snapshot tracker, so we know what to clear out later if the replay
+                // was a wal file that had a snapshot
+                self.flush_buffer
+                    .lock()
+                    .await
+                    .replay_wal_period(WalPeriod::new(
+                        wal_contents.wal_file_number,
+                        Timestamp::new(wal_contents.min_timestamp_ns),
+                        Timestamp::new(wal_contents.max_timestamp_ns),
+                    ));
+
+                match wal_contents.snapshot {
+                    // This branch uses so much time
+                    None => self.file_notifier.notify(Arc::new(wal_contents)).await,
+                    Some(snapshot_details) => {
+                        let snapshot_info = {
+                            let mut buffer = self.flush_buffer.lock().await;
+
+                            match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
+                                None => None,
+                                Some(info) => {
+                                    let semaphore = Arc::clone(&buffer.snapshot_semaphore);
+                                    let permit = semaphore.acquire_owned().await.unwrap();
+                                    Some((info, permit))
+                                }
                             }
+                        };
+                        if snapshot_details.snapshot_sequence_number
+                            <= last_snapshot_sequence_number
+                        {
+                            // Instead just notify about the WAL, as this snapshot has already been taken
+                            // and WAL files may have been cleared.
+                            self.file_notifier.notify(Arc::new(wal_contents)).await;
+                        } else {
+                            let snapshot_done = self
+                                .file_notifier
+                                .notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
+                                .await;
+                            let details = snapshot_done.await.unwrap();
+                            assert_eq!(snapshot_details, details);
                         }
-                    };
-                    if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number {
-                        // Instead just notify about the WAL, as this snapshot has already been taken
-                        // and WAL files may have been cleared.
-                        self.file_notifier.notify(Arc::new(wal_contents)).await;
-                    } else {
-                        let snapshot_done = self
-                            .file_notifier
-                            .notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
-                            .await;
-                        let details = snapshot_done.await.unwrap();
-                        assert_eq!(snapshot_details, details);
-                    }
 
-                    // if the info is there, we have wal files to delete
-                    if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
-                        self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
+                        // if the info is there, we have wal files to delete
+                        if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
+                            self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
+                        }
                     }
                 }
             }
+            let batched_end = batched_start.elapsed();
+            debug!(time_taken = ?batched_end, batch_len = ?batched.len(), "replaying batch completed");
         }
 
+        // this is useful to know at the info level
+        info!(time_taken = ?replay_start.elapsed(), "completed replaying wal files");
         Ok(())
     }
 
@@ -1216,6 +1251,7 @@ mod tests {
                 Path::from("my_host/wal/00000000001.wal"),
                 Path::from("my_host/wal/00000000002.wal"),
             ],
+            None,
        )
        .await
        .unwrap();
@@ -1364,7 +1400,7 @@
         vec![Path::from("my_host/wal/00000000003.wal")]
     );
     replay_wal
-        .replay(None, &[Path::from("my_host/wal/00000000003.wal")])
+        .replay(None, &[Path::from("my_host/wal/00000000003.wal")], None)
         .await
         .unwrap();
     let replay_notifier = replay_notifier
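
To make the batching pattern above easier to see in isolation, here is a minimal, self-contained sketch of "load up to N files concurrently, replay strictly in order". `load_file` and `apply` are hypothetical stand-ins for the real `get_contents` and notifier machinery; only the `chunks` / `tokio::spawn` / in-order-await structure mirrors the commit:

// Minimal sketch (not from the commit): bounded-concurrency load, in-order replay.
// `load_file` and `apply` are hypothetical stand-ins; `concurrency_limit` mirrors
// the new CLI parameter.
async fn load_file(path: String) -> Vec<u8> {
    // stand-in for an object store GET of one WAL file
    path.into_bytes()
}

fn apply(contents: &[u8]) {
    // stand-in for replaying one WAL file into the buffer
    println!("replayed {} bytes", contents.len());
}

async fn replay(paths: Vec<String>, concurrency_limit: Option<usize>) {
    // With no limit, chunks(usize::MAX) yields one batch: the old fully concurrent behavior.
    for batch in paths.chunks(concurrency_limit.unwrap_or(usize::MAX)) {
        // Spawn loads for this batch only, bounding how many files are in memory at once.
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|path| tokio::spawn(load_file(path)))
            .collect();
        // Await in the original (sorted) order, so replay itself stays sequential.
        for handle in handles {
            let contents = handle.await.expect("load task panicked");
            apply(&contents);
        }
    }
}

#[tokio::main]
async fn main() {
    let paths: Vec<String> = (1..=5).map(|i| format!("wal/{i:011}.wal")).collect();
    replay(paths, Some(2)).await;
}

Running this with `Some(2)` loads files two at a time; the real implementation additionally threads snapshot handling through the same loop.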

influxdb3_wal/src/snapshot_tracker.rs

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@
 
 use crate::{Gen1Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
 use data_types::Timestamp;
-use observability_deps::tracing::{debug, info};
+use observability_deps::tracing::{debug, info, trace};
 
 /// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
 #[derive(Debug)]
@@ -207,7 +207,7 @@ impl WalPeriod {
         min_time: Timestamp,
         max_time: Timestamp,
     ) -> Self {
-        info!(
+        trace!(
            ?min_time,
            ?max_time,
            ?wal_file_number,
