From 1ff4f7689645b7e55a22368134b3ef7bc22fb90d Mon Sep 17 00:00:00 2001 From: Trevor Hilton Date: Sat, 11 Jan 2025 10:27:58 -0500 Subject: [PATCH] feat: only load wal files after most recent snapshot (#25787) --- influxdb3_wal/src/object_store.rs | 32 ++++++++++++++----- .../src/write_buffer/persisted_files.rs | 8 ----- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 65a161dc18..0d9947e4f3 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -50,7 +50,7 @@ impl WalObjectStore { last_snapshot_sequence_number, ); - wal.replay().await?; + wal.replay(last_wal_sequence_number).await?; let wal = Arc::new(wal); background_wal_flush(Arc::clone(&wal), flush_interval); @@ -96,9 +96,14 @@ impl WalObjectStore { /// Loads the WAL files in order from object store, calling the file notifier on each one and /// populating the snapshot tracker with the WAL periods. - pub async fn replay(&self) -> crate::Result<()> { + pub async fn replay( + &self, + last_wal_sequence_number: Option, + ) -> crate::Result<()> { debug!(">>> replaying"); - let paths = self.load_existing_wal_file_paths().await?; + let paths = self + .load_existing_wal_file_paths(last_wal_sequence_number) + .await?; let last_snapshot_sequence_number = { self.flush_buffer @@ -335,7 +340,10 @@ impl WalObjectStore { snapshot_response } - async fn load_existing_wal_file_paths(&self) -> crate::Result> { + async fn load_existing_wal_file_paths( + &self, + last_wal_sequence_number: Option, + ) -> crate::Result> { let mut paths = Vec::new(); let mut offset: Option = None; let path = Path::from(format!("{host}/wal", host = self.host_identifier_prefix)); @@ -358,6 +366,14 @@ impl WalObjectStore { paths.sort(); offset = Some(paths.last().unwrap().clone()) } + + // if we have a last wal path from persisted snapshots, we don't need to load the wal files + // that came before it: + if let Some(last_wal_sequence_number) = last_wal_sequence_number { + let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number); + paths.retain(|path| path >= &last_wal_path); + } + paths.sort(); Ok(paths) @@ -968,13 +984,13 @@ mod tests { None, ); assert_eq!( - replay_wal.load_existing_wal_file_paths().await.unwrap(), + replay_wal.load_existing_wal_file_paths(None).await.unwrap(), vec![ Path::from("my_host/wal/00000000001.wal"), Path::from("my_host/wal/00000000002.wal") ] ); - replay_wal.replay().await.unwrap(); + replay_wal.replay(None).await.unwrap(); let replay_notifier = replay_notifier .as_any() .downcast_ref::() @@ -1109,10 +1125,10 @@ mod tests { None, ); assert_eq!( - replay_wal.load_existing_wal_file_paths().await.unwrap(), + replay_wal.load_existing_wal_file_paths(None).await.unwrap(), vec![Path::from("my_host/wal/00000000003.wal")] ); - replay_wal.replay().await.unwrap(); + replay_wal.replay(None).await.unwrap(); let replay_notifier = replay_notifier .as_any() .downcast_ref::() diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs index 39b2adf9b1..5c73f0b071 100644 --- a/influxdb3_write/src/write_buffer/persisted_files.rs +++ b/influxdb3_write/src/write_buffer/persisted_files.rs @@ -26,14 +26,6 @@ impl PersistedFiles { } } - /// Add a file to the list of persisted files - pub fn add_file(&self, db_id: DbId, table_id: TableId, file: ParquetFile) { - let mut inner = self.inner.write(); - let tables = inner.files.entry(db_id).or_default(); - let table_files = tables.entry(table_id).or_default(); - table_files.push(file); - } - /// Add all files from a persisted snapshot pub fn add_persisted_snapshot_files(&self, persisted_snapshot: PersistedSnapshot) { let mut inner = self.inner.write();