From bf9fa9f780168d6122a90f3db3f0a89c0d01f4d4 Mon Sep 17 00:00:00 2001 From: wayne Date: Thu, 5 Jun 2025 13:58:21 -0600 Subject: [PATCH] chore: backport retention period implementation from enterprise Includes two main components: * Removal of expired data from `PersistedFiles`. * Modified `ChunkFilter` that precisely excludes expired data from query results even if the expired data hasn't been removed from the object store yet. --------- Co-authored-by: Michael Gattozzi --- influxdb3_catalog/src/catalog.rs | 136 ++++++++++++++++-- influxdb3_server/src/query_executor/mod.rs | 24 +++- influxdb3_write/src/lib.rs | 40 ++++-- influxdb3_write/src/persister.rs | 7 + ...__tests__persisted_snapshot_structure.snap | 4 +- .../src/write_buffer/persisted_files.rs | 110 ++++++++++++-- .../src/write_buffer/queryable_buffer.rs | 61 ++++++-- 7 files changed, 337 insertions(+), 45 deletions(-) diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 00180166ec..1e52d7dda4 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -643,18 +643,23 @@ impl Catalog { Ok((token_info, token)) } - // Return the oldest allowable timestamp for the given table according to the - // currently-available set of retention policies. This is returned as a number of nanoseconds - // since the Unix Epoch. - pub fn get_retention_period_cutoff_ts_nanos(&self, db_id: &DbId, _: &TableId) -> Option { - let db = self.db_schema_by_id(db_id)?; - let retention_period = match db.retention_period { - RetentionPeriod::Duration(d) => Some(d.as_nanos() as u64), - RetentionPeriod::Indefinite => None, - }?; - - let now = self.time_provider.now().timestamp_nanos(); - Some(now - retention_period as i64) + // Return a map of all retention periods indexed by their combined database & table IDs. + pub fn get_retention_period_cutoff_map(&self) -> BTreeMap<(DbId, TableId), i64> { + self.list_db_schema() + .into_iter() + .flat_map(|db_schema| { + db_schema + .tables() + .filter_map(|table_def| { + let db_id = db_schema.id(); + let table_id = table_def.id(); + db_schema + .get_retention_period_cutoff_ts_nanos(self.time_provider()) + .map(|cutoff| ((db_id, table_id), cutoff)) + }) + .collect::>() + }) + .collect() } } @@ -1401,6 +1406,22 @@ impl DatabaseSchema { }, ) } + + // Return the oldest allowable timestamp for the given table according to the + // currently-available set of retention policies. This is returned as a number of nanoseconds + // since the Unix Epoch. 
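// --- Illustrative sketch (editorial addition, not part of this patch) ---
// How a caller might consume the cutoff map built by
// `Catalog::get_retention_period_cutoff_map` above to decide whether a
// persisted parquet file has aged out. `catalog`, `db_id`, `table_id`, and
// `file` are assumed to be in scope and the helper name is hypothetical;
// the expiry rule mirrors the `file.max_time < cutoff` check used in
// `PersistedFiles::remove_files_by_retention_period` later in this patch.
fn is_expired(catalog: &Catalog, db_id: DbId, table_id: TableId, file: &ParquetFile) -> bool {
    catalog
        .get_retention_period_cutoff_map()
        .get(&(db_id, table_id))
        // a file is expired only when even its newest row is older than the cutoff
        .is_some_and(|cutoff| file.max_time < *cutoff)
    // tables without a retention period have no map entry and thus never expire
}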
+ pub fn get_retention_period_cutoff_ts_nanos( + &self, + time_provider: Arc, + ) -> Option { + let retention_period = match self.retention_period { + RetentionPeriod::Duration(d) => Some(d.as_nanos() as u64), + RetentionPeriod::Indefinite => None, + }?; + + let now = time_provider.now().timestamp_nanos(); + Some(now - retention_period as i64) + } } trait UpdateDatabaseSchema { @@ -2825,6 +2846,97 @@ mod tests { ); } + #[test_log::test(tokio::test)] + async fn retention_period_cutoff_map() { + use iox_time::MockProvider; + let now = Time::from_timestamp(60 * 60 * 24, 0).unwrap(); + let time_provider = Arc::new(MockProvider::new(now)); + let catalog = + Catalog::new_in_memory_with_args("test", time_provider as _, CatalogArgs::default()) + .await + .unwrap(); + + let testdb1 = "test-db"; + let mut txn = catalog.begin(testdb1).unwrap(); + + for i in 0..4 { + let table_name = format!("test-table-{i}"); + txn.table_or_create(&table_name).unwrap(); + txn.column_or_create(&table_name, "field", FieldDataType::String) + .unwrap(); + txn.column_or_create(&table_name, "time", FieldDataType::Timestamp) + .unwrap(); + } + catalog.commit(txn).await.unwrap(); + + let testdb2 = "test-db-2"; + let mut txn = catalog.begin(testdb2).unwrap(); + + for i in 0..4 { + let table_name = format!("test-table-{i}"); + txn.table_or_create(&table_name).unwrap(); + txn.column_or_create(&table_name, "field", FieldDataType::String) + .unwrap(); + txn.column_or_create(&table_name, "time", FieldDataType::Timestamp) + .unwrap(); + } + catalog.commit(txn).await.unwrap(); + + let database_retention = Duration::from_secs(15); + let database_cutoff = now - database_retention; + + // set per-table and database-level retention periods on table 2 + catalog + .set_retention_period_for_database(testdb2, database_retention) + .await + .expect("must be able to set retention for database"); + + let map = catalog.get_retention_period_cutoff_map(); + assert_eq!(map.len(), 4, "expect 4 entries in resulting map"); + + // validate tables where there is either a table or a database retention set + for (db_name, table_name, expected_cutoff) in [ + (testdb2, "test-table-0", database_cutoff.timestamp_nanos()), + (testdb2, "test-table-1", database_cutoff.timestamp_nanos()), + (testdb2, "test-table-2", database_cutoff.timestamp_nanos()), + (testdb2, "test-table-3", database_cutoff.timestamp_nanos()), + ] { + let db_schema = catalog + .db_schema(db_name) + .expect("must be able to get expected database schema"); + let table_def = db_schema + .table_definition(table_name) + .expect("must be able to get expected table definition"); + let cutoff = map + .get(&(db_schema.id(), table_def.id())) + .expect("expected retention period must exist"); + assert_eq!( + *cutoff, expected_cutoff, + "expected cutoff must match actual" + ); + } + + // validate tables with no retention set + for (db_name, table_name) in [ + (testdb1, "test-table-0"), + (testdb1, "test-table-1"), + (testdb1, "test-table-2"), + (testdb1, "test-table-3"), + ] { + let db_schema = catalog + .db_schema(db_name) + .expect("must be able to get expected database schema"); + let table_def = db_schema + .table_definition(table_name) + .expect("must be able to get expected table definition"); + let v = map.get(&(db_schema.id(), table_def.id())); + assert!( + v.is_none(), + "no retention period cutoff expected for {db_name}/{table_name}" + ); + } + } + #[test_log::test(tokio::test)] async fn test_catalog_file_ordering() { let local_disk = diff --git a/influxdb3_server/src/query_executor/mod.rs 
b/influxdb3_server/src/query_executor/mod.rs index bba29811a1..33db9d514b 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -682,9 +682,31 @@ impl QueryTable { filters: &[Expr], _limit: Option, ) -> Result>, DataFusionError> { - let buffer_filter = ChunkFilter::new(&self.table_def, filters) + let mut buffer_filter = ChunkFilter::new(&self.table_def, filters) .map_err(|error| DataFusionError::External(Box::new(error)))?; + let catalog = self.write_buffer.catalog(); + let retention_period_cutoff = match self + .db_schema + .get_retention_period_cutoff_ts_nanos(catalog.time_provider()) + { + Some(time) => time, + None => { + return self.write_buffer.get_table_chunks( + Arc::clone(&self.db_schema), + Arc::clone(&self.table_def), + &buffer_filter, + projection, + ctx, + ); + } + }; + + buffer_filter.time_lower_bound_ns = buffer_filter + .time_lower_bound_ns + .map(|lb| lb.max(retention_period_cutoff)) + .or(Some(retention_period_cutoff)); + self.write_buffer.get_table_chunks( Arc::clone(&self.db_schema), Arc::clone(&self.table_def), diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index d6102ca789..d4a4774264 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -188,6 +188,10 @@ pub struct PersistedSnapshot { /// The collection of databases that had tables persisted in this snapshot. The tables will then have their /// name and the parquet file. pub databases: SerdeVecMap, + /// The collection of databases that had files removed in this snapshot. + /// The tables will then have their name and the parquet file that was removed. + #[serde(default)] + pub removed_files: SerdeVecMap, } impl PersistedSnapshot { @@ -208,6 +212,7 @@ impl PersistedSnapshot { min_time: i64::MAX, max_time: i64::MIN, databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), } } @@ -273,9 +278,9 @@ pub struct ParquetFile { pub row_count: u64, /// chunk time nanos pub chunk_time: i64, - /// min time nanos + /// min time nanos; aka the time of the oldest record in the file pub min_time: i64, - /// max time nanos + /// max time nanos; aka the time of the newest record in the file pub max_time: i64, } @@ -343,17 +348,18 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision { /// A derived set of filters that are used to prune data in the buffer when serving queries #[derive(Debug, Default)] pub struct ChunkFilter<'a> { - time_lower_bound_ns: Option, - time_upper_bound_ns: Option, + pub time_lower_bound_ns: Option, + pub time_upper_bound_ns: Option, filters: &'a [Expr], } impl<'a> ChunkFilter<'a> { - /// Create a new `ChunkFilter` given a [`TableDefinition`] and set of filter [`Expr`]s from - /// a logical query plan. + /// Create a new `ChunkFilter` given a [`TableDefinition`] and set of filter + /// [`Expr`]s from a logical query plan. /// - /// This method analyzes the incoming `exprs` to determine if there are any filters on the - /// `time` column and attempt to derive the boundaries on `time` from the query. + /// This method analyzes the incoming `exprs` to determine if there are any + /// filters on the `time` column and attempts to derive the boundaries on + /// `time` from the query. 
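// --- Illustrative sketch (editorial addition, not part of this patch) ---
// The query path in `QueryTable` above folds the retention cutoff into the
// filter's lower time bound before chunks are fetched. In isolation the
// clamping is just a max/or: an existing lower bound is raised to the cutoff,
// and a missing lower bound becomes the cutoff itself. The helper name is
// hypothetical; the body matches the patch's own code.
fn apply_retention_cutoff(lower_bound_ns: Option<i64>, cutoff_ns: i64) -> Option<i64> {
    lower_bound_ns
        .map(|lb| lb.max(cutoff_ns))
        .or(Some(cutoff_ns))
}
// e.g. apply_retention_cutoff(None, 100)      == Some(100)
//      apply_retention_cutoff(Some(50), 100)  == Some(100)
//      apply_retention_cutoff(Some(150), 100) == Some(150)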
pub fn new(table_def: &Arc, exprs: &'a [Expr]) -> Result { debug!(input = ?exprs, "creating chunk filter"); let mut time_interval: Option = None; @@ -370,8 +376,8 @@ impl<'a> ChunkFilter<'a> { let props = ExecutionProps::new(); for expr in exprs.iter().filter(|e| { - // NOTE: filter out most expression types, as they are not relevant to time bound - // analysis: + // NOTE: filter out most expression types, as they are not relevant to + // time bound analysis: matches!(e, Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_)) // Check if the expression refers to the `time` column: && e.column_refs() @@ -390,12 +396,14 @@ impl<'a> ChunkFilter<'a> { ) .context("unable to analyze provided filters for a boundary on the time column")?; - // Set the boundaries on the time column using the evaluated interval, if it exisxts - // If an interval was already derived from a previous expression, we take their - // intersection, or produce an error if: + // Set the boundaries on the time column using the evaluated + // interval, if it exists. + // If an interval was already derived from a previous expression, we + // take their intersection, or produce an error if: // - the derived intervals are not compatible (different types) - // - the derived intervals do not intersect, this should be a user error, i.e., a - // poorly formed query + // - the derived intervals do not intersect, this should be a user + // error, i.e., a poorly formed query or querying outside of a + // retention policy for the table if let Some(ExprBoundaries { interval, .. }) = (time_col_index < analysis.boundaries.len()) .then_some(analysis.boundaries.remove(time_col_index)) @@ -581,6 +589,7 @@ mod tests { max_time: 1, row_count: 0, parquet_size_bytes: 0, + removed_files: SerdeVecMap::new(), }; // db 2 setup @@ -623,6 +632,7 @@ mod tests { max_time: 1, row_count: 0, parquet_size_bytes: 0, + removed_files: SerdeVecMap::new(), }; let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[ diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs index be7d85fabd..25e4483a1c 100644 --- a/influxdb3_write/src/persister.rs +++ b/influxdb3_write/src/persister.rs @@ -374,6 +374,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::new(0), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -396,6 +397,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::default(), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -408,6 +410,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(1), catalog_sequence_number: CatalogSequenceNumber::default(), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), max_time: 1, min_time: 0, row_count: 0, @@ -420,6 +423,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(2), catalog_sequence_number: CatalogSequenceNumber::default(), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -454,6 +458,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(0), catalog_sequence_number: CatalogSequenceNumber::default(), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -481,6 +486,7 @@ mod tests { wal_file_sequence_number: WalFileSequenceNumber::new(id), 
catalog_sequence_number: CatalogSequenceNumber::new(id), databases: SerdeVecMap::new(), + removed_files: SerdeVecMap::new(), min_time: 0, max_time: 1, row_count: 0, @@ -619,6 +625,7 @@ mod tests { row_count: 1, min_time: 0, max_time: 1, + removed_files: SerdeVecMap::new(), databases, }); insta::assert_json_snapshot!(snapshot); diff --git a/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap index 6f88476fe0..5cbe815bcc 100644 --- a/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap +++ b/influxdb3_write/src/snapshots/influxdb3_write__persister__tests__persisted_snapshot_structure.snap @@ -1,6 +1,7 @@ --- source: influxdb3_write/src/persister.rs expression: snapshot +snapshot_kind: text --- { "version": "1", @@ -120,5 +121,6 @@ expression: snapshot ] } ] - ] + ], + "removed_files": [] } diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs index dc2235b0ea..488c354414 100644 --- a/influxdb3_write/src/write_buffer/persisted_files.rs +++ b/influxdb3_write/src/write_buffer/persisted_files.rs @@ -2,11 +2,14 @@ //! When queries come in they will combine whatever chunks exist from `QueryableBuffer` with //! the persisted files to get the full set of data to query. -use crate::ChunkFilter; +use std::sync::Arc; + +use crate::{ChunkFilter, DatabaseTables}; use crate::{ParquetFile, PersistedSnapshot}; -use hashbrown::HashMap; -use influxdb3_id::DbId; +use hashbrown::{HashMap, HashSet}; +use influxdb3_catalog::catalog::Catalog; use influxdb3_id::TableId; +use influxdb3_id::{DbId, SerdeVecMap}; use influxdb3_telemetry::ParquetMetrics; use parking_lot::RwLock; @@ -71,6 +74,68 @@ impl PersistedFiles { files } + + pub fn remove_files_by_retention_period( + &self, + catalog: Arc, + ) -> SerdeVecMap { + let mut removed: SerdeVecMap = SerdeVecMap::new(); + let mut removed_paths: HashSet = HashSet::new(); + let mut size = 0; + let mut row_count = 0; + + let retention_periods = catalog.get_retention_period_cutoff_map(); + + // nothing to do if there are no retention periods, return empty result + if retention_periods.is_empty() { + return removed; + } + + // do all retention period checking of persisted files under a read lock to prevent + // blocking unnecessarily blocking concurrently-running queries + { + let guard = self.inner.read(); + for ((db_id, table_id), cutoff) in catalog.get_retention_period_cutoff_map() { + let Some(files) = guard.files.get(&db_id).and_then(|hm| hm.get(&table_id)) else { + continue; + }; + for file in files { + // remove files if their max time (aka newest timestamp) is less than (aka older + // than) the cutoff timestamp for the retention period + if file.max_time < cutoff { + size += file.size_bytes; + row_count += file.row_count; + removed + .entry(db_id) + .or_default() + .tables + .entry(table_id) + .or_default() + .push(file.clone()); + removed_paths.insert(file.path.clone()); + } + } + } + } + + // if no persisted files are found to be in violation of their retention period, then + // return an empty result to avoid unnecessarily acquiring a write lock + if removed.is_empty() { + return removed; + } + + let mut guard = self.inner.write(); + for (_, tables) in guard.files.iter_mut() { + for (_, files) in tables.iter_mut() { + files.retain(|file| !removed_paths.contains(&file.path)) + } + } + + guard.parquet_files_count 
-= removed.len() as u64; + guard.parquet_files_size_mb -= as_mb(size); + guard.parquet_files_row_count -= row_count; + removed + } } impl ParquetMetrics for PersistedFiles { @@ -110,9 +175,11 @@ impl Inner { |mut files, persisted_snapshot| { size_in_mb += as_mb(persisted_snapshot.parquet_size_bytes); row_count += persisted_snapshot.row_count; - let parquet_files_added = + let (parquet_files_added, removed_size, removed_row_count) = update_persisted_files_with_snapshot(true, persisted_snapshot, &mut files); file_count += parquet_files_added; + size_in_mb -= as_mb(removed_size); + row_count -= removed_row_count; files }, ); @@ -128,8 +195,10 @@ impl Inner { pub(crate) fn add_persisted_snapshot(&mut self, persisted_snapshot: PersistedSnapshot) { self.parquet_files_row_count += persisted_snapshot.row_count; self.parquet_files_size_mb += as_mb(persisted_snapshot.parquet_size_bytes); - let file_count = + let (file_count, removed_file_size, removed_row_count) = update_persisted_files_with_snapshot(false, persisted_snapshot, &mut self.files); + self.parquet_files_row_count -= removed_row_count; + self.parquet_files_size_mb -= as_mb(removed_file_size); self.parquet_files_count += file_count; } @@ -163,8 +232,8 @@ fn update_persisted_files_with_snapshot( initial_load: bool, persisted_snapshot: PersistedSnapshot, db_to_tables: &mut HashMap>>, -) -> u64 { - let mut file_count = 0; +) -> (u64, u64, u64) { + let (mut file_count, mut removed_size, mut removed_row_count) = (0, 0, 0); persisted_snapshot .databases .into_iter() @@ -190,7 +259,32 @@ fn update_persisted_files_with_snapshot( } }); }); - file_count + + // We now remove any files as we load the snapshots if they exist. + persisted_snapshot + .removed_files + .into_iter() + .for_each(|(db_id, tables)| { + let db_tables: &mut HashMap> = + db_to_tables.entry(db_id).or_default(); + + tables + .tables + .into_iter() + .for_each(|(table_id, remove_parquet_files)| { + let table_files = db_tables.entry(table_id).or_default(); + for file in remove_parquet_files { + if let Some(idx) = table_files.iter().position(|f| f.id == file.id) { + let file = table_files.remove(idx); + file_count -= 1; + removed_size -= file.size_bytes; + removed_row_count -= file.row_count; + } + } + }); + }); + + (file_count, removed_size, removed_row_count) } #[cfg(test)] diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 450bf2ed29..f856f28cc4 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -24,6 +24,7 @@ use iox_query::QueryChunk; use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics}; use iox_query::exec::Executor; use iox_query::frontend::reorg::ReorgPlanner; +use object_store::Error; use object_store::path::Path; use observability_deps::tracing::{error, info}; use parking_lot::Mutex; @@ -218,6 +219,45 @@ impl QueryableBuffer { persisting_chunks }; + let removed_files = self + .persisted_files + .remove_files_by_retention_period(Arc::clone(&self.catalog)); + + for (_, tables) in &removed_files { + for (_, files) in &tables.tables { + for file in files { + let path = file.path.clone(); + let object_store = Arc::clone(&self.persister.object_store()); + // We've removed the file from the PersistedFiles field. + // We'll store them as part of the snapshot so that other parts + // that depend on knowing if they exist or not can update their + // index accordingly. 
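// [Editorial arithmetic, not part of this patch: each failed delete below
// sleeps `retry_count * 10` seconds, so the waits grow as 10 s, 20 s, 30 s, ...
// and the first ten of them sum to 10 + 20 + ... + 100 = 550 s, roughly nine
// minutes, which is the "about 9 minutes" figure quoted in the retry loop's
// comment.]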
We try to delete them on a best effort + // basis, but if they don't get deleted that's fine they aren't + // referenced anymore. + tokio::spawn(async move { + let mut retry_count = 0; + let path = path.into(); + while retry_count <= 10 { + match object_store.delete(&path).await { + Ok(()) => break, + // This was already deleted so we can just skip it + Err(Error::NotFound { .. }) => break, + Err(_) => { + retry_count += 1; + // Sleep and increase the time with each retry. + // This adds up to about 9 minutes over time. + tokio::time::sleep(tokio::time::Duration::from_secs( + retry_count * 10, + )) + .await; + } + } + } + }); + } + } + } + let (sender, receiver) = oneshot::channel(); let persister = Arc::clone(&self.persister); @@ -235,13 +275,16 @@ impl QueryableBuffer { persist_jobs.len(), wal_file_number.as_u64(), ); - // persist the individual files, building the snapshot as we go - let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new( + let mut snapshot = PersistedSnapshot::new( persister.node_identifier_prefix().to_string(), snapshot_details.snapshot_sequence_number, snapshot_details.last_wal_sequence_number, catalog.sequence_number(), - ))); + ); + + snapshot.removed_files = removed_files; + // persist the individual files, building the snapshot as we go + let persisted_snapshot = Arc::new(Mutex::new(snapshot)); let persist_jobs_empty = persist_jobs.is_empty(); let mut set = JoinSet::new(); @@ -314,11 +357,12 @@ impl QueryableBuffer { set.join_all().await; - // persist the snapshot file - only if persist jobs are present - // if persist_jobs is empty, then parquet file wouldn't have been - // written out, so it's desirable to not write empty snapshot file. + // persist the snapshot file - only if persist jobs are present or + // files have been removed due to retention policies. + // If persist_jobs is empty, then the parquet file wouldn't have been + // written out, so it's desirable to not write empty snapshot files. // - // How can persist jobs be empty even though snapshot is triggered? + // How can persist jobs be empty even though a snapshot is triggered? // // When force snapshot is set, wal_periods (tracked by // snapshot_tracker) will never be empty as a no-op is added. This @@ -347,12 +391,13 @@ impl QueryableBuffer { // force_snapshot) snapshot runs, snapshot_tracker will check if // wal_periods are empty so it won't trigger a snapshot in the first // place. + let removed_files_empty = persisted_snapshot.lock().removed_files.is_empty(); let persisted_snapshot = PersistedSnapshotVersion::V1( Arc::into_inner(persisted_snapshot) .expect("Should only have one strong reference") .into_inner(), ); - if !persist_jobs_empty { + if !persist_jobs_empty || !removed_files_empty { loop { match persister.persist_snapshot(&persisted_snapshot).await { Ok(_) => {
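// [Editorial sketch, not part of this patch: the persistence guard above
// reduces to a simple OR over two emptiness checks, so a snapshot file is
// still written when retention removed parquet files even if nothing new was
// persisted, and is skipped only when there is truly nothing to record:
//   persist_jobs   removed_files   persist snapshot?
//   non-empty      any             yes
//   empty          non-empty       yes
//   empty          empty           no]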