feat: add concurrency limit for WAL replay

WAL replay currently loads _all_ WAL files concurrently and can run into
OOM. This commit adds a CLI parameter `--wal-replay-concurrency-limit`
that allows the user to set a lower limit and rerun WAL replay.
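
In outline, replay now loads up to N files concurrently, replays that batch
strictly in order, and only then fetches the next batch. A minimal
self-contained sketch of the pattern (not the committed code; `load_file` and
`replay_one` are hypothetical stand-ins for the real object-store loader and
buffer notifier):

use std::time::Duration;

// Stand-in for fetching and deserializing one WAL file from object storage.
async fn load_file(path: String) -> std::io::Result<String> {
    tokio::time::sleep(Duration::from_millis(10)).await; // simulate I/O
    Ok(format!("contents of {path}"))
}

// Stand-in for notifying the in-memory buffer with a file's contents.
async fn replay_one(contents: String) {
    println!("replaying {contents}");
}

// Load up to `limit` files concurrently, but replay each batch strictly in
// the original (sorted) order before starting the next batch.
async fn replay_in_batches(paths: Vec<String>, limit: Option<usize>) -> std::io::Result<()> {
    // NB: `chunks` panics on 0, so the limit must be at least 1.
    for batch in paths.chunks(limit.unwrap_or(usize::MAX)) {
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|path| tokio::spawn(load_file(path)))
            .collect();
        for handle in handles {
            let contents = handle.await.expect("load task panicked")?;
            replay_one(contents).await;
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let paths: Vec<String> = (1..=5).map(|n| format!("{n:011}.wal")).collect();
    replay_in_batches(paths, Some(2)).await
}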

closes: https://github.com/influxdata/influxdb/issues/26481
pk/wal-replay-concurrency-limit
Praveen Kumar 2025-06-02 15:21:03 +01:00
parent be25c6f52b
commit 29a73cd185
11 changed files with 144 additions and 81 deletions

Cargo.lock

@@ -3306,6 +3306,7 @@ dependencies = [
"clap",
"crc32fast",
"data_types",
"futures",
"futures-util",
"hashbrown 0.15.3",
"humantime",


@@ -413,6 +413,12 @@ pub struct Config {
hide = true
)]
pub tcp_listener_file_path: Option<PathBuf>,
#[clap(
long = "wal-replay-concurrency-limit",
env = "INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT"
)]
pub wal_replay_concurrency_limit: Option<usize>,
}
/// The minimum version of TLS to use for InfluxDB
@@ -692,6 +698,7 @@ pub async fn command(config: Config) -> Result<()> {
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
shutdown: shutdown_manager.register(),
wal_replay_concurrency_limit: config.wal_replay_concurrency_limit,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;


@@ -109,6 +109,10 @@ Examples:
--wal-max-write-buffer-size <SIZE>
Max write requests in buffer [default: 100000]
[env: INFLUXDB3_WAL_MAX_WRITE_BUFFER_SIZE=]
--wal-replay-concurrency-limit <LIMIT>
Concurrency limit during WAL replay [default: no_limit]
If replay runs into OOM, set this to a lower number, e.g. 10
[env: INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT=]
--snapshotted-wal-files-to-keep <N>
Number of snapshotted WAL files to retain [default: 300]
[env: INFLUXDB3_NUM_WAL_FILES_TO_KEEP=]
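
For reference, a hypothetical invocation (assuming the flag is wired into
`influxdb3 serve`; other required serve options omitted):

influxdb3 serve --wal-replay-concurrency-limit 10
# or via the environment:
INFLUXDB3_WAL_REPLAY_CONCURRENCY_LIMIT=10 influxdb3 serve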


@@ -1011,6 +1011,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: shutdown.register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();


@@ -887,6 +887,7 @@ mod tests {
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
shutdown: shutdown_manager.register(),
wal_replay_concurrency_limit: Some(1),
},
)
.await


@@ -867,6 +867,7 @@ mod tests {
snapshotted_wal_files_to_keep: 1,
query_file_limit,
shutdown: shutdown.register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();


@@ -259,6 +259,7 @@ impl TestService {
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();


@@ -23,6 +23,7 @@ bitcode.workspace = true
bytes.workspace = true
byteorder.workspace = true
crc32fast.workspace = true
futures.workspace = true
futures-util.workspace = true
hashbrown.workspace = true
humantime.workspace = true


@@ -13,11 +13,25 @@ use iox_time::TimeProvider;
use object_store::path::{Path, PathPart};
use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
use observability_deps::tracing::{debug, error, info};
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{str::FromStr, sync::Arc};
use tokio::sync::Mutex;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
#[derive(Debug)]
pub struct CreateWalObjectStoreArgs<'a> {
pub time_provider: Arc<dyn TimeProvider>,
pub object_store: Arc<dyn ObjectStore>,
pub node_identifier_prefix: &'a str,
pub file_notifier: Arc<dyn WalFileNotifier>,
pub config: WalConfig,
pub last_wal_sequence_number: Option<WalFileSequenceNumber>,
pub last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
pub snapshotted_wal_files_to_keep: u64,
pub shutdown: ShutdownToken,
pub wal_replay_concurrency_limit: Option<usize>,
}
#[derive(Debug)]
pub struct WalObjectStore {
object_store: Arc<dyn ObjectStore>,
@@ -35,19 +49,21 @@ pub struct WalObjectStore {
impl WalObjectStore {
/// Creates a new WAL. This will replay files into the notifier and trigger any snapshots that
/// exist in the WAL files that haven't been cleaned up yet.
#[allow(clippy::too_many_arguments)]
pub async fn new(
time_provider: Arc<dyn TimeProvider>,
object_store: Arc<dyn ObjectStore>,
node_identifier_prefix: impl Into<String> + Send,
file_notifier: Arc<dyn WalFileNotifier>,
config: WalConfig,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
snapshotted_wal_files_to_keep: u64,
shutdown: ShutdownToken,
pub async fn new<'a>(
CreateWalObjectStoreArgs {
time_provider,
object_store,
node_identifier_prefix,
file_notifier,
config,
last_wal_sequence_number,
last_snapshot_sequence_number,
snapshotted_wal_files_to_keep,
shutdown,
wal_replay_concurrency_limit,
}: CreateWalObjectStoreArgs<'a>,
) -> Result<Arc<Self>, crate::Error> {
let node_identifier = node_identifier_prefix.into();
let node_identifier = node_identifier_prefix.to_string();
let all_wal_file_paths =
load_all_wal_file_paths(Arc::clone(&object_store), node_identifier.clone()).await?;
let flush_interval = config.flush_interval;
@@ -64,8 +80,12 @@ impl WalObjectStore {
shutdown.clone_cancellation_token(),
);
wal.replay(last_wal_sequence_number, &all_wal_file_paths)
.await?;
wal.replay(
last_wal_sequence_number,
&all_wal_file_paths,
wal_replay_concurrency_limit,
)
.await?;
let wal = Arc::new(wal);
background_wal_flush(Arc::clone(&wal), flush_interval, shutdown);
@@ -128,8 +148,10 @@ impl WalObjectStore {
&self,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
all_wal_file_paths: &[Path],
concurrency_limit: Option<usize>,
) -> crate::Result<()> {
debug!("replaying");
let replay_start = Instant::now();
info!("replaying WAL files");
let paths = self.load_existing_wal_file_paths(last_wal_sequence_number, all_wal_file_paths);
let last_snapshot_sequence_number = {
@@ -148,72 +170,84 @@
Ok(verify_file_type_and_deserialize(file_bytes)?)
}
let mut replay_tasks = Vec::new();
for path in paths {
let object_store = Arc::clone(&self.object_store);
replay_tasks.push(tokio::spawn(get_contents(object_store, path)));
}
// Load N files concurrently, then replay them immediately before loading the
// next batch of N files. Since replay has to happen _in order_, only loading
// the files is concurrent; the WAL files themselves are replayed sequentially
// in the original order (i.e. `paths`, which is already sorted).
for batched in paths.chunks(concurrency_limit.unwrap_or(usize::MAX)) {
let batched_start = Instant::now();
let mut results = Vec::with_capacity(batched.len());
for path in batched {
let object_store = Arc::clone(&self.object_store);
results.push(tokio::spawn(get_contents(object_store, path.clone())));
}
for wal_contents in replay_tasks {
let wal_contents = wal_contents.await??;
for wal_contents in results {
let wal_contents = wal_contents.await??;
info!(
n_ops = %wal_contents.ops.len(),
min_timestamp_ns = %wal_contents.min_timestamp_ns,
max_timestamp_ns = %wal_contents.max_timestamp_ns,
wal_file_number = %wal_contents.wal_file_number,
snapshot_details = ?wal_contents.snapshot,
"replaying WAL file with details"
);
// add this to the snapshot tracker, so we know what to clear out later if the
// replayed WAL file also contained a snapshot
self.flush_buffer
.lock()
.await
.replay_wal_period(WalPeriod::new(
wal_contents.wal_file_number,
Timestamp::new(wal_contents.min_timestamp_ns),
Timestamp::new(wal_contents.max_timestamp_ns),
));
// add this to the snapshot tracker, so we know what to clear out later if the
// replayed WAL file also contained a snapshot
self.flush_buffer
.lock()
.await
.replay_wal_period(WalPeriod::new(
wal_contents.wal_file_number,
Timestamp::new(wal_contents.min_timestamp_ns),
Timestamp::new(wal_contents.max_timestamp_ns),
));
info!(
n_ops = %wal_contents.ops.len(),
min_timestamp_ns = %wal_contents.min_timestamp_ns,
max_timestamp_ns = %wal_contents.max_timestamp_ns,
wal_file_number = %wal_contents.wal_file_number,
snapshot_details = ?wal_contents.snapshot,
"replaying WAL file"
);
match wal_contents.snapshot {
// This branch accounts for most of the replay time
None => self.file_notifier.notify(Arc::new(wal_contents)).await,
Some(snapshot_details) => {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;
match wal_contents.snapshot {
// This branch accounts for most of the replay time
None => self.file_notifier.notify(Arc::new(wal_contents)).await,
Some(snapshot_details) => {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;
match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
None => None,
Some(info) => {
let semaphore = Arc::clone(&buffer.snapshot_semaphore);
let permit = semaphore.acquire_owned().await.unwrap();
Some((info, permit))
match buffer.snapshot_tracker.snapshot(snapshot_details.forced) {
None => None,
Some(info) => {
let semaphore = Arc::clone(&buffer.snapshot_semaphore);
let permit = semaphore.acquire_owned().await.unwrap();
Some((info, permit))
}
}
};
if snapshot_details.snapshot_sequence_number
<= last_snapshot_sequence_number
{
// Instead just notify about the WAL, as this snapshot has already been taken
// and WAL files may have been cleared.
self.file_notifier.notify(Arc::new(wal_contents)).await;
} else {
let snapshot_done = self
.file_notifier
.notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
.await;
let details = snapshot_done.await.unwrap();
assert_eq!(snapshot_details, details);
}
};
if snapshot_details.snapshot_sequence_number <= last_snapshot_sequence_number {
// Instead just notify about the WAL, as this snapshot has already been taken
// and WAL files may have been cleared.
self.file_notifier.notify(Arc::new(wal_contents)).await;
} else {
let snapshot_done = self
.file_notifier
.notify_and_snapshot(Arc::new(wal_contents), snapshot_details)
.await;
let details = snapshot_done.await.unwrap();
assert_eq!(snapshot_details, details);
}
// if the info is there, we have wal files to delete
if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
// if the info is there, we have wal files to delete
if let Some((snapshot_info, snapshot_permit)) = snapshot_info {
self.cleanup_snapshot(snapshot_info, snapshot_permit).await;
}
}
}
}
let batched_end = batched_start.elapsed();
debug!(time_taken = ?batched_end, batch_len = ?batched.len(), "replaying batch completed");
}
// this is useful to know at the info level
info!(time_taken = ?replay_start.elapsed(), "completed replaying wal files");
Ok(())
}
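
A design aside on the batching above: `chunks` drains each batch completely
before starting the next, so loads sit idle at batch boundaries. The commit
also adds the `futures` crate to the WAL crate's dependencies; a
`futures::StreamExt::buffered` pipeline is one alternative that keeps up to N
loads in flight continuously while still yielding results in input order. A
minimal sketch of that alternative (not the committed code; `load_file` is a
hypothetical stand-in):

use futures::stream::{self, StreamExt};

// Stand-in for fetching one WAL file from object storage.
async fn load_file(path: String) -> std::io::Result<String> {
    Ok(format!("contents of {path}"))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let paths: Vec<String> = (1..=5).map(|n| format!("{n:011}.wal")).collect();
    // `buffered(2)` polls at most two loads at a time but yields results in
    // input order, so replay itself stays strictly sequential.
    let mut contents = stream::iter(paths).map(load_file).buffered(2);
    while let Some(file) = contents.next().await {
        println!("replaying {}", file?);
    }
    Ok(())
}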
@@ -1216,6 +1250,7 @@ mod tests {
Path::from("my_host/wal/00000000001.wal"),
Path::from("my_host/wal/00000000002.wal"),
],
None,
)
.await
.unwrap();
@@ -1364,7 +1399,7 @@
vec![Path::from("my_host/wal/00000000003.wal")]
);
replay_wal
.replay(None, &[Path::from("my_host/wal/00000000003.wal")])
.replay(None, &[Path::from("my_host/wal/00000000003.wal")], None)
.await
.unwrap();
let replay_notifier = replay_notifier


@@ -6,7 +6,7 @@
use crate::{Gen1Duration, SnapshotDetails, SnapshotSequenceNumber, WalFileSequenceNumber};
use data_types::Timestamp;
use observability_deps::tracing::{debug, info};
use observability_deps::tracing::{debug, info, trace};
/// A struct that tracks the WAL periods (files if using object store) and decides when to snapshot the WAL.
#[derive(Debug)]
@@ -207,7 +207,7 @@ impl WalPeriod {
min_time: Timestamp,
max_time: Timestamp,
) -> Self {
info!(
trace!(
?min_time,
?max_time,
?wal_file_number,


@@ -41,7 +41,10 @@ use influxdb3_catalog::{
catalog::{Catalog, DatabaseSchema, Prompt, TableDefinition},
};
use influxdb3_id::{DbId, TableId};
use influxdb3_wal::{Wal, WalConfig, WalFileNotifier, WalOp, object_store::WalObjectStore};
use influxdb3_wal::{
Wal, WalConfig, WalFileNotifier, WalOp,
object_store::{CreateWalObjectStoreArgs, WalObjectStore},
};
use iox_query::{
QueryChunk,
chunk_statistics::{NoColumnRanges, create_chunk_statistics},
@@ -182,6 +185,7 @@ pub struct WriteBufferImplArgs {
pub snapshotted_wal_files_to_keep: u64,
pub query_file_limit: Option<usize>,
pub shutdown: ShutdownToken,
pub wal_replay_concurrency_limit: Option<usize>,
}
impl WriteBufferImpl {
@@ -199,6 +203,7 @@ impl WriteBufferImpl {
snapshotted_wal_files_to_keep,
query_file_limit,
shutdown,
wal_replay_concurrency_limit,
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
@@ -237,17 +242,18 @@
// create the wal instance, which will replay into the queryable buffer and start
// the background flush task.
let wal = WalObjectStore::new(
Arc::clone(&time_provider),
persister.object_store(),
persister.node_identifier_prefix(),
Arc::clone(&queryable_buffer) as Arc<dyn WalFileNotifier>,
wal_config,
let wal = WalObjectStore::new(CreateWalObjectStoreArgs {
time_provider: Arc::clone(&time_provider),
object_store: persister.object_store(),
node_identifier_prefix: persister.node_identifier_prefix(),
file_notifier: Arc::clone(&queryable_buffer) as Arc<dyn WalFileNotifier>,
config: wal_config,
last_wal_sequence_number,
last_snapshot_sequence_number,
snapshotted_wal_files_to_keep,
shutdown,
)
wal_replay_concurrency_limit,
})
.await?;
let result = Arc::new(Self {
@@ -780,6 +786,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();
@@ -888,6 +895,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();
@@ -980,6 +988,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap()
@@ -1239,6 +1248,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: Some(1),
})
.await
.unwrap();
@@ -3373,6 +3383,7 @@ mod tests {
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
wal_replay_concurrency_limit: None,
})
.await
.unwrap();