
Commit 29a73cd

feat: add concurrency limit for WAL replay
WAL replay currently loads _all_ WAL files concurrently, which can run the process out of memory (OOM). This commit adds a CLI parameter `--wal-replay-concurrency-limit` that allows the user to set a lower limit and rerun WAL replay. Closes: #26481
1 parent be25c6f commit 29a73cd
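
For orientation, the strategy the diff below adopts can be summarised in a small, self-contained sketch: fetch at most `limit` WAL files from the object store concurrently, replay that batch strictly in order, then move on to the next batch, so peak memory is bounded by the batch size rather than by the total number of WAL files. The names here (`replay_all`, `fetch_file`, `replay_one`) are illustrative stand-ins, not the actual InfluxDB types:

// Sketch only: bounded-concurrency WAL replay. `limit = None` behaves like the
// old code and loads every file in one batch.
async fn replay_all(paths: Vec<String>, limit: Option<usize>) {
    for batch in paths.chunks(limit.unwrap_or(usize::MAX)) {
        // Start the object-store fetches for this batch only.
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|path| tokio::spawn(fetch_file(path)))
            .collect();
        // Await in the original (sorted) order so replay stays sequential.
        for handle in handles {
            let contents = handle.await.expect("fetch task panicked");
            replay_one(contents).await;
        }
    }
}

async fn fetch_file(path: String) -> Vec<u8> {
    // Stand-in for the object-store GET and deserialization of one WAL file.
    path.into_bytes()
}

async fn replay_one(_contents: Vec<u8>) {
    // Stand-in for notifying the write buffer about one replayed WAL file.
}

#[tokio::main]
async fn main() {
    let paths: Vec<String> = (0..10).map(|i| format!("wal/{i:011}.wal")).collect();
    // With a limit of 3, at most three WAL files are held in memory at once.
    replay_all(paths, Some(3)).await;
}

Replay order matters because later WAL files can depend on earlier ones, which is why only the fetching is concurrent in the actual change.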

File tree: 11 files changed (+146, -83 lines)

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

influxdb3/src/commands/serve.rs

Lines changed: 7 additions & 0 deletions
@@ -413,6 +413,12 @@ pub struct Config {
         hide = true
     )]
     pub tcp_listener_file_path: Option<PathBuf>,
+
+    #[clap(
+        long = "wal-replay-concurrency-limit",
+        env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT"
+    )]
+    pub wal_replay_concurrency_limit: Option<usize>,
 }
 
 /// The minimum version of TLS to use for InfluxDB
@@ -692,6 +698,7 @@ pub async fn command(config: Config) -> Result<()> {
         snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
         query_file_limit: config.query_file_limit,
         shutdown: shutdown_manager.register(),
+        wal_replay_concurrency_limit: config.wal_replay_concurrency_limit,
     })
     .await
     .map_err(|e| Error::WriteBufferInit(e.into()))?;

influxdb3/src/help/serve_all.txt

Lines changed: 4 additions & 0 deletions
@@ -109,6 +109,10 @@ Examples:
   --wal-max-write-buffer-size <SIZE>
           Max write requests in buffer [default: 100000]
           [env: INFLUXDB3_WAL_MAX_WRITE_BUFFER_SIZE=]
+  --wal-replay-concurrency-limit <LIMIT>
+          Concurrency limit during WAL replay [default: no_limit]
+          If replay runs into OOM, set this to a lower number eg. 10
+          [env: INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT=]
   --snapshotted-wal-files-to-keep <N>
           Number of snapshotted WAL files to retain [default: 300]
           [env: INFLUXDB3_NUM_WAL_FILES_TO_KEEP=]
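
The `[default: no_limit]` above corresponds, in the influxdb3_wal/src/object_store.rs change further down, to `concurrency_limit.unwrap_or(usize::MAX)` feeding `slice::chunks`, so leaving the flag unset keeps the previous behaviour of loading every WAL file in a single batch. A tiny illustration of that mapping (assumed example code, not part of this commit):

// Illustration only: how the optional limit becomes a chunk size.
fn batch_size(limit: Option<usize>) -> usize {
    // `None` means "no limit", which collapses everything into one batch.
    limit.unwrap_or(usize::MAX)
}

fn main() {
    let wal_files: Vec<u32> = (0..5).collect();
    // A limit of 2 replays five files in three batches: 2, 2, then 1.
    assert_eq!(wal_files.chunks(batch_size(Some(2))).count(), 3);
    // No limit puts all five files into a single batch, as before this change.
    assert_eq!(wal_files.chunks(batch_size(None)).count(), 1);
}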

influxdb3_processing_engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -1011,6 +1011,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 10,
             query_file_limit: None,
             shutdown: shutdown.register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_server/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -887,6 +887,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 100,
             query_file_limit: None,
             shutdown: shutdown_manager.register(),
+            wal_replay_concurrency_limit: Some(1),
         },
     )
     .await

influxdb3_server/src/query_executor/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -867,6 +867,7 @@ mod tests {
             snapshotted_wal_files_to_keep: 1,
             query_file_limit,
             shutdown: shutdown.register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_server/tests/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -259,6 +259,7 @@ impl TestService {
             snapshotted_wal_files_to_keep: 100,
             query_file_limit: None,
             shutdown: ShutdownManager::new_testing().register(),
+            wal_replay_concurrency_limit: Some(1),
         })
         .await
         .unwrap();

influxdb3_wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ bitcode.workspace = true
 bytes.workspace = true
 byteorder.workspace = true
 crc32fast.workspace = true
+futures.workspace = true
 futures-util.workspace = true
 hashbrown.workspace = true
 humantime.workspace = true

influxdb3_wal/src/object_store.rs

Lines changed: 108 additions & 73 deletions
@@ -13,11 +13,25 @@ use iox_time::TimeProvider;
 use object_store::path::{Path, PathPart};
 use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
 use observability_deps::tracing::{debug, error, info};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{str::FromStr, sync::Arc};
 use tokio::sync::Mutex;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
 
+#[derive(Debug)]
+pub struct CreateWalObjectStoreArgs<'a> {
+    pub time_provider: Arc<dyn TimeProvider>,
+    pub object_store: Arc<dyn ObjectStore>,
+    pub node_identifier_prefix: &'a str,
+    pub file_notifier: Arc<dyn WalFileNotifier>,
+    pub config: WalConfig,
+    pub last_wal_sequence_number: Option<WalFileSequenceNumber>,
+    pub last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
+    pub snapshotted_wal_files_to_keep: u64,
+    pub shutdown: ShutdownToken,
+    pub wal_replay_concurrency_limit: Option<usize>,
+}
+
 #[derive(Debug)]
 pub struct WalObjectStore {
     object_store: Arc<dyn ObjectStore>,
@@ -35,19 +49,21 @@ pub struct WalObjectStore {
 impl WalObjectStore {
     /// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
     /// exist in the WAL files that haven't been cleaned up yet.
-    #[allow(clippy::too_many_arguments)]
-    pub async fn new(
-        time_provider: Arc<dyn TimeProvider>,
-        object_store: Arc<dyn ObjectStore>,
-        node_identifier_prefix: impl Into<String> + Send,
-        file_notifier: Arc<dyn WalFileNotifier>,
-        config: WalConfig,
-        last_wal_sequence_number: Option<WalFileSequenceNumber>,
-        last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
-        snapshotted_wal_files_to_keep: u64,
-        shutdown: ShutdownToken,
+    pub async fn new<'a>(
+        CreateWalObjectStoreArgs {
+            time_provider,
+            object_store,
+            node_identifier_prefix,
+            file_notifier,
+            config,
+            last_wal_sequence_number,
+            last_snapshot_sequence_number,
+            snapshotted_wal_files_to_keep,
+            shutdown,
+            wal_replay_concurrency_limit,
+        }: CreateWalObjectStoreArgs<'a>,
     ) -> Result<Arc<Self>, crate::Error> {
-        let node_identifier = node_identifier_prefix.into();
+        let node_identifier = node_identifier_prefix.to_string();
         let all_wal_file_paths =
             load_all_wal_file_paths(Arc::clone(&object_store), node_identifier.clone()).await?;
         let flush_interval = config.flush_interval;
@@ -64,8 +80,12 @@ impl WalObjectStore {
             shutdown.clone_cancellation_token(),
         );
 
-        wal.replay(last_wal_sequence_number, &all_wal_file_paths)
-            .await?;
+        wal.replay(
+            last_wal_sequence_number,
+            &all_wal_file_paths,
+            wal_replay_concurrency_limit,
+        )
+        .await?;
         let wal = Arc::new(wal);
         background_wal_flush(Arc::clone(&wal), flush_interval, shutdown);
 
@@ -128,8 +148,10 @@ impl WalObjectStore {
         &self,
         last_wal_sequence_number: Option<WalFileSequenceNumber>,
         all_wal_file_paths: &[Path],
+        concurrency_limit: Option<usize>,
     ) -> crate::Result<()> {
-        debug!("replaying");
+        let replay_start = Instant::now();
+        info!("replaying WAL files");
         let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths);
 
         let last_snapshot_sequence_number = {
@@ -148,72 +170,84 @@
             Ok(verify_file_type_and_deserialize(file_bytes)?)
         }
 
-        let mut replay_tasks = Vec::new();
-        for path in paths {
-            let object_store = Arc::clone(&self.object_store);
-            replay_tasks.push(tokio::spawn(get_contents(object_store, path)));
-        }
-
-        for wal_contents in replay_tasks {
-            let wal_contents = wal_contents.await??;
+        // Load N files concurrently and then replay them immediately before loading the next batch
+        // of N files. Since replaying has to happen _in order_ only loading the files part is
+        // concurrent, replaying the WAL file itself is done sequentially based on the original
+        // order (i.e paths, which is already sorted)
+        for batched in paths.chunks(concurrency_limit.unwrap_or(usize::MAX)) {
+            let batched_start = Instant::now();
+            let mut results = Vec::with_capacity(batched.len());
+            for path in batched {
+                let object_store = Arc::clone(&self.object_store);
+                results.push(tokio::spawn(get_contents(object_store, path.clone())));
+            }
 
-            // add this to the snapshot tracker, so we know what to clear out later if the replay
-            // was a wal file that had a snapshot
-            self.flush_buffer
-                .lock()
-                .await
-                .replay_wal_period(WalPeriod::new(
-                    wal_contents.wal_file_number,
-                    Timestamp::new(wal_contents.min_timestamp_ns),
-                    Timestamp::new(wal_contents.max_timestamp_ns),
-                ));
-
-            info!(
-                n_ops = %wal_contents.ops.len(),
-                min_timestamp_ns = %wal_contents.min_timestamp_ns,
-                max_timestamp_ns = %wal_contents.max_timestamp_ns,
-                wal_file_number = %wal_contents.wal_file_number,
-                snapshot_details = ?wal_contents.snapshot,
-                "replaying WAL file"
-            );
+            for wal_contents in results {
+                let wal_contents = wal_contents.await??;
+                info!(
+                    n_ops = %wal_contents.ops.len(),
+                    min_timestamp_ns = %wal_contents.min_timestamp_ns,
+                    max_timestamp_ns = %wal_contents.max_timestamp_ns,
+                    wal_file_number = %wal_contents.wal_file_number,
+                    snapshot_details = ?wal_contents.snapshot,
+                    "replaying WAL file with details"
+                );
 
-            match wal_contents.snapshot {
-                // This branch uses so much time
-                None => self.file_notifier.notify(Arc::new(wal_contents)).await,
-                Some(snapshot_details) => {
-                    let snapshot_info = {
-                        let mut buffer = self.flush_buffer.lock().await;
-
-                        match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
-                            None => None,
-                            Some(info) => {
-                                let semaphore = Arc::clone(&buffer.snapshot_semaphore);
-                                let permit = semaphore.acquire_owned().await.unwrap();
-                                Some((info, permit))
+                // add this to the snapshot tracker, so we know what to clear out later if the replay
+                // was a wal file that had a snapshot
+                self.flush_buffer
+                    .lock()
+                    .await
+                    .replay_wal_period(WalPeriod::new(
+                        wal_contents.wal_file_number,
+                        Timestamp::new(wal_contents.min_timestamp_ns),
+                        Timestamp::new(wal_contents.max_timestamp_ns),
+                    ));
+
+                match wal_contents.snapshot {
+                    // This branch uses so much time
+                    None => self.file_notifier.notify(Arc::new(wal_contents)).await,
+                    Some(snapshot_details) => {
+                        let snapshot_info = {
+                            let mut buffer = self.flush_buffer.lock().await;
+
+                            match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
+                                None => None,
+                                Some(info) => {
+                                    let semaphore = Arc::clone(&buffer.snapshot_semaphore);
+                                    let permit = semaphore.acquire_owned().await.unwrap();
+                                    Some((info, permit))
+                                }
                             }
+                        };
+                        if snapshot_details.snapshot_sequence_number
+                            <= last_snapshot_sequence_number
+                        {
+                            // Instead just notify about the WAL, as this snapshot has already been taken
+                            // and WAL files may have been cleared.
+                            self.file_notifier.notify(Arc::new(wal_contents)).await;
+                        } else {
+                            let snapshot_done = self
+                                .file_notifier
+                                .notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
+                                .await;
+                            let details = snapshot_done.await.unwrap();
+                            assert_eq!(snapshot_details, details);
                         }
-                    };
-                    if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number {
-                        // Instead just notify about the WAL, as this snapshot has already been taken
-                        // and WAL files may have been cleared.
-                        self.file_notifier.notify(Arc::new(wal_contents)).await;
-                    } else {
-                        let snapshot_done = self
-                            .file_notifier
-                            .notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
-                            .await;
-                        let details = snapshot_done.await.unwrap();
-                        assert_eq!(snapshot_details, details);
-                    }
 
-                    // if the info is there, we have wal files to delete
-                    if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
-                        self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
+                        // if the info is there, we have wal files to delete
+                        if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
+                            self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
+                        }
                     }
                 }
             }
+            let batched_end = batched_start.elapsed();
+            debug!(time_taken = ?batched_end, batch_len = ?batched.len(), "replaying batch completed");
         }
 
+        // this is useful to know at the info level
+        info!(time_taken = ?replay_start.elapsed(), "completed replaying wal files");
         Ok(())
     }
 
@@ -1216,6 +1250,7 @@ mod tests {
                 Path::from("my_host/wal/00000000001.wal"),
                 Path::from("my_host/wal/00000000002.wal"),
             ],
+            None,
         )
         .await
         .unwrap();
@@ -1364,7 +1399,7 @@
             vec![Path::from("my_host/wal/00000000003.wal")]
         );
         replay_wal
-            .replay(None, &[Path::from("my_host/wal/00000000003.wal")])
+            .replay(None, &[Path::from("my_host/wal/00000000003.wal")], None)
            .await
            .unwrap();
        let replay_notifier = replay_notifier

influxdb3_wal/src/snapshot_tracker.rs

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@
 
 use crate::{Gen1Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
 use data_types::Timestamp;
-use observability_deps::tracing::{debug, info};
+use observability_deps::tracing::{debug, info, trace};
 
 /// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
 #[derive(Debug)]
@@ -207,7 +207,7 @@ impl WalPeriod {
         min_time: Timestamp,
         max_time: Timestamp,
     ) -> Self {
-        info!(
+        trace!(
             ?min_time,
             ?max_time,
             ?wal_file_number,