diff --git a/Cargo.lock b/Cargo.lock
index 922aac4734..cbd6afd06f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2628,6 +2628,7 @@ dependencies = [
  "parquet_file",
  "paste",
  "pin-project",
+ "predicate",
  "prost",
  "rand",
  "schema",
diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml
index e36ac29330..91c168deaf 100644
--- a/ingester/Cargo.toml
+++ b/ingester/Cargo.toml
@@ -32,6 +32,7 @@ once_cell = "1.18"
 parking_lot = "0.12.1"
 parquet_file = { version = "0.1.0", path = "../parquet_file" }
 pin-project = "1.1.2"
+predicate = { version = "0.1.0", path = "../predicate" }
 prost = { version = "0.11.9", default-features = false, features = ["std"] }
 rand = "0.8.5"
 schema = { version = "0.1.0", path = "../schema" }
diff --git a/ingester/src/buffer_tree/table.rs b/ingester/src/buffer_tree/table.rs
index 1953a8dd44..b050a13449 100644
--- a/ingester/src/buffer_tree/table.rs
+++ b/ingester/src/buffer_tree/table.rs
@@ -2,12 +2,18 @@
 
 pub(crate) mod metadata_resolver;
 
-use std::{fmt::Debug, sync::Arc};
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use data_types::{
-    partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionKey, SequenceNumber,
-    Table, TableId,
+    partition_template::{build_column_values, ColumnValue, TablePartitionTemplateOverride},
+    NamespaceId, PartitionKey, SequenceNumber, Table, TableId,
+};
+use datafusion::scalar::ScalarValue;
+use iox_query::{
+    chunk_statistics::{create_chunk_statistics, ColumnRange},
+    pruning::keep_after_pruning,
+    QueryChunk,
 };
 use mutable_batch::MutableBatch;
 use parking_lot::Mutex;
@@ -247,7 +253,7 @@ where
         table_id: TableId,
         columns: Vec<String>,
         span: Option<Span>,
-        _predicate: Option<Predicate>,
+        predicate: Option<Predicate>,
     ) -> Result<Self::Response, QueryError> {
         assert_eq!(self.table_id, table_id, "buffer tree index inconsistency");
         assert_eq!(
@@ -255,26 +261,118 @@
             "buffer tree index inconsistency"
         );
 
+        let table_partition_template = self.catalog_table.get().await.partition_template;
+
         // Gather the partition data from all of the partitions in this table.
         let span = SpanRecorder::new(span);
         let partitions = self.partitions().into_iter().map(move |p| {
             let mut span = span.child("partition read");
 
-            let (id, hash_id, completed_persistence_count, data) = {
+            let (id, hash_id, completed_persistence_count, data, partition_key) = {
                 let mut p = p.lock();
                 (
                     p.partition_id(),
                     p.partition_hash_id().cloned(),
                     p.completed_persistence_count(),
                     p.get_query_data(),
+                    p.partition_key().clone(),
                 )
             };
 
             let ret = match data {
                 Some(data) => {
-                    // TODO(savage): Apply predicate here through the projection?
                     assert_eq!(id, data.partition_id());
+                    let data = Arc::new(data);
+                    if let Some(predicate) = &predicate {
+                        // Filter using the partition key
+                        let column_ranges = Arc::new(
+                            build_column_values(&table_partition_template, partition_key.inner())
+                                .filter_map(|(col, val)| {
+                                    let range = match val {
+                                        ColumnValue::Identity(s) => {
+                                            let s = Arc::new(ScalarValue::from(s.as_ref()));
+                                            ColumnRange {
+                                                min_value: Arc::clone(&s),
+                                                max_value: s,
+                                            }
+                                        }
+                                        ColumnValue::Prefix(p) => {
+                                            if p.is_empty() {
+                                                // full range => value is useless
+                                                return None;
+                                            }
+
+                                            // If the partition only has a prefix of the tag value (it was truncated) then form a conservative
+                                            // range:
+                                            //
+                                            //
+                                            // # Minimum
+                                            // Use the prefix itself.
+                                            //
+                                            // Note that the minimum is inclusive.
+                                            //
+                                            // All values in the partition are either:
+                                            // - identical to the prefix, in which case they are included by the inclusive minimum
+                                            // - have the form `"<prefix><s>"`, and it holds that `"<prefix><s>" > "<prefix>"` for all
+                                            //   strings `"<s>"`.
+                                            //
+                                            //
+                                            // # Maximum
+                                            // Use `"<prefix_excluding_last_char><char::MAX>"`.
+                                            //
+                                            // Note that the maximum is inclusive.
+                                            //
+                                            // All strings in this partition must be smaller than this constructed maximum, because
+                                            // string comparison is front-to-back and `"<char::MAX>" > "<s>"` holds for every string `"<s>"`.
+
+                                            let min_value = Arc::new(ScalarValue::from(p.as_ref()));
+
+                                            let mut chars = p.as_ref().chars().collect::<Vec<_>>();
+                                            *chars
+                                                .last_mut()
+                                                .expect("checked that prefix is not empty") =
+                                                std::char::MAX;
+                                            let max_value = Arc::new(ScalarValue::from(
+                                                chars.into_iter().collect::<String>().as_str(),
+                                            ));
+
+                                            ColumnRange {
+                                                min_value,
+                                                max_value,
+                                            }
+                                        }
+                                    };
+
+                                    Some((Arc::from(col), range))
+                                })
+                                .collect::<HashMap<_, _>>(),
+                        );
+
+                        let chunk_statistics = Arc::new(create_chunk_statistics(
+                            data.num_rows(),
+                            data.schema(),
+                            data.ts_min_max(),
+                            &column_ranges,
+                        ));
+
+                        if !keep_after_pruning(
+                            data.schema(),
+                            chunk_statistics,
+                            data.schema().as_arrow(),
+                            predicate,
+                        )
+                        .expect("TODO FIX THIS")
+                        {
+                            return PartitionResponse::new(
+                                vec![],
+                                id,
+                                hash_id,
+                                completed_persistence_count,
+                            );
+                        }
+                    }
+
                     // Project the data if necessary
                     let columns = columns.iter().map(String::as_str).collect::<Vec<_>>();
                     let selection = if columns.is_empty() {
diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs
index 591fca003c..2a4b7a91e1 100644
--- a/ingester/src/query_adaptor.rs
+++ b/ingester/src/query_adaptor.rs
@@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::ensure_schema;
-use data_types::{ChunkId, ChunkOrder, PartitionId};
+use data_types::{ChunkId, ChunkOrder, PartitionId, TimestampMinMax};
 use datafusion::physical_plan::Statistics;
 use iox_query::{
     util::{compute_timenanosecond_min_max, create_basic_summary},
@@ -105,16 +105,26 @@ impl QueryAdaptor {
     pub(crate) fn partition_id(&self) -> PartitionId {
         self.partition_id
     }
+
+    /// Number of rows, useful for building stats
+    pub(crate) fn num_rows(&self) -> u64 {
+        self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64
+    }
+
+    /// Time range, useful for building stats
+    pub(crate) fn ts_min_max(&self) -> TimestampMinMax {
+        compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
+            .expect("Should have time range")
+    }
 }
 
 impl QueryChunk for QueryAdaptor {
     fn stats(&self) -> Arc<Statistics> {
         Arc::clone(self.stats.get_or_init(|| {
-            let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
-                .expect("Should have time range");
+            let ts_min_max = self.ts_min_max();
 
             Arc::new(create_basic_summary(
-                self.data.iter().map(|b| b.num_rows()).sum::<usize>() as u64,
+                self.num_rows(),
                 self.schema(),
                 ts_min_max,
             ))
diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs
index d75195d0a0..6db84f385a 100644
--- a/iox_query/src/pruning.rs
+++ b/iox_query/src/pruning.rs
@@ -86,6 +86,45 @@ pub fn prune_chunks(
     prune_summaries(table_schema, &summaries, predicate)
 }
 
+/// Check one set of statistics against the predicate
+pub fn keep_after_pruning(
+    table_schema: &Schema,
+    chunk_statistics: Arc<Statistics>,
+    chunk_schema: SchemaRef,
+    predicate: &Predicate,
+) -> Result<bool, NotPrunedReason> {
+    let filter_expr = match predicate.filter_expr() {
+        Some(expr) => expr,
+        None => {
+            return Ok(true);
+        }
+    };
+    trace!(%filter_expr, "Filter_expr of pruning chunk");
+
+    let props = ExecutionProps::new();
+    let pruning_predicate =
+        match create_pruning_predicate(&props, &filter_expr, &table_schema.as_arrow()) {
+            Ok(p) => p,
+            Err(e) => {
+                warn!(%e, ?filter_expr, "Cannot create pruning predicate");
+                return Err(NotPrunedReason::CanNotCreatePruningPredicate);
+            }
+        };
+
+    let statistics = ChunkPruningStatistics {
+        table_schema,
+        summaries: &[(chunk_statistics, chunk_schema)],
+    };
+
+    let result = match pruning_predicate.prune(&statistics) {
+        Ok(results) => results.into_iter().next().expect("TODO FIX THIS"),
+        Err(e) => {
+            warn!(%e, ?filter_expr, "DataFusion pruning failed");
+            return Err(NotPrunedReason::DataFusionPruningFailed);
+        }
+    };
+
+    Ok(result)
+}
+
 /// Given a `Vec` of pruning summaries, return a `Vec<bool>` where `false` indicates that the
 /// predicate can be proven to evaluate to `false` for every single row.
 pub fn prune_summaries(
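Note on the `ColumnValue::Prefix` arm in `table.rs` above: when a tag value was truncated in the partition key, the change derives a conservative inclusive `[min, max]` range from the surviving prefix (the prefix itself as the minimum, and the prefix with its last character bumped to `char::MAX` as the maximum). Below is a minimal standalone sketch of that construction; `range_for_prefix` and the sample values are illustrative only, not part of this diff or the IOx API.

```rust
/// Sketch: conservative inclusive bounds for all values that start with a
/// truncated partition-key prefix. Illustrative helper, not an IOx function.
fn range_for_prefix(prefix: &str) -> Option<(String, String)> {
    if prefix.is_empty() {
        // An empty prefix constrains nothing => full range => useless for pruning.
        return None;
    }

    // Inclusive minimum: the prefix itself. Every value in the partition is
    // either the prefix or `"<prefix><s>"`, which sorts at or after the prefix.
    let min = prefix.to_owned();

    // Inclusive maximum: the prefix with its final character bumped to
    // `char::MAX`. String comparison is front-to-back, so every value that
    // starts with the prefix sorts at or before this constructed bound.
    let mut chars: Vec<char> = prefix.chars().collect();
    *chars.last_mut().expect("prefix checked non-empty") = char::MAX;
    let max: String = chars.into_iter().collect();

    Some((min, max))
}

fn main() {
    let (min, max) = range_for_prefix("host-1").expect("non-empty prefix");

    // True members of the partition stay inside the bounds...
    assert!(min.as_str() <= "host-1" && "host-1" <= max.as_str());
    assert!(min.as_str() <= "host-1xyz" && "host-1xyz" <= max.as_str());

    // ...and the bounds are conservative rather than exact: "host-2" also
    // fits, even though it does not carry the prefix.
    assert!(min.as_str() <= "host-2" && "host-2" <= max.as_str());

    // Values diverging before the final prefix character fall outside the
    // range, so a predicate on them can prune the partition.
    assert!("host+1" < min.as_str()); // '+' sorts before '-'
    assert!("hosu" > max.as_str()); // 'u' sorts after 't'
    println!("range: {min:?}..={max:?}");
}
```

Loose bounds are the safe direction here: pruning with them may keep a partition unnecessarily, but it can never discard one whose rows could match the predicate.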