refactor: address PR comment

praveen/delete-wal-background
Praveen Kumar 2025-01-12 00:11:54 +00:00
parent 5e76230403
commit a03f3fb509
No known key found for this signature in database
GPG Key ID: CB9E05780A79EA5A
3 changed files with 31 additions and 29 deletions

View File

@ -775,8 +775,6 @@ mod tests {
expected: &'a [&'a str],
}
// TODO: I wasn't expecting to change these test cases when disabling deletion of wal files, this
// probably needs some reasoning with the test, currently just updated the tests
let test_cases = [
TestCase {
query: "\
@ -787,9 +785,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@ -802,9 +800,9 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1961 | 3 | 20 | 40 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@ -816,12 +814,12 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| mem | 1956 | 2 | 0 | 10 |",
"| mem | 1961 | 3 | 20 | 40 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 0 | 20 |",
"| mem | 1961 | 3 | 30 | 50 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},
@ -834,10 +832,10 @@ mod tests {
"+------------+------------+-----------+----------+----------+",
"| table_name | size_bytes | row_count | min_time | max_time |",
"+------------+------------+-----------+----------+----------+",
"| cpu | 1956 | 2 | 0 | 10 |",
"| cpu | 1961 | 3 | 20 | 40 |",
"| cpu | 1961 | 3 | 50 | 70 |",
"| mem | 1961 | 3 | 50 | 70 |",
"| cpu | 1961 | 3 | 0 | 20 |",
"| cpu | 1961 | 3 | 30 | 50 |",
"| cpu | 1961 | 3 | 60 | 80 |",
"| mem | 1961 | 3 | 60 | 80 |",
"+------------+------------+-----------+----------+----------+",
],
},

View File

@ -365,11 +365,11 @@ impl WalObjectStore {
last_wal_sequence_number: Option<WalFileSequenceNumber>,
all_wal_file_paths: &[Path],
) -> Vec<Path> {
// if we have a last wal path from persisted snapshots, we don't need to load the wal files
// that came before it:
all_wal_file_paths
.iter()
.filter(|path| {
// if we have a last wal path from persisted snapshots, we don't need to load the wal files
// that came before it:
if let Some(last_wal_sequence_number) = last_wal_sequence_number {
let last_wal_path =
wal_path(&self.host_identifier_prefix, last_wal_sequence_number);
@ -378,7 +378,10 @@ impl WalObjectStore {
?last_wal_path,
">>> path and last_wal_path check when replaying"
);
*path >= &last_wal_path
// last_wal_sequence_number that comes from persisted snapshot is
// holds the last wal number (inclusive) that has been snapshotted
// so, anything greater than that needs to be loaded
*path > &last_wal_path
} else {
true
}

View File

@ -179,6 +179,13 @@ impl QueryableBuffer {
let persist_jobs = {
let mut buffer = self.buffer.write();
// need to buffer first before snapshotting
buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.distinct_cache_provider,
);
let mut persisting_chunks = vec![];
let catalog = Arc::clone(&buffer.catalog);
for (database_id, table_map) in buffer.db_to_table.iter_mut() {
@ -208,7 +215,7 @@ impl QueryableBuffer {
table_name.as_ref(),
table_id.as_u32(),
chunk.chunk_time,
write.wal_file_number,
snapshot_details.last_wal_sequence_number,
),
batch: chunk.record_batch,
schema: chunk.schema,
@ -221,12 +228,6 @@ impl QueryableBuffer {
}
}
buffer.buffer_ops(
&write.ops,
&self.last_cache_provider,
&self.distinct_cache_provider,
);
persisting_chunks
};
@ -278,7 +279,7 @@ impl QueryableBuffer {
let mut persisted_snapshot = PersistedSnapshot::new(
persister.host_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
wal_file_number,
snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
);
let mut cache_notifiers = vec![];