From 911ba92ab4133e75fe2a420e16ed9cb4cf32196f Mon Sep 17 00:00:00 2001
From: praveen-influx
Date: Sun, 2 Feb 2025 16:51:53 +0000
Subject: [PATCH] feat: clear query buffer incrementally when snapshotting
 (#25948)

* feat: clear query buffer incrementally when snapshotting

This commit clears the query buffer incrementally: as soon as a table's
buffered data has been written to a parquet file and cached, that table's
snapshot data is cleared from the buffer. Previously, the whole buffer was
cleared at the end, in a background task.

* refactor: only clear buffer after adding to persisted files

* refactor: rename function
---
 .../src/write_buffer/persisted_files.rs  | 26 ++++
 .../src/write_buffer/queryable_buffer.rs | 59 +++++++++----------
 2 files changed, 54 insertions(+), 31 deletions(-)

diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs
index ece6b6efaa..c14a320276 100644
--- a/influxdb3_write/src/write_buffer/persisted_files.rs
+++ b/influxdb3_write/src/write_buffer/persisted_files.rs
@@ -36,6 +36,12 @@ impl PersistedFiles {
         inner.add_persisted_snapshot(persisted_snapshot);
     }
+    /// Add a single file to a table
+    pub fn add_persisted_file(&self, db_id: &DbId, table_id: &TableId, parquet_file: &ParquetFile) {
+        let mut inner = self.inner.write();
+        inner.add_persisted_file(db_id, table_id, parquet_file);
+    }
+
     /// Get the list of files for a given database and table, always return in descending order of min_time
     pub fn get_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
         self.get_files_filtered(db_id, table_id, &ChunkFilter::default())
     }
@@ -124,6 +130,26 @@ impl Inner {
         update_persisted_files_with_snapshot(false, persisted_snapshot, &mut self.files);
         self.parquet_files_count += file_count;
     }
+
+    pub fn add_persisted_file(
+        &mut self,
+        db_id: &DbId,
+        table_id: &TableId,
+        parquet_file: &ParquetFile,
+    ) {
+        let existing_parquet_files = self
+            .files
+            .entry(*db_id)
+            .or_default()
+            .entry(*table_id)
+            .or_default();
+        if !existing_parquet_files.contains(parquet_file) {
+            self.parquet_files_row_count += parquet_file.row_count;
+            self.parquet_files_size_mb += as_mb(parquet_file.size_bytes);
+            existing_parquet_files.push(parquet_file.clone());
+        }
+        self.parquet_files_count += 1;
+    }
 }
 
 fn as_mb(bytes: u64) -> f64 {
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index 9f643ba855..cacf6d10b9 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -283,6 +283,8 @@ impl QueryableBuffer {
             let executor = Arc::clone(&executor);
             let persisted_snapshot = Arc::clone(&persisted_snapshot);
             let parquet_cache = parquet_cache.clone();
+            let buffer = Arc::clone(&buffer);
+            let persisted_files = Arc::clone(&persisted_files);
 
             set.spawn(async move {
                 let path = persist_job.path.to_string();
@@ -313,19 +315,32 @@ impl QueryableBuffer {
                     // https://github.com/influxdata/influxdb/issues/25676
                     // https://github.com/influxdata/influxdb/issues/25677
                     .expect("sort, deduplicate, and persist buffer data as parquet");
+                let parquet_file = ParquetFile {
+                    id: ParquetFileId::new(),
+                    path,
+                    size_bytes: file_size_bytes,
+                    row_count: file_meta_data.num_rows as u64,
+                    chunk_time,
+                    min_time,
+                    max_time,
+                };
-                persisted_snapshot.lock().add_parquet_file(
-                    database_id,
-                    table_id,
-                    ParquetFile {
-                        id: ParquetFileId::new(),
-                        path,
-                        size_bytes: file_size_bytes,
-                        row_count: file_meta_data.num_rows as u64,
-                        chunk_time,
-                        min_time,
-                        max_time,
-                    },
-                )
+                {
+                    // we can clear the buffer as we move on
+                    let mut buffer = buffer.write();
+
+                    // add file first
+                    persisted_files.add_persisted_file(&database_id, &table_id, &parquet_file);
+                    // then clear the buffer
+                    if let Some(db) = buffer.db_to_table.get_mut(&database_id) {
+                        if let Some(table) = db.get_mut(&table_id) {
+                            table.clear_snapshots();
+                        }
+                    }
+                }
+
+                persisted_snapshot
+                    .lock()
+                    .add_parquet_file(database_id, table_id, parquet_file)
             });
         }
 
@@ -386,24 +401,6 @@ impl QueryableBuffer {
             }
         }
 
-        // clear out the write buffer and add all the persisted files to the persisted files
-        // on a background task to ensure that the cache has been populated before we clear
-        // the buffer
-        tokio::spawn(async move {
-            // same reason as explained above, if persist jobs are empty, no snapshotting
-            // has happened so no need to clear the snapshots
-            if !persist_jobs_empty {
-                let mut buffer = buffer.write();
-                for (_, table_map) in buffer.db_to_table.iter_mut() {
-                    for (_, table_buffer) in table_map.iter_mut() {
-                        table_buffer.clear_snapshots();
-                    }
-                }
-
-                persisted_files.add_persisted_snapshot_files(persisted_snapshot);
-            }
-        });
-
         let _ = sender.send(snapshot_details);
     });
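
The ordering inside the new block is the load-bearing detail: the parquet file is added to PersistedFiles before the table's snapshot data is cleared, and both steps happen while the buffer's write lock is held, so a concurrent query can always find the data in at least one of the two places. Below is a minimal, self-contained Rust sketch of that add-file-first-then-clear pattern; `Buffer` and `PersistedFiles` here are simplified stand-ins for illustration, not the actual influxdb3 types.

// A minimal sketch of the pattern this patch introduces: persist one
// table's data, register the resulting file *first*, and only then clear
// that table's data from the in-memory buffer. `Buffer` and
// `PersistedFiles` are simplified stand-ins, not the influxdb3 types.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type TableId = u32;

#[derive(Default)]
struct PersistedFiles {
    // table id -> paths of parquet files already persisted for that table
    files: RwLock<HashMap<TableId, Vec<String>>>,
}

impl PersistedFiles {
    fn add_persisted_file(&self, table_id: TableId, path: String) {
        let mut files = self.files.write().unwrap();
        let table_files = files.entry(table_id).or_default();
        // skip duplicates, mirroring the `contains` check in the patch
        if !table_files.contains(&path) {
            table_files.push(path);
        }
    }
}

#[derive(Default)]
struct Buffer {
    // table id -> rows still buffered in memory for the current snapshot
    snapshots: RwLock<HashMap<TableId, Vec<String>>>,
}

impl Buffer {
    fn clear_snapshots(&self, table_id: TableId) {
        if let Some(rows) = self.snapshots.write().unwrap().get_mut(&table_id) {
            rows.clear();
        }
    }
}

// Runs once per persist job. Note the ordering: in the patch both steps
// happen under the buffer's write lock, so a reader never observes a gap
// where the data is in neither the buffer nor the persisted-files index.
fn persist_table(table_id: TableId, persisted: &Arc<PersistedFiles>, buffer: &Arc<Buffer>) {
    // ... the real code writes the buffered rows out as a parquet file here ...
    let path = format!("table-{table_id}.parquet");

    // add file first
    persisted.add_persisted_file(table_id, path);
    // then clear the buffer
    buffer.clear_snapshots(table_id);
}

fn main() {
    let persisted = Arc::new(PersistedFiles::default());
    let buffer = Arc::new(Buffer::default());
    buffer
        .snapshots
        .write()
        .unwrap()
        .insert(1, vec!["row-a".into(), "row-b".into()]);

    persist_table(1, &persisted, &buffer);

    assert_eq!(persisted.files.read().unwrap()[&1], vec!["table-1.parquet"]);
    assert!(buffer.snapshots.read().unwrap()[&1].is_empty());
    println!("table 1 persisted and cleared incrementally");
}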