refactor: Minor cleanup of retention predicate code (#6211)

* refactor: Minor cleanup of retention predicate code

* fix: use cow
pull/24376/head
Andrew Lamb 2022-11-22 13:28:54 -05:00 committed by GitHub
parent dd1755b23a
commit f89d542715
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 30 deletions

View File

@ -19,6 +19,7 @@ use predicate::Predicate;
use schema::Schema;
use sharder::JumpHash;
use snafu::{ResultExt, Snafu};
use std::borrow::Cow;
use std::collections::HashSet;
use std::time::Duration;
use std::{
@ -234,36 +235,30 @@ impl QuerierTable {
"Fetching all chunks"
);
let (predicate_with_retention, retention_delete_pred) =
match self.namespace_retention_period {
// The retention is not fininte, add predicate to filter out data outside retention period
Some(retention_period) => {
let retention_time_ns = self
.chunk_adapter
.catalog_cache()
.time_provider()
.now()
.timestamp_nanos()
- retention_period.as_nanos() as i64;
let (predicate, retention_delete_pred) = match self.namespace_retention_period {
// The retention is not fininte, add predicate to filter out data outside retention period
Some(retention_period) => {
let retention_time_ns = self
.chunk_adapter
.catalog_cache()
.time_provider()
.now()
.timestamp_nanos()
- retention_period.as_nanos() as i64;
// Add predicate to only keep chunks inside the retention period: time >= retention_period
let predicate = Some(predicate.clone().with_retention(retention_time_ns));
// Add predicate to only keep chunks inside the retention period: time >= retention_period
let predicate = predicate.clone().with_retention(retention_time_ns);
// Expression used to add to delete predicate to delete data older than retention period
// time < retention_time
let retention_delete_pred = Some(DeletePredicate::retention_delete_predicate(
retention_time_ns,
));
// Expression used to add to delete predicate to delete data older than retention period
// time < retention_time
let retention_delete_pred = Some(DeletePredicate::retention_delete_predicate(
retention_time_ns,
));
(predicate, retention_delete_pred)
}
// inifite retention, no need to add predicate
None => (None, None),
};
let predicate = if predicate_with_retention.is_some() {
predicate_with_retention.as_ref().unwrap()
} else {
predicate
(Cow::Owned(predicate), retention_delete_pred)
}
// inifite retention, no need to add predicate
None => (Cow::Borrowed(predicate), None),
};
let catalog_cache = self.chunk_adapter.catalog_cache();
@ -272,7 +267,7 @@ impl QuerierTable {
// contents at the same time to pre-warm cache
let (partitions, _parquet_files, _tombstones) = join!(
self.ingester_partitions(
predicate,
&predicate,
span_recorder.child_span("ingester partitions"),
projection
),
@ -365,7 +360,7 @@ impl QuerierTable {
let keeps = match prune_summaries(
Arc::clone(&cached_table.schema),
&basic_summaries,
predicate,
&predicate,
) {
Ok(keeps) => keeps,
Err(reason) => {
@ -426,7 +421,7 @@ impl QuerierTable {
self.table_name(),
Arc::clone(&self.schema),
chunks,
predicate,
&predicate,
)
.context(ChunkPruningSnafu)?;
debug!(%predicate, num_initial_chunks, num_final_chunks=chunks.len(), "pruned with pushed down predicates");