feat: only load wal files after most recent snapshot (#25787)

pull/25793/head
Trevor Hilton 2025-01-11 10:27:58 -05:00 committed by GitHub
parent 88e10928ed
commit 1ff4f76896
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 24 additions and 16 deletions

View File

@ -50,7 +50,7 @@ impl WalObjectStore {
last_snapshot_sequence_number,
);
wal.replay().await?;
wal.replay(last_wal_sequence_number).await?;
let wal = Arc::new(wal);
background_wal_flush(Arc::clone(&wal), flush_interval);
@ -96,9 +96,14 @@ impl WalObjectStore {
/// Loads the WAL files in order from object store, calling the file notifier on each one and
/// populating the snapshot tracker with the WAL periods.
pub async fn replay(&self) -> crate::Result<()> {
pub async fn replay(
&self,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
) -> crate::Result<()> {
debug!(">>> replaying");
let paths = self.load_existing_wal_file_paths().await?;
let paths = self
.load_existing_wal_file_paths(last_wal_sequence_number)
.await?;
let last_snapshot_sequence_number = {
self.flush_buffer
@ -335,7 +340,10 @@ impl WalObjectStore {
snapshot_response
}
async fn load_existing_wal_file_paths(&self) -> crate::Result<Vec<Path>> {
async fn load_existing_wal_file_paths(
&self,
last_wal_sequence_number: Option<WalFileSequenceNumber>,
) -> crate::Result<Vec<Path>> {
let mut paths = Vec::new();
let mut offset: Option<Path> = None;
let path = Path::from(format!("{host}/wal", host = self.host_identifier_prefix));
@ -358,6 +366,14 @@ impl WalObjectStore {
paths.sort();
offset = Some(paths.last().unwrap().clone())
}
// if we have a last wal path from persisted snapshots, we don't need to load the wal files
// that came before it:
if let Some(last_wal_sequence_number) = last_wal_sequence_number {
let last_wal_path = wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
paths.retain(|path| path >= &last_wal_path);
}
paths.sort();
Ok(paths)
@ -968,13 +984,13 @@ mod tests {
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
vec![
Path::from("my_host/wal/00000000001.wal"),
Path::from("my_host/wal/00000000002.wal")
]
);
replay_wal.replay().await.unwrap();
replay_wal.replay(None).await.unwrap();
let replay_notifier = replay_notifier
.as_any()
.downcast_ref::<TestNotifier>()
@ -1109,10 +1125,10 @@ mod tests {
None,
);
assert_eq!(
replay_wal.load_existing_wal_file_paths().await.unwrap(),
replay_wal.load_existing_wal_file_paths(None).await.unwrap(),
vec![Path::from("my_host/wal/00000000003.wal")]
);
replay_wal.replay().await.unwrap();
replay_wal.replay(None).await.unwrap();
let replay_notifier = replay_notifier
.as_any()
.downcast_ref::<TestNotifier>()

View File

@ -26,14 +26,6 @@ impl PersistedFiles {
}
}
/// Add a file to the list of persisted files
pub fn add_file(&self, db_id: DbId, table_id: TableId, file: ParquetFile) {
let mut inner = self.inner.write();
let tables = inner.files.entry(db_id).or_default();
let table_files = tables.entry(table_id).or_default();
table_files.push(file);
}
/// Add all files from a persisted snapshot
pub fn add_persisted_snapshot_files(&self, persisted_snapshot: PersistedSnapshot) {
let mut inner = self.inner.write();