chore: backport retention period implementation from enterprise

Includes two main components:

* Removal of expired data from `PersistedFiles`.
* Modified `ChunkFilter` that precisely excludes expired data from query
  results even if the expired data hasn't been removed from the object
  store yet.

---------

Co-authored-by: Michael Gattozzi <mgattozzi@influxdata.com>
backport/influxdb3_write_retention-period
wayne 2025-06-05 13:58:21 -06:00 committed by wayne warren
parent d499c59bb1
commit bf9fa9f780
7 changed files with 337 additions and 45 deletions

View File

@ -643,18 +643,23 @@ impl Catalog {
Ok((token_info, token))
}
// Return the oldest allowable timestamp for the given table according to the
// currently-available set of retention policies. This is returned as a number of nanoseconds
// since the Unix Epoch.
pub fn get_retention_period_cutoff_ts_nanos(&self, db_id: &DbId, _: &TableId) -> Option<i64> {
let db = self.db_schema_by_id(db_id)?;
let retention_period = match db.retention_period {
RetentionPeriod::Duration(d) => Some(d.as_nanos() as u64),
RetentionPeriod::Indefinite => None,
}?;
let now = self.time_provider.now().timestamp_nanos();
Some(now - retention_period as i64)
// Return a map of all retention periods indexed by their combined database & table IDs.
pub fn get_retention_period_cutoff_map(&self) -> BTreeMap<(DbId, TableId), i64> {
self.list_db_schema()
.into_iter()
.flat_map(|db_schema| {
db_schema
.tables()
.filter_map(|table_def| {
let db_id = db_schema.id();
let table_id = table_def.id();
db_schema
.get_retention_period_cutoff_ts_nanos(self.time_provider())
.map(|cutoff| ((db_id, table_id), cutoff))
})
.collect::<Vec<_>>()
})
.collect()
}
}
@ -1401,6 +1406,22 @@ impl DatabaseSchema {
},
)
}
// Return the oldest allowable timestamp for the given table according to the
// currently-available set of retention policies. This is returned as a number of nanoseconds
// since the Unix Epoch.
pub fn get_retention_period_cutoff_ts_nanos(
&self,
time_provider: Arc<dyn TimeProvider>,
) -> Option<i64> {
let retention_period = match self.retention_period {
RetentionPeriod::Duration(d) => Some(d.as_nanos() as u64),
RetentionPeriod::Indefinite => None,
}?;
let now = time_provider.now().timestamp_nanos();
Some(now - retention_period as i64)
}
}
trait UpdateDatabaseSchema {
@ -2825,6 +2846,97 @@ mod tests {
);
}
#[test_log::test(tokio::test)]
async fn retention_period_cutoff_map() {
use iox_time::MockProvider;
let now = Time::from_timestamp(60 * 60 * 24, 0).unwrap();
let time_provider = Arc::new(MockProvider::new(now));
let catalog =
Catalog::new_in_memory_with_args("test", time_provider as _, CatalogArgs::default())
.await
.unwrap();
let testdb1 = "test-db";
let mut txn = catalog.begin(testdb1).unwrap();
for i in 0..4 {
let table_name = format!("test-table-{i}");
txn.table_or_create(&table_name).unwrap();
txn.column_or_create(&table_name, "field", FieldDataType::String)
.unwrap();
txn.column_or_create(&table_name, "time", FieldDataType::Timestamp)
.unwrap();
}
catalog.commit(txn).await.unwrap();
let testdb2 = "test-db-2";
let mut txn = catalog.begin(testdb2).unwrap();
for i in 0..4 {
let table_name = format!("test-table-{i}");
txn.table_or_create(&table_name).unwrap();
txn.column_or_create(&table_name, "field", FieldDataType::String)
.unwrap();
txn.column_or_create(&table_name, "time", FieldDataType::Timestamp)
.unwrap();
}
catalog.commit(txn).await.unwrap();
let database_retention = Duration::from_secs(15);
let database_cutoff = now - database_retention;
// set per-table and database-level retention periods on table 2
catalog
.set_retention_period_for_database(testdb2, database_retention)
.await
.expect("must be able to set retention for database");
let map = catalog.get_retention_period_cutoff_map();
assert_eq!(map.len(), 4, "expect 4 entries in resulting map");
// validate tables where there is either a table or a database retention set
for (db_name, table_name, expected_cutoff) in [
(testdb2, "test-table-0", database_cutoff.timestamp_nanos()),
(testdb2, "test-table-1", database_cutoff.timestamp_nanos()),
(testdb2, "test-table-2", database_cutoff.timestamp_nanos()),
(testdb2, "test-table-3", database_cutoff.timestamp_nanos()),
] {
let db_schema = catalog
.db_schema(db_name)
.expect("must be able to get expected database schema");
let table_def = db_schema
.table_definition(table_name)
.expect("must be able to get expected table definition");
let cutoff = map
.get(&(db_schema.id(), table_def.id()))
.expect("expected retention period must exist");
assert_eq!(
*cutoff, expected_cutoff,
"expected cutoff must match actual"
);
}
// validate tables with no retention set
for (db_name, table_name) in [
(testdb1, "test-table-0"),
(testdb1, "test-table-1"),
(testdb1, "test-table-2"),
(testdb1, "test-table-3"),
] {
let db_schema = catalog
.db_schema(db_name)
.expect("must be able to get expected database schema");
let table_def = db_schema
.table_definition(table_name)
.expect("must be able to get expected table definition");
let v = map.get(&(db_schema.id(), table_def.id()));
assert!(
v.is_none(),
"no retention period cutoff expected for {db_name}/{table_name}"
);
}
}
#[test_log::test(tokio::test)]
async fn test_catalog_file_ordering() {
let local_disk =

View File

@ -682,9 +682,31 @@ impl QueryTable {
filters: &[Expr],
_limit: Option<usize>,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let buffer_filter = ChunkFilter::new(&self.table_def, filters)
let mut buffer_filter = ChunkFilter::new(&self.table_def, filters)
.map_err(|error| DataFusionError::External(Box::new(error)))?;
let catalog = self.write_buffer.catalog();
let retention_period_cutoff = match self
.db_schema
.get_retention_period_cutoff_ts_nanos(catalog.time_provider())
{
Some(time) => time,
None => {
return self.write_buffer.get_table_chunks(
Arc::clone(&self.db_schema),
Arc::clone(&self.table_def),
&buffer_filter,
projection,
ctx,
);
}
};
buffer_filter.time_lower_bound_ns = buffer_filter
.time_lower_bound_ns
.map(|lb| lb.max(retention_period_cutoff))
.or(Some(retention_period_cutoff));
self.write_buffer.get_table_chunks(
Arc::clone(&self.db_schema),
Arc::clone(&self.table_def),

View File

@ -188,6 +188,10 @@ pub struct PersistedSnapshot {
/// The collection of databases that had tables persisted in this snapshot. The tables will then have their
/// name and the parquet file.
pub databases: SerdeVecMap<DbId, DatabaseTables>,
/// The collection of databases that had files removed in this snapshot.
/// The tables will then have their name and the parquet file that was removed.
#[serde(default)]
pub removed_files: SerdeVecMap<DbId, DatabaseTables>,
}
impl PersistedSnapshot {
@ -208,6 +212,7 @@ impl PersistedSnapshot {
min_time: i64::MAX,
max_time: i64::MIN,
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
}
}
@ -273,9 +278,9 @@ pub struct ParquetFile {
pub row_count: u64,
/// chunk time nanos
pub chunk_time: i64,
/// min time nanos
/// min time nanos; aka the time of the oldest record in the file
pub min_time: i64,
/// max time nanos
/// max time nanos; aka the time of the newest record in the file
pub max_time: i64,
}
@ -343,17 +348,18 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {
/// A derived set of filters that are used to prune data in the buffer when serving queries
#[derive(Debug, Default)]
pub struct ChunkFilter<'a> {
time_lower_bound_ns: Option<i64>,
time_upper_bound_ns: Option<i64>,
pub time_lower_bound_ns: Option<i64>,
pub time_upper_bound_ns: Option<i64>,
filters: &'a [Expr],
}
impl<'a> ChunkFilter<'a> {
/// Create a new `ChunkFilter` given a [`TableDefinition`] and set of filter [`Expr`]s from
/// a logical query plan.
/// Create a new `ChunkFilter` given a [`TableDefinition`] and set of filter
/// [`Expr`]s from a logical query plan.
///
/// This method analyzes the incoming `exprs` to determine if there are any filters on the
/// `time` column and attempt to derive the boundaries on `time` from the query.
/// This method analyzes the incoming `exprs` to determine if there are any
/// filters on the `time` column and attempts to derive the boundaries on
/// `time` from the query.
pub fn new(table_def: &Arc<TableDefinition>, exprs: &'a [Expr]) -> Result<Self> {
debug!(input = ?exprs, "creating chunk filter");
let mut time_interval: Option<Interval> = None;
@ -370,8 +376,8 @@ impl<'a> ChunkFilter<'a> {
let props = ExecutionProps::new();
for expr in exprs.iter().filter(|e| {
// NOTE: filter out most expression types, as they are not relevant to time bound
// analysis:
// NOTE: filter out most expression types, as they are not relevant to
// time bound analysis:
matches!(e, Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_))
// Check if the expression refers to the `time` column:
&& e.column_refs()
@ -390,12 +396,14 @@ impl<'a> ChunkFilter<'a> {
)
.context("unable to analyze provided filters for a boundary on the time column")?;
// Set the boundaries on the time column using the evaluated interval, if it exisxts
// If an interval was already derived from a previous expression, we take their
// intersection, or produce an error if:
// Set the boundaries on the time column using the evaluated
// interval, if it exists.
// If an interval was already derived from a previous expression, we
// take their intersection, or produce an error if:
// - the derived intervals are not compatible (different types)
// - the derived intervals do not intersect, this should be a user error, i.e., a
// poorly formed query
// - the derived intervals do not intersect, this should be a user
// error, i.e., a poorly formed query or querying outside of a
// retention policy for the table
if let Some(ExprBoundaries { interval, .. }) = (time_col_index
< analysis.boundaries.len())
.then_some(analysis.boundaries.remove(time_col_index))
@ -581,6 +589,7 @@ mod tests {
max_time: 1,
row_count: 0,
parquet_size_bytes: 0,
removed_files: SerdeVecMap::new(),
};
// db 2 setup
@ -623,6 +632,7 @@ mod tests {
max_time: 1,
row_count: 0,
parquet_size_bytes: 0,
removed_files: SerdeVecMap::new(),
};
let overall_counts = PersistedSnapshot::overall_db_table_file_counts(&[

View File

@ -374,6 +374,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::new(0),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
@ -396,6 +397,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
@ -408,6 +410,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
max_time: 1,
min_time: 0,
row_count: 0,
@ -420,6 +423,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
@ -454,6 +458,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
@ -481,6 +486,7 @@ mod tests {
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: CatalogSequenceNumber::new(id),
databases: SerdeVecMap::new(),
removed_files: SerdeVecMap::new(),
min_time: 0,
max_time: 1,
row_count: 0,
@ -619,6 +625,7 @@ mod tests {
row_count: 1,
min_time: 0,
max_time: 1,
removed_files: SerdeVecMap::new(),
databases,
});
insta::assert_json_snapshot!(snapshot);

View File

@ -1,6 +1,7 @@
---
source: influxdb3_write/src/persister.rs
expression: snapshot
snapshot_kind: text
---
{
"version": "1",
@ -120,5 +121,6 @@ expression: snapshot
]
}
]
]
],
"removed_files": []
}

View File

@ -2,11 +2,14 @@
//! When queries come in they will combine whatever chunks exist from `QueryableBuffer` with
//! the persisted files to get the full set of data to query.
use crate::ChunkFilter;
use std::sync::Arc;
use crate::{ChunkFilter, DatabaseTables};
use crate::{ParquetFile, PersistedSnapshot};
use hashbrown::HashMap;
use influxdb3_id::DbId;
use hashbrown::{HashMap, HashSet};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_id::TableId;
use influxdb3_id::{DbId, SerdeVecMap};
use influxdb3_telemetry::ParquetMetrics;
use parking_lot::RwLock;
@ -71,6 +74,68 @@ impl PersistedFiles {
files
}
pub fn remove_files_by_retention_period(
&self,
catalog: Arc<Catalog>,
) -> SerdeVecMap<DbId, DatabaseTables> {
let mut removed: SerdeVecMap<DbId, DatabaseTables> = SerdeVecMap::new();
let mut removed_paths: HashSet<String> = HashSet::new();
let mut size = 0;
let mut row_count = 0;
let retention_periods = catalog.get_retention_period_cutoff_map();
// nothing to do if there are no retention periods, return empty result
if retention_periods.is_empty() {
return removed;
}
// do all retention period checking of persisted files under a read lock to prevent
// blocking unnecessarily blocking concurrently-running queries
{
let guard = self.inner.read();
for ((db_id, table_id), cutoff) in catalog.get_retention_period_cutoff_map() {
let Some(files) = guard.files.get(&db_id).and_then(|hm| hm.get(&table_id)) else {
continue;
};
for file in files {
// remove files if their max time (aka newest timestamp) is less than (aka older
// than) the cutoff timestamp for the retention period
if file.max_time < cutoff {
size += file.size_bytes;
row_count += file.row_count;
removed
.entry(db_id)
.or_default()
.tables
.entry(table_id)
.or_default()
.push(file.clone());
removed_paths.insert(file.path.clone());
}
}
}
}
// if no persisted files are found to be in violation of their retention period, then
// return an empty result to avoid unnecessarily acquiring a write lock
if removed.is_empty() {
return removed;
}
let mut guard = self.inner.write();
for (_, tables) in guard.files.iter_mut() {
for (_, files) in tables.iter_mut() {
files.retain(|file| !removed_paths.contains(&file.path))
}
}
guard.parquet_files_count -= removed.len() as u64;
guard.parquet_files_size_mb -= as_mb(size);
guard.parquet_files_row_count -= row_count;
removed
}
}
impl ParquetMetrics for PersistedFiles {
@ -110,9 +175,11 @@ impl Inner {
|mut files, persisted_snapshot| {
size_in_mb += as_mb(persisted_snapshot.parquet_size_bytes);
row_count += persisted_snapshot.row_count;
let parquet_files_added =
let (parquet_files_added, removed_size, removed_row_count) =
update_persisted_files_with_snapshot(true, persisted_snapshot, &mut files);
file_count += parquet_files_added;
size_in_mb -= as_mb(removed_size);
row_count -= removed_row_count;
files
},
);
@ -128,8 +195,10 @@ impl Inner {
pub(crate) fn add_persisted_snapshot(&mut self, persisted_snapshot: PersistedSnapshot) {
self.parquet_files_row_count += persisted_snapshot.row_count;
self.parquet_files_size_mb += as_mb(persisted_snapshot.parquet_size_bytes);
let file_count =
let (file_count, removed_file_size, removed_row_count) =
update_persisted_files_with_snapshot(false, persisted_snapshot, &mut self.files);
self.parquet_files_row_count -= removed_row_count;
self.parquet_files_size_mb -= as_mb(removed_file_size);
self.parquet_files_count += file_count;
}
@ -163,8 +232,8 @@ fn update_persisted_files_with_snapshot(
initial_load: bool,
persisted_snapshot: PersistedSnapshot,
db_to_tables: &mut HashMap<DbId, HashMap<TableId, Vec<ParquetFile>>>,
) -> u64 {
let mut file_count = 0;
) -> (u64, u64, u64) {
let (mut file_count, mut removed_size, mut removed_row_count) = (0, 0, 0);
persisted_snapshot
.databases
.into_iter()
@ -190,7 +259,32 @@ fn update_persisted_files_with_snapshot(
}
});
});
file_count
// We now remove any files as we load the snapshots if they exist.
persisted_snapshot
.removed_files
.into_iter()
.for_each(|(db_id, tables)| {
let db_tables: &mut HashMap<TableId, Vec<ParquetFile>> =
db_to_tables.entry(db_id).or_default();
tables
.tables
.into_iter()
.for_each(|(table_id, remove_parquet_files)| {
let table_files = db_tables.entry(table_id).or_default();
for file in remove_parquet_files {
if let Some(idx) = table_files.iter().position(|f| f.id == file.id) {
let file = table_files.remove(idx);
file_count -= 1;
removed_size -= file.size_bytes;
removed_row_count -= file.row_count;
}
}
});
});
(file_count, removed_size, removed_row_count)
}
#[cfg(test)]

View File

@ -24,6 +24,7 @@ use iox_query::QueryChunk;
use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
use iox_query::exec::Executor;
use iox_query::frontend::reorg::ReorgPlanner;
use object_store::Error;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
use parking_lot::Mutex;
@ -218,6 +219,45 @@ impl QueryableBuffer {
persisting_chunks
};
let removed_files = self
.persisted_files
.remove_files_by_retention_period(Arc::clone(&self.catalog));
for (_, tables) in &removed_files {
for (_, files) in &tables.tables {
for file in files {
let path = file.path.clone();
let object_store = Arc::clone(&self.persister.object_store());
// We've removed the file from the PersistedFiles field.
// We'll store them as part of the snapshot so that other parts
// that depend on knowing if they exist or not can update their
// index accordingly. We try to delete them on a best effort
// basis, but if they don't get deleted that's fine they aren't
// referenced anymore.
tokio::spawn(async move {
let mut retry_count = 0;
let path = path.into();
while retry_count <= 10 {
match object_store.delete(&path).await {
Ok(()) => break,
// This was already deleted so we can just skip it
Err(Error::NotFound { .. }) => break,
Err(_) => {
retry_count += 1;
// Sleep and increase the time with each retry.
// This adds up to about 9 minutes over time.
tokio::time::sleep(tokio::time::Duration::from_secs(
retry_count * 10,
))
.await;
}
}
}
});
}
}
}
let (sender, receiver) = oneshot::channel();
let persister = Arc::clone(&self.persister);
@ -235,13 +275,16 @@ impl QueryableBuffer {
persist_jobs.len(),
wal_file_number.as_u64(),
);
// persist the individual files, building the snapshot as we go
let persisted_snapshot = Arc::new(Mutex::new(PersistedSnapshot::new(
let mut snapshot = PersistedSnapshot::new(
persister.node_identifier_prefix().to_string(),
snapshot_details.snapshot_sequence_number,
snapshot_details.last_wal_sequence_number,
catalog.sequence_number(),
)));
);
snapshot.removed_files = removed_files;
// persist the individual files, building the snapshot as we go
let persisted_snapshot = Arc::new(Mutex::new(snapshot));
let persist_jobs_empty = persist_jobs.is_empty();
let mut set = JoinSet::new();
@ -314,11 +357,12 @@ impl QueryableBuffer {
set.join_all().await;
// persist the snapshot file - only if persist jobs are present
// if persist_jobs is empty, then parquet file wouldn't have been
// written out, so it's desirable to not write empty snapshot file.
// persist the snapshot file - only if persist jobs are present or
// files have been removed due to retention policies.
// If persist_jobs is empty, then the parquet file wouldn't have been
// written out, so it's desirable to not write empty snapshot files.
//
// How can persist jobs be empty even though snapshot is triggered?
// How can persist jobs be empty even though a snapshot is triggered?
//
// When force snapshot is set, wal_periods (tracked by
// snapshot_tracker) will never be empty as a no-op is added. This
@ -347,12 +391,13 @@ impl QueryableBuffer {
// force_snapshot) snapshot runs, snapshot_tracker will check if
// wal_periods are empty so it won't trigger a snapshot in the first
// place.
let removed_files_empty = persisted_snapshot.lock().removed_files.is_empty();
let persisted_snapshot = PersistedSnapshotVersion::V1(
Arc::into_inner(persisted_snapshot)
.expect("Should only have one strong reference")
.into_inner(),
);
if !persist_jobs_empty {
if !persist_jobs_empty || !removed_files_empty {
loop {
match persister.persist_snapshot(&persisted_snapshot).await {
Ok(_) => {