feat: clear query buffer incrementally when snapshotting (#25948)

* feat: clear query buffer incrementally when snapshotting

This commit clears the query buffer incrementally: as soon as a table's
buffered data has been written to a parquet file and cached, that table's
buffer is cleared. Previously, the whole buffer was cleared at the end, in
a background task.

* refactor: only clear buffer after adding to persisted files

* refactor: rename function
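
At a high level, the change replaces one deferred bulk clear with a per-table clear inside the persist loop. A minimal sketch of the before/after control flow; `persist_table` and `clear_buffer` are hypothetical stand-ins, not the actual influxdb3 API:

```rust
// Illustrative only: these two functions are hypothetical stand-ins
// for the real persist/snapshot machinery.
fn persist_table(table: &str) {
    // write the table's buffered data to a parquet file and cache it
    println!("persisted {table}");
}

fn clear_buffer(table: &str) {
    // drop the table's in-memory snapshot data
    println!("cleared buffer for {table}");
}

fn main() {
    let tables = ["cpu", "mem", "disk"];

    // Before: persist everything, then clear all buffers in one
    // deferred background pass.
    for t in &tables {
        persist_table(t);
    }
    for t in &tables {
        clear_buffer(t);
    }

    // After: clear each table's buffer as soon as its file is
    // persisted, so memory is released incrementally.
    for t in &tables {
        persist_table(t);
        clear_buffer(t);
    }
}
```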
praveen-influx 2025-02-02 16:51:53 +00:00 committed by GitHub
parent 23b77946f4
commit 911ba92ab4
2 changed files with 54 additions and 31 deletions


@@ -36,6 +36,12 @@ impl PersistedFiles {
         inner.add_persisted_snapshot(persisted_snapshot);
     }
 
+    /// Add single file to a table
+    pub fn add_persisted_file(&self, db_id: &DbId, table_id: &TableId, parquet_file: &ParquetFile) {
+        let mut inner = self.inner.write();
+        inner.add_persisted_file(db_id, table_id, parquet_file);
+    }
+
     /// Get the list of files for a given database and table, always return in descending order of min_time
     pub fn get_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
         self.get_files_filtered(db_id, table_id, &ChunkFilter::default())
@@ -124,6 +130,26 @@ impl Inner {
         update_persisted_files_with_snapshot(false, persisted_snapshot, &mut self.files);
         self.parquet_files_count += file_count;
     }
+
+    pub fn add_persisted_file(
+        &mut self,
+        db_id: &DbId,
+        table_id: &TableId,
+        parquet_file: &ParquetFile,
+    ) {
+        let existing_parquet_files = self
+            .files
+            .entry(*db_id)
+            .or_default()
+            .entry(*table_id)
+            .or_default();
+        if !existing_parquet_files.contains(parquet_file) {
+            self.parquet_files_row_count += parquet_file.row_count;
+            self.parquet_files_size_mb += as_mb(parquet_file.size_bytes);
+            existing_parquet_files.push(parquet_file.clone());
+        }
+        self.parquet_files_count += 1;
+    }
 }
 
 fn as_mb(bytes: u64) -> f64 {
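
The bookkeeping in the new `Inner::add_persisted_file` is the standard nested `entry(...).or_default()` pattern over `HashMap<DbId, HashMap<TableId, Vec<ParquetFile>>>`, with `Vec::contains` guarding the size and row counters against double-counting. A self-contained sketch of the same pattern, with simplified stand-in types in place of `DbId`, `TableId`, and `ParquetFile`:

```rust
use std::collections::HashMap;

// Simplified stand-ins for DbId, TableId, and ParquetFile.
type DbId = u32;
type TableId = u32;

#[derive(Clone, PartialEq)]
struct File {
    path: String,
    row_count: u64,
}

fn add_file(
    files: &mut HashMap<DbId, HashMap<TableId, Vec<File>>>,
    db_id: DbId,
    table_id: TableId,
    file: &File,
) {
    // Lazily create the per-database and per-table entries.
    let table_files = files.entry(db_id).or_default().entry(table_id).or_default();
    // `contains` uses PartialEq, so an already-tracked file is not
    // pushed (nor counted) a second time.
    if !table_files.contains(file) {
        table_files.push(file.clone());
    }
}

fn main() {
    let mut files = HashMap::new();
    let f = File { path: "t/0.parquet".into(), row_count: 10 };
    add_file(&mut files, 1, 7, &f);
    add_file(&mut files, 1, 7, &f); // deduplicated
    assert_eq!(files[&1][&7].len(), 1);
}
```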


@@ -283,6 +283,8 @@ impl QueryableBuffer {
                 let executor = Arc::clone(&executor);
                 let persisted_snapshot = Arc::clone(&persisted_snapshot);
                 let parquet_cache = parquet_cache.clone();
+                let buffer = Arc::clone(&buffer);
+                let persisted_files = Arc::clone(&persisted_files);
 
                 set.spawn(async move {
                     let path = persist_job.path.to_string();
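
The two added `Arc::clone` lines follow the usual ownership pattern for spawned tasks: a future handed to `set.spawn(async move { ... })` must own everything it captures, so each shared handle is cloned just before the `move` block. A minimal standalone example of that pattern, assuming `tokio` and `parking_lot` as in the surrounding code:

```rust
use std::sync::Arc;

use parking_lot::RwLock;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let buffer = Arc::new(RwLock::new(Vec::<u64>::new()));
    let mut set = JoinSet::new();

    for i in 0..4u64 {
        // Clone the handle outside the future; `async move` then owns
        // this clone, leaving the original `buffer` usable below.
        let buffer = Arc::clone(&buffer);
        set.spawn(async move {
            buffer.write().push(i);
        });
    }

    while let Some(res) = set.join_next().await {
        res.expect("task panicked");
    }
    assert_eq!(buffer.read().len(), 4);
}
```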
@@ -313,20 +315,33 @@ impl QueryableBuffer {
                         // https://github.com/influxdata/influxdb/issues/25676
                         // https://github.com/influxdata/influxdb/issues/25677
                         .expect("sort, deduplicate, and persist buffer data as parquet");
 
+                    let parquet_file = ParquetFile {
+                        id: ParquetFileId::new(),
+                        path,
+                        size_bytes: file_size_bytes,
+                        row_count: file_meta_data.num_rows as u64,
+                        chunk_time,
+                        min_time,
+                        max_time,
+                    };
-                    persisted_snapshot.lock().add_parquet_file(
-                        database_id,
-                        table_id,
-                        ParquetFile {
-                            id: ParquetFileId::new(),
-                            path,
-                            size_bytes: file_size_bytes,
-                            row_count: file_meta_data.num_rows as u64,
-                            chunk_time,
-                            min_time,
-                            max_time,
-                        },
-                    )
+
+                    {
+                        // we can clear the buffer as we move on
+                        let mut buffer = buffer.write();
+                        // add file first
+                        persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file);
+                        // then clear the buffer
+                        if let Some(db) = buffer.db_to_table.get_mut(&database_id) {
+                            if let Some(table) = db.get_mut(&table_id) {
+                                table.clear_snapshots();
+                            }
+                        }
+                    }
+
+                    persisted_snapshot
+                        .lock()
+                        .add_parquet_file(database_id, table_id, parquet_file)
                 });
             }
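
The ordering inside the new scoped block is the point of the "only clear buffer after adding to persisted files" refactor: the parquet file is registered in `persisted_files` before the table's snapshots are dropped, and both steps happen while the buffer's write lock is held, so a query should never observe a window where the data is in neither the buffer nor the persisted-file list. A reduced sketch of that invariant, with simplified stand-in types for the buffer and registry:

```rust
use std::sync::Arc;

use parking_lot::RwLock;

// Simplified stand-ins for the table buffer and persisted-file registry.
struct TableBuffer { snapshots: Vec<Vec<u8>> }
struct PersistedFiles { paths: Vec<String> }

fn publish_file_and_clear(
    buffer: &Arc<RwLock<TableBuffer>>,
    persisted: &Arc<RwLock<PersistedFiles>>,
    parquet_path: String,
) {
    // Queries read under `buffer.read()`, so holding the write lock
    // across both steps makes the swap atomic from their point of view.
    let mut buf = buffer.write();
    // 1) make the parquet file queryable first...
    persisted.write().paths.push(parquet_path);
    // 2) ...then drop the now-redundant in-memory snapshots.
    buf.snapshots.clear();
}

fn main() {
    let buffer = Arc::new(RwLock::new(TableBuffer { snapshots: vec![vec![1, 2, 3]] }));
    let persisted = Arc::new(RwLock::new(PersistedFiles { paths: vec![] }));
    publish_file_and_clear(&buffer, &persisted, "db/table/0.parquet".into());
    assert!(buffer.read().snapshots.is_empty());
    assert_eq!(persisted.read().paths.len(), 1);
}
```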
@@ -386,24 +401,6 @@ impl QueryableBuffer {
                 }
             }
 
-            // clear out the write buffer and add all the persisted files to the persisted files
-            // on a background task to ensure that the cache has been populated before we clear
-            // the buffer
-            tokio::spawn(async move {
-                // same reason as explained above, if persist jobs are empty, no snapshotting
-                // has happened so no need to clear the snapshots
-                if !persist_jobs_empty {
-                    let mut buffer = buffer.write();
-                    for (_, table_map) in buffer.db_to_table.iter_mut() {
-                        for (_, table_buffer) in table_map.iter_mut() {
-                            table_buffer.clear_snapshots();
-                        }
-                    }
-
-                    persisted_files.add_persisted_snapshot_files(persisted_snapshot);
-                }
-            });
-
             let _ = sender.send(snapshot_details);
         });