diff --git a/compactor/src/components/df_planner/query_chunk.rs b/compactor/src/components/df_planner/query_chunk.rs index f944295928..7860e97c40 100644 --- a/compactor/src/components/df_planner/query_chunk.rs +++ b/compactor/src/components/df_planner/query_chunk.rs @@ -1,8 +1,8 @@ //! QueryableParquetChunk for building query plan use std::{any::Any, sync::Arc}; -use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary}; -use datafusion::error::DataFusionError; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; +use datafusion::{error::DataFusionError, physical_plan::Statistics}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, util::create_basic_summary, @@ -26,7 +26,7 @@ pub struct QueryableParquetChunk { partition_id: PartitionId, sort_key: Option, order: ChunkOrder, - summary: Arc, + stats: Arc, } impl QueryableParquetChunk { @@ -37,7 +37,7 @@ impl QueryableParquetChunk { sort_key: Option, order: ChunkOrder, ) -> Self { - let summary = Arc::new(create_basic_summary( + let stats = Arc::new(create_basic_summary( data.rows() as u64, data.schema(), data.timestamp_min_max(), @@ -48,7 +48,7 @@ impl QueryableParquetChunk { partition_id, sort_key, order, - summary, + stats, } } @@ -68,8 +68,8 @@ impl QueryableParquetChunk { } impl QueryChunkMeta for QueryableParquetChunk { - fn summary(&self) -> Arc { - Arc::clone(&self.summary) + fn stats(&self) -> Arc { + Arc::clone(&self.stats) } fn schema(&self) -> &Schema { diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs index 8e93855243..f9b9964b5f 100644 --- a/ingester/src/query_adaptor.rs +++ b/ingester/src/query_adaptor.rs @@ -5,8 +5,8 @@ use std::{any::Any, sync::Arc}; use arrow::record_batch::RecordBatch; use arrow_util::util::ensure_schema; -use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary}; -use datafusion::error::DataFusionError; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; +use datafusion::{error::DataFusionError, physical_plan::Statistics}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, util::{compute_timenanosecond_min_max, create_basic_summary}, @@ -41,8 +41,8 @@ pub struct QueryAdaptor { /// An interned schema for all [`RecordBatch`] in data. schema: Schema, - /// An interned table summary. - summary: OnceCell>, + /// An interned stats. + stats: OnceCell>, } impl QueryAdaptor { @@ -67,7 +67,7 @@ impl QueryAdaptor { // use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process. 
id: ChunkId::new(), schema, - summary: OnceCell::default(), + stats: OnceCell::default(), } } @@ -110,8 +110,8 @@ impl QueryAdaptor { } impl QueryChunkMeta for QueryAdaptor { - fn summary(&self) -> Arc { - Arc::clone(self.summary.get_or_init(|| { + fn stats(&self) -> Arc { + Arc::clone(self.stats.get_or_init(|| { let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref())) .expect("Should have time range"); diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 6eac1f08c2..242b9f194c 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -20,8 +20,8 @@ use arrow::{ record_batch::RecordBatch, }; use async_trait::async_trait; -use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary}; -use datafusion::{error::DataFusionError, prelude::SessionContext}; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; +use datafusion::{error::DataFusionError, physical_plan::Statistics, prelude::SessionContext}; use exec::{stringset::StringSet, IOxSessionContext}; use hashbrown::HashMap; use observability_deps::tracing::{debug, trace}; @@ -30,7 +30,7 @@ use parquet_file::storage::ParquetExecInput; use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; use schema::{ sort::{SortKey, SortKeyBuilder}, - Projection, Schema, TIME_COLUMN_NAME, + InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME, }; use std::{any::Any, collections::BTreeSet, fmt::Debug, iter::FromIterator, sync::Arc}; @@ -62,8 +62,8 @@ pub fn chunk_order_field() -> Arc { /// Trait for an object (designed to be a Chunk) which can provide /// metadata pub trait QueryChunkMeta { - /// Return a summary of the data - fn summary(&self) -> Arc; + /// Return a statistics of the data + fn stats(&self) -> Arc; /// return a reference to the summary of the data held in this chunk fn schema(&self) -> &Schema; @@ -283,8 +283,8 @@ impl

<P> QueryChunkMeta for Arc<P>
where P: QueryChunkMeta, { - fn summary(&self) -> Arc { - self.as_ref().summary() + fn stats(&self) -> Arc { + self.as_ref().stats() } fn schema(&self) -> &Schema { @@ -308,8 +308,8 @@ where /// Implement `ChunkMeta` for `Arc` impl QueryChunkMeta for Arc { - fn summary(&self) -> Arc { - self.as_ref().summary() + fn stats(&self) -> Arc { + self.as_ref().stats() } fn schema(&self) -> &Schema { @@ -339,11 +339,10 @@ pub fn chunks_have_distinct_counts<'a>( // do not need to compute potential duplicates. We will treat // as all of them have duplicates chunks.into_iter().all(|chunk| { - chunk - .summary() - .columns - .iter() - .all(|col| col.stats.distinct_count().is_some()) + let Some(col_stats) = &chunk + .stats() + .column_statistics else {return false}; + col_stats.iter().all(|col| col.distinct_count.is_some()) }) } @@ -356,8 +355,7 @@ pub fn compute_sort_key_for_chunks<'a>( // sorted lexicographically but time column always last SortKey::from_columns(schema.primary_key()) } else { - let summaries = chunks.into_iter().map(|x| x.summary()); - compute_sort_key(summaries) + compute_sort_key(chunks.into_iter()) } } @@ -368,19 +366,18 @@ pub fn compute_sort_key_for_chunks<'a>( /// /// The cardinality is estimated by the sum of unique counts over all summaries. This may overestimate cardinality since /// it does not account for shared/repeated values. -fn compute_sort_key(summaries: impl Iterator>) -> SortKey { +fn compute_sort_key<'a>(chunks: impl Iterator>) -> SortKey { let mut cardinalities: HashMap = Default::default(); - for summary in summaries { - for column in &summary.columns { - if column.influxdb_type != InfluxDbType::Tag { + for chunk in chunks { + let stats = chunk.stats(); + let Some(col_stats) = stats.column_statistics.as_ref() else {continue}; + for ((influxdb_type, field), stats) in chunk.schema().iter().zip(col_stats) { + if influxdb_type != InfluxColumnType::Tag { continue; } - let mut cnt = 0; - if let Some(count) = column.stats.distinct_count() { - cnt = count.get(); - } - *cardinalities.entry_ref(column.name.as_str()).or_default() += cnt; + let cnt = stats.distinct_count.unwrap_or_default() as u64; + *cardinalities.entry_ref(field.name().as_str()).or_default() += cnt; } } diff --git a/iox_query/src/provider/overlap.rs b/iox_query/src/provider/overlap.rs index e9fee6b5e6..4ba1d84e6f 100644 --- a/iox_query/src/provider/overlap.rs +++ b/iox_query/src/provider/overlap.rs @@ -5,7 +5,9 @@ use crate::QueryChunk; use data_types::TimestampMinMax; +use datafusion::scalar::ScalarValue; use observability_deps::tracing::debug; +use schema::TIME_COLUMN_NAME; use std::sync::Arc; /// Groups query chunks into disjoint sets of overlapped time range. 
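// Reviewer sketch (not part of this diff): the rewritten `compute_sort_key` and
// `chunks_have_distinct_counts` above rely on `Statistics::column_statistics` being ordered
// exactly like the chunk schema, so per-column stats are matched positionally. A minimal,
// self-contained illustration of that lookup pattern; the helper name is illustrative only.
use arrow::datatypes::SchemaRef;
use datafusion::physical_plan::Statistics;

/// Distinct count recorded for `column`, if the chunk exposes one.
fn distinct_count_for(stats: &Statistics, schema: &SchemaRef, column: &str) -> Option<usize> {
    // Statistics may carry no per-column data at all; bail out in that case.
    let col_stats = stats.column_statistics.as_ref()?;
    // Column statistics are positional, so resolve the name through the schema first.
    let idx = schema.index_of(column).ok()?;
    col_stats.get(idx)?.distinct_count
}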
@@ -90,7 +92,27 @@ pub fn group_potential_duplicates( } fn timestamp_min_max(chunk: &dyn QueryChunk) -> Option { - chunk.summary().time_range() + chunk + .stats() + .column_statistics + .as_ref() + .and_then(|stats| { + chunk + .schema() + .find_index_of(TIME_COLUMN_NAME) + .map(|idx| &stats[idx]) + }) + .and_then(|stats| { + if let ( + Some(ScalarValue::TimestampNanosecond(Some(min), None)), + Some(ScalarValue::TimestampNanosecond(Some(max), None)), + ) = (&stats.min_value, &stats.max_value) + { + Some(TimestampMinMax::new(*min, *max)) + } else { + None + } + }) } #[cfg(test)] @@ -118,8 +140,16 @@ mod test { #[test] fn one_time_column_overlap_same_min_max() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 1)); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(1, 1)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 1), + ); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(1, 1), + ); let groups = group_potential_duplicates(vec![c1, c2]); @@ -129,10 +159,26 @@ mod test { #[test] fn one_time_column_overlap_bad_case() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(15, 30)); - let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(7, 20)); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(15, 30), + ); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_time_column() + .with_timestamp_min_max(7, 20), + ); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]); @@ -142,10 +188,26 @@ mod test { #[test] fn one_time_column_overlap_contiguous() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20)); - let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(15, 30)); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(7, 20), + ); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_time_column() + .with_timestamp_min_max(15, 30), + ); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]); @@ -155,10 +217,26 @@ mod test { #[test] fn one_time_column_overlap_2_groups() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20)); - let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(21, 30)); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(7, 20), + ); + let c3 = Arc::new( + TestChunk::new("chunk3") + 
.with_time_column() + .with_timestamp_min_max(21, 30), + ); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]); @@ -168,10 +246,26 @@ mod test { #[test] fn one_time_column_overlap_3_groups() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20)); - let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(21, 24)); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(7, 20), + ); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_time_column() + .with_timestamp_min_max(21, 24), + ); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c4, c3, c2]); @@ -185,7 +279,11 @@ mod test { #[test] fn one_time_column_overlap_1_chunk() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); let groups = group_potential_duplicates(vec![c1]); @@ -202,18 +300,28 @@ mod test { #[test] fn multi_columns_overlap_bad_case() { - let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column() + .with_timestamp_min_max(1, 10), + ); let c2 = Arc::new( TestChunk::new("chunk2") + .with_time_column() .with_timestamp_min_max(15, 30) .with_i64_field_column("field1"), ); let c3 = Arc::new( TestChunk::new("chunk3") + .with_time_column() .with_timestamp_min_max(7, 20) .with_tag_column("tag1"), ); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]); @@ -225,6 +333,7 @@ mod test { fn multi_columns_overlap_1_chunk() { let c1 = Arc::new( TestChunk::new("chunk1") + .with_time_column() .with_timestamp_min_max(1, 10) .with_tag_column("tag1"), ); @@ -239,16 +348,26 @@ mod test { fn multi_columns_overlap_3_groups() { let c1 = Arc::new( TestChunk::new("chunk1") + .with_time_column() .with_timestamp_min_max(1, 10) .with_tag_column("tag1"), ); - let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_time_column() + .with_timestamp_min_max(7, 20), + ); let c3 = Arc::new( TestChunk::new("chunk3") + .with_time_column() .with_timestamp_min_max(21, 24) .with_tag_column("tag2"), ); - let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35)); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_time_column() + .with_timestamp_min_max(25, 35), + ); let groups = group_potential_duplicates(vec![c1, c4, c3, c2]); diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 27b6125415..48e9fb24ba 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -198,10 +198,13 @@ pub fn chunks_to_physical_nodes( // ensure that chunks are actually ordered by chunk order chunks.sort_by_key(|(_meta, c)| c.order()); - let num_rows = 
chunks - .iter() - .map(|(_meta, c)| c.summary().total_count() as usize) - .sum::(); + let num_rows = chunks.iter().map(|(_meta, c)| c.stats().num_rows).fold( + Some(0usize), + |accu, x| match (accu, x) { + (Some(accu), Some(x)) => Some(accu + x), + _ => None, + }, + ); let chunk_order_min = chunks .iter() .map(|(_meta, c)| c.order().get()) @@ -265,7 +268,7 @@ pub fn chunks_to_physical_nodes( }; let statistics = Statistics { - num_rows: Some(num_rows), + num_rows, total_byte_size: None, column_statistics: Some( schema diff --git a/iox_query/src/provider/record_batch_exec.rs b/iox_query/src/provider/record_batch_exec.rs index 5d1a94ba3e..f16a48bad2 100644 --- a/iox_query/src/provider/record_batch_exec.rs +++ b/iox_query/src/provider/record_batch_exec.rs @@ -1,10 +1,12 @@ //! Implementation of a DataFusion PhysicalPlan node across partition chunks -use crate::{QueryChunk, CHUNK_ORDER_COLUMN_NAME}; +use crate::{statistics::DFStatsAggregator, QueryChunk, CHUNK_ORDER_COLUMN_NAME}; use super::adapter::SchemaAdapterStream; -use arrow::{datatypes::SchemaRef, record_batch::RecordBatch}; -use data_types::{ColumnSummary, InfluxDbType, TableSummary}; +use arrow::{ + datatypes::{Schema, SchemaRef}, + record_batch::RecordBatch, +}; use datafusion::{ error::DataFusionError, execution::context::TaskContext, @@ -12,17 +14,16 @@ use datafusion::{ expressions::{Column, PhysicalSortExpr}, memory::MemoryStream, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, }, scalar::ScalarValue, }; use observability_deps::tracing::trace; use schema::sort::SortKey; use std::{ - borrow::Cow, collections::{HashMap, HashSet}, fmt, - num::NonZeroU64, sync::Arc, }; @@ -59,7 +60,9 @@ impl RecordBatchesExec { schema: SchemaRef, output_sort_key_memo: Option, ) -> Self { - let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok(); + let chunk_order_field = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).ok(); + let chunk_order_only_schema = + chunk_order_field.map(|field| Schema::new(vec![field.clone()])); let chunks: Vec<_> = chunks .into_iter() @@ -76,53 +79,35 @@ impl RecordBatchesExec { let statistics = chunks .iter() .fold( - None, - |mut combined_summary: Option, (chunk, _batches)| { - let summary = chunk.summary(); + DFStatsAggregator::new(&schema), + |mut agg, (chunk, _batches)| { + agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref()); - let summary = if has_chunk_order_col { - // add chunk order column + if let Some(schema) = chunk_order_only_schema.as_ref() { let order = chunk.order().get(); - let summary = TableSummary { - columns: summary - .columns - .iter() - .cloned() - .chain(std::iter::once(ColumnSummary { - name: CHUNK_ORDER_COLUMN_NAME.to_owned(), - influxdb_type: InfluxDbType::Field, - stats: data_types::Statistics::I64(data_types::StatValues { - min: Some(order), - max: Some(order), - total_count: summary.total_count(), - null_count: Some(0), - distinct_count: Some(NonZeroU64::new(1).unwrap()), - }), - })) - .collect(), - }; - - Cow::Owned(summary) - } else { - Cow::Borrowed(summary.as_ref()) - }; - - match combined_summary.as_mut() { - None => { - combined_summary = Some(summary.into_owned()); - } - Some(combined_summary) => { - combined_summary.update_from(&summary); - } + let order = ScalarValue::from(order); + agg.update( + &Statistics { + num_rows: Some(0), + 
total_byte_size: Some(0), + column_statistics: Some(vec![ColumnStatistics { + null_count: Some(0), + max_value: Some(order.clone()), + min_value: Some(order), + distinct_count: Some(1), + }]), + is_exact: true, + }, + schema, + ); } - combined_summary + agg }, ) - .map(|combined_summary| crate::statistics::df_from_iox(&schema, &combined_summary)) - .unwrap_or_default(); + .build(); - let output_ordering = if has_chunk_order_col { + let output_ordering = if chunk_order_field.is_some() { Some(vec![ // every chunk gets its own partition, so we can claim that the output is ordered PhysicalSortExpr { diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index 4836e4063d..21514fba42 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -2,15 +2,15 @@ use crate::{QueryChunk, QueryChunkMeta}; use arrow::{ - array::{ - ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt64Array, - }, - datatypes::{DataType, Int32Type, TimeUnit}, + array::{ArrayRef, UInt64Array}, + datatypes::{DataType, SchemaRef}, }; -use data_types::{StatValues, Statistics, TableSummary}; use datafusion::{ - physical_expr::execution_props::ExecutionProps, physical_optimizer::pruning::PruningStatistics, + physical_expr::execution_props::ExecutionProps, + physical_optimizer::pruning::PruningStatistics, + physical_plan::{ColumnStatistics, Statistics}, prelude::Column, + scalar::ScalarValue, }; use datafusion_util::create_pruning_predicate; use observability_deps::tracing::{debug, trace, warn}; @@ -79,7 +79,10 @@ pub fn prune_chunks( ) -> Result, NotPrunedReason> { let num_chunks = chunks.len(); debug!(num_chunks, %predicate, "Pruning chunks"); - let summaries: Vec<_> = chunks.iter().map(|c| c.summary()).collect(); + let summaries: Vec<_> = chunks + .iter() + .map(|c| (c.stats(), c.schema().as_arrow())) + .collect(); prune_summaries(table_schema, &summaries, predicate) } @@ -87,7 +90,7 @@ pub fn prune_chunks( /// predicate can be proven to evaluate to `false` for every single row. 
pub fn prune_summaries( table_schema: &Schema, - summaries: &[Arc], + summaries: &[(Arc, SchemaRef)], predicate: &Predicate, ) -> Result, NotPrunedReason> { let filter_expr = match predicate.filter_expr() { @@ -129,7 +132,7 @@ pub fn prune_summaries( /// interface required for pruning struct ChunkPruningStatistics<'a> { table_schema: &'a Schema, - summaries: &'a [Arc], + summaries: &'a [(Arc, SchemaRef)], } impl<'a> ChunkPruningStatistics<'a> { @@ -144,10 +147,12 @@ impl<'a> ChunkPruningStatistics<'a> { fn column_summaries<'b: 'a, 'c: 'a>( &'c self, column: &'b Column, - ) -> impl Iterator> + 'a { - self.summaries - .iter() - .map(|summary| Some(summary.column(&column.name)?.stats.clone())) + ) -> impl Iterator> + 'a { + self.summaries.iter().map(|(stats, schema)| { + let stats = stats.column_statistics.as_ref()?; + let idx = schema.index_of(&column.name).ok()?; + Some(&stats[idx]) + }) } } @@ -171,7 +176,7 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> { fn null_counts(&self, column: &Column) -> Option { let null_counts = self .column_summaries(column) - .map(|x| x.and_then(|s| s.null_count())); + .map(|x| x.and_then(|s| s.null_count.map(|x| x as u64))); Some(Arc::new(UInt64Array::from_iter(null_counts))) } @@ -179,70 +184,26 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> { /// Collects an [`ArrayRef`] containing the aggregate statistic corresponding to /// `aggregate` for each of the provided [`Statistics`] -fn collect_pruning_stats( +fn collect_pruning_stats<'a>( data_type: &DataType, - statistics: impl Iterator>, + statistics: impl Iterator>, aggregate: Aggregate, ) -> Option { - match data_type { - DataType::Int64 | DataType::Timestamp(TimeUnit::Nanosecond, None) => { - let values = statistics.map(|s| match s { - Some(Statistics::I64(v)) => get_aggregate(v, aggregate), - _ => None, - }); - Some(Arc::new(Int64Array::from_iter(values))) - } - DataType::UInt64 => { - let values = statistics.map(|s| match s { - Some(Statistics::U64(v)) => get_aggregate(v, aggregate), - _ => None, - }); - Some(Arc::new(UInt64Array::from_iter(values))) - } - DataType::Float64 => { - let values = statistics.map(|s| match s { - Some(Statistics::F64(v)) => get_aggregate(v, aggregate), - _ => None, - }); - Some(Arc::new(Float64Array::from_iter(values))) - } - DataType::Boolean => { - let values = statistics.map(|s| match s { - Some(Statistics::Bool(v)) => get_aggregate(v, aggregate), - _ => None, - }); - Some(Arc::new(BooleanArray::from_iter(values))) - } - DataType::Utf8 => { - let values = statistics.map(|s| match s { - Some(Statistics::String(v)) => get_aggregate(v, aggregate), - _ => None, - }); - Some(Arc::new(StringArray::from_iter(values))) - } - DataType::Dictionary(key, value) - if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => - { - let values = statistics.map(|s| match s { - Some(Statistics::String(v)) => get_aggregate(v, aggregate), - _ => None, - }); + let null = ScalarValue::try_from(data_type).ok()?; - // DictionaryArray can only be built from string references (`str`), not from owned strings (`String`), so - // we need to collect the strings first - let values: Vec<_> = values.collect(); - let values = values.iter().map(|s| s.as_deref()); - Some(Arc::new(DictionaryArray::::from_iter(values))) - } - _ => None, - } + ScalarValue::iter_to_array(statistics.map(|stats| { + stats + .and_then(|stats| get_aggregate(stats, aggregate).cloned()) + .unwrap_or_else(|| null.clone()) + })) + .ok() } /// Returns the aggregate statistic corresponding to 
`aggregate` from `stats` -fn get_aggregate(stats: StatValues, aggregate: Aggregate) -> Option { +fn get_aggregate(stats: &ColumnStatistics, aggregate: Aggregate) -> Option<&ScalarValue> { match aggregate { - Aggregate::Min => stats.min, - Aggregate::Max => stats.max, + Aggregate::Min => stats.min_value.as_ref(), + Aggregate::Max => stats.max_value.as_ref(), _ => None, } } @@ -486,7 +447,7 @@ mod test { TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None), ) as Arc; - let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1")) + let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column("column1")) as Arc; let predicate = Predicate::new().with_expr(col("column1").gt(lit(100i64))); @@ -691,12 +652,12 @@ mod test { let c5 = Arc::new( TestChunk::new("chunk5") .with_i64_field_column_with_stats("column1", Some(0), Some(10)) - .with_i64_field_column_no_stats("column2"), + .with_i64_field_column("column2"), ) as Arc; let c6 = Arc::new( TestChunk::new("chunk6") - .with_i64_field_column_no_stats("column1") + .with_i64_field_column("column1") .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ) as Arc; diff --git a/iox_query/src/statistics.rs b/iox_query/src/statistics.rs index ec71e62cd6..15a82ac55a 100644 --- a/iox_query/src/statistics.rs +++ b/iox_query/src/statistics.rs @@ -3,106 +3,11 @@ use std::{cmp::Ordering, collections::HashMap}; use arrow::datatypes::Schema; -use data_types::{ColumnSummary, InfluxDbType, Statistics as IOxStatistics, TableSummary}; use datafusion::{ physical_plan::{ColumnStatistics, Statistics as DFStatistics}, scalar::ScalarValue, }; -/// Converts stats.min and an appropriate `ScalarValue` -pub(crate) fn min_to_scalar( - influx_type: &InfluxDbType, - stats: &IOxStatistics, -) -> Option { - match stats { - IOxStatistics::I64(v) => { - if InfluxDbType::Timestamp == *influx_type { - v.min - .map(|x| ScalarValue::TimestampNanosecond(Some(x), None)) - } else { - v.min.map(ScalarValue::from) - } - } - IOxStatistics::U64(v) => v.min.map(ScalarValue::from), - IOxStatistics::F64(v) => v.min.map(ScalarValue::from), - IOxStatistics::Bool(v) => v.min.map(ScalarValue::from), - IOxStatistics::String(v) => v.min.as_deref().map(ScalarValue::from), - } -} - -/// Converts stats.max to an appropriate `ScalarValue` -pub(crate) fn max_to_scalar( - influx_type: &InfluxDbType, - stats: &IOxStatistics, -) -> Option { - match stats { - IOxStatistics::I64(v) => { - if InfluxDbType::Timestamp == *influx_type { - v.max - .map(|x| ScalarValue::TimestampNanosecond(Some(x), None)) - } else { - v.max.map(ScalarValue::from) - } - } - IOxStatistics::U64(v) => v.max.map(ScalarValue::from), - IOxStatistics::F64(v) => v.max.map(ScalarValue::from), - IOxStatistics::Bool(v) => v.max.map(ScalarValue::from), - IOxStatistics::String(v) => v.max.as_deref().map(ScalarValue::from), - } -} - -/// Creates a DataFusion `Statistics` object from an IOx `TableSummary` -pub(crate) fn df_from_iox( - schema: &arrow::datatypes::Schema, - summary: &TableSummary, -) -> DFStatistics { - let column_by_name = summary - .columns - .iter() - .map(|c| (&c.name, c)) - .collect::>(); - - // compute statistics for all columns in the schema, in order - let column_statistics = schema - .fields() - .iter() - .map(|field| { - column_by_name - .get(field.name()) - .map(|c| df_from_iox_col(c)) - // use default statisics of none available for this column - .unwrap_or_default() - }) - .collect::>(); - - DFStatistics { - num_rows: Some(summary.total_count() as usize), - 
total_byte_size: Some(summary.size()), - column_statistics: Some(column_statistics), - is_exact: true, - } -} - -/// Convert IOx `ColumnSummary` to DataFusion's `ColumnStatistics` -fn df_from_iox_col(col: &ColumnSummary) -> ColumnStatistics { - let stats = &col.stats; - let col_data_type = &col.influxdb_type; - - let distinct_count = stats.distinct_count().map(|v| { - let v: u64 = v.into(); - v as usize - }); - - let null_count = stats.null_count().map(|x| x as usize); - - ColumnStatistics { - null_count, - max_value: max_to_scalar(col_data_type, stats), - min_value: min_to_scalar(col_data_type, stats), - distinct_count, - } -} - /// Aggregates DataFusion [statistics](DFStatistics). #[derive(Debug)] pub struct DFStatsAggregator<'a> { @@ -325,161 +230,6 @@ impl TriStateScalar { mod test { use super::*; use arrow::datatypes::{DataType, Field}; - use data_types::{InfluxDbType, StatValues}; - use schema::{builder::SchemaBuilder, InfluxFieldType}; - use std::num::NonZeroU64; - - macro_rules! assert_nice_eq { - ($actual:ident, $expected:ident) => { - assert_eq!( - $actual, $expected, - "\n\nactual:\n\n{:#?}\n\nexpected:\n\n{:#?}", - $actual, $expected, - ); - }; - } - - #[test] - fn convert() { - let c1_stats = StatValues { - min: Some(11), - max: Some(11), - total_count: 3, - null_count: Some(1), - distinct_count: None, - }; - let c1_summary = ColumnSummary { - name: "c1".to_string(), - influxdb_type: InfluxDbType::Tag, - stats: IOxStatistics::I64(c1_stats), - }; - - let c2_stats = StatValues { - min: Some(-5), - max: Some(6), - total_count: 3, - null_count: Some(0), - distinct_count: Some(NonZeroU64::new(33).unwrap()), - }; - let c2_summary = ColumnSummary { - name: "c2".to_string(), - influxdb_type: InfluxDbType::Field, - stats: IOxStatistics::I64(c2_stats), - }; - - let table_summary = TableSummary { - columns: vec![c1_summary, c2_summary], - }; - - let df_c1_stats = ColumnStatistics { - null_count: Some(1), - max_value: Some(ScalarValue::Int64(Some(11))), - min_value: Some(ScalarValue::Int64(Some(11))), - distinct_count: None, - }; - - let df_c2_stats = ColumnStatistics { - null_count: Some(0), - max_value: Some(ScalarValue::Int64(Some(6))), - min_value: Some(ScalarValue::Int64(Some(-5))), - distinct_count: Some(33), - }; - - // test 1: columns in c1, c2 order - - let schema = SchemaBuilder::new() - .tag("c1") - .influx_field("c2", InfluxFieldType::Integer) - .build() - .unwrap(); - - let expected = DFStatistics { - num_rows: Some(3), - total_byte_size: Some(412), - column_statistics: Some(vec![df_c1_stats.clone(), df_c2_stats.clone()]), - is_exact: true, - }; - - let actual = df_from_iox(schema.inner(), &table_summary); - assert_nice_eq!(actual, expected); - - // test 1: columns in c1, c2 order in shcema (in c1, c2 in table_summary) - - let schema = SchemaBuilder::new() - .tag("c2") - .influx_field("c1", InfluxFieldType::Integer) - .build() - .unwrap(); - - let expected = DFStatistics { - // in c2, c1 order - column_statistics: Some(vec![df_c2_stats.clone(), df_c1_stats.clone()]), - // other fields the same - ..expected - }; - - let actual = df_from_iox(schema.inner(), &table_summary); - assert_nice_eq!(actual, expected); - - // test 3: columns in c1 tag with stats, c3 (tag no stats) and c2column without statistics - let schema = SchemaBuilder::new() - .tag("c2") - .influx_field("c1", InfluxFieldType::Integer) - .tag("c3") - .build() - .unwrap(); - - let expected = DFStatistics { - // in c2, c1, c3 w/ default stats - column_statistics: Some(vec![df_c2_stats, df_c1_stats, 
ColumnStatistics::default()]), - // other fields the same - ..expected - }; - - let actual = df_from_iox(schema.inner(), &table_summary); - assert_nice_eq!(actual, expected); - } - - #[test] - fn null_ts() { - let c_stats = StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }; - let c_summary = ColumnSummary { - name: "time".to_string(), - influxdb_type: InfluxDbType::Timestamp, - stats: IOxStatistics::I64(c_stats), - }; - - let table_summary = TableSummary { - columns: vec![c_summary], - }; - - let df_c_stats = ColumnStatistics { - null_count: None, - // Note min/max values should be `None` (not known) - // NOT `Some(None)` (known to be null) - max_value: None, - min_value: None, - distinct_count: None, - }; - - let schema = SchemaBuilder::new().timestamp().build().unwrap(); - - let expected = DFStatistics { - num_rows: Some(3), - total_byte_size: Some(220), - column_statistics: Some(vec![df_c_stats]), - is_exact: true, - }; - - let actual = df_from_iox(schema.inner(), &table_summary); - assert_nice_eq!(actual, expected); - } #[test] fn test_df_stats_agg_no_cols_no_updates() { diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index e760c68a2f..5410625622 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -21,17 +21,18 @@ use arrow::{ record_batch::RecordBatch, }; use async_trait::async_trait; -use data_types::{ - ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues, - Statistics, TableSummary, -}; -use datafusion::datasource::{object_store::ObjectStoreUrl, TableProvider, TableType}; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use datafusion::logical_expr::Expr; use datafusion::physical_plan::ExecutionPlan; use datafusion::{catalog::catalog::CatalogProvider, physical_plan::displayable}; use datafusion::{catalog::schema::SchemaProvider, logical_expr::LogicalPlan}; +use datafusion::{ + datasource::{object_store::ObjectStoreUrl, TableProvider, TableType}, + physical_plan::{ColumnStatistics, Statistics as DataFusionStatistics}, + scalar::ScalarValue, +}; use hashbrown::HashSet; use itertools::Itertools; use object_store::{path::Path, ObjectMeta}; @@ -40,10 +41,16 @@ use parking_lot::Mutex; use parquet_file::storage::ParquetExecInput; use predicate::rpc_predicate::QueryNamespaceMeta; use schema::{ - builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Projection, - Schema, TIME_COLUMN_NAME, + builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, Projection, Schema, + TIME_COLUMN_NAME, +}; +use std::{ + any::Any, + collections::{BTreeMap, HashMap}, + fmt, + num::NonZeroU64, + sync::Arc, }; -use std::{any::Any, collections::BTreeMap, fmt, num::NonZeroU64, sync::Arc}; use trace::ctx::SpanContext; #[derive(Debug)] @@ -313,8 +320,9 @@ pub struct TestChunk { /// Schema of the table schema: Schema, - /// Return value for summary() - table_summary: TableSummary, + /// Values for stats() + column_stats: HashMap, + num_rows: Option, id: ChunkId, @@ -353,24 +361,7 @@ macro_rules! impl_with_column { .unwrap() .build() .unwrap(); - self.add_schema_to_table(new_column_schema, true, None) - } - }; -} - -/// Implements a method for adding a column without any stats -macro_rules! 
impl_with_column_no_stats { - ($NAME:ident, $DATA_TYPE:ident) => { - pub fn $NAME(self, column_name: impl Into) -> Self { - let column_name = column_name.into(); - - let new_column_schema = SchemaBuilder::new() - .field(&column_name, DataType::$DATA_TYPE) - .unwrap() - .build() - .unwrap(); - - self.add_schema_to_table(new_column_schema, false, None) + self.add_schema_to_table(new_column_schema, None) } }; } @@ -392,13 +383,14 @@ macro_rules! impl_with_column_with_stats { .build() .unwrap(); - let stats = Statistics::$STAT_TYPE(StatValues { - min, - max, - ..Default::default() - }); + let stats = ColumnStatistics { + null_count: None, + max_value: max.map(|s| ScalarValue::from(s)), + min_value: min.map(|s| ScalarValue::from(s)), + distinct_count: None, + }; - self.add_schema_to_table(new_column_schema, true, Some(stats)) + self.add_schema_to_table(new_column_schema, Some(stats)) } }; } @@ -409,7 +401,8 @@ impl TestChunk { Self { table_name, schema: SchemaBuilder::new().build().unwrap(), - table_summary: TableSummary::default(), + column_stats: Default::default(), + num_rows: None, id: ChunkId::new_test(0), may_contain_pk_duplicates: Default::default(), table_data: QueryChunkData::RecordBatches(vec![]), @@ -523,7 +516,7 @@ impl TestChunk { // merge it in to any existing schema let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap(); - self.add_schema_to_table(new_column_schema, true, None) + self.add_schema_to_table(new_column_schema, None) } /// Register a tag column with stats with the test chunk @@ -556,9 +549,16 @@ impl TestChunk { ) } + fn update_count(&mut self, count: usize) { + match self.num_rows { + Some(existing) => assert_eq!(existing, count), + None => self.num_rows = Some(count), + } + } + /// Register a tag column with stats with the test chunk pub fn with_tag_column_with_nulls_and_full_stats( - self, + mut self, column_name: impl Into, min: Option<&str>, max: Option<&str>, @@ -573,15 +573,15 @@ impl TestChunk { let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap(); // Construct stats - let stats = Statistics::String(StatValues { - min: min.map(ToString::to_string), - max: max.map(ToString::to_string), - total_count: count, - null_count: Some(null_count), - distinct_count, - }); + let stats = ColumnStatistics { + null_count: Some(null_count as usize), + max_value: max.map(ScalarValue::from), + min_value: min.map(ScalarValue::from), + distinct_count: distinct_count.map(|c| c.get() as usize), + }; - self.add_schema_to_table(new_column_schema, true, Some(stats)) + self.update_count(count as usize); + self.add_schema_to_table(new_column_schema, Some(stats)) } /// Register a timestamp column with the test chunk with default stats @@ -590,7 +590,7 @@ impl TestChunk { // merge it in to any existing schema let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap(); - self.add_schema_to_table(new_column_schema, true, None) + self.add_schema_to_table(new_column_schema, None) } /// Register a timestamp column with the test chunk @@ -600,7 +600,7 @@ impl TestChunk { /// Register a timestamp column with full stats with the test chunk pub fn with_time_column_with_full_stats( - self, + mut self, min: Option, max: Option, count: u64, @@ -612,66 +612,39 @@ impl TestChunk { let null_count = 0; // Construct stats - let stats = Statistics::I64(StatValues { - min, - max, - total_count: count, - null_count: Some(null_count), - distinct_count, - }); + let stats = ColumnStatistics { + null_count: Some(null_count as usize), + max_value: 
max.map(|v| ScalarValue::TimestampNanosecond(Some(v), None)), + min_value: min.map(|v| ScalarValue::TimestampNanosecond(Some(v), None)), + distinct_count: distinct_count.map(|c| c.get() as usize), + }; - self.add_schema_to_table(new_column_schema, true, Some(stats)) + self.update_count(count as usize); + self.add_schema_to_table(new_column_schema, Some(stats)) } pub fn with_timestamp_min_max(mut self, min: i64, max: i64) -> Self { - match self - .table_summary - .columns - .iter_mut() - .find(|c| c.name == TIME_COLUMN_NAME) - { - Some(col) => { - let stats = &mut col.stats; - *stats = Statistics::I64(StatValues { - min: Some(min), - max: Some(max), - total_count: stats.total_count(), - null_count: stats.null_count(), - distinct_count: stats.distinct_count(), - }); - } - None => { - let total_count = self.table_summary.total_count(); - self.table_summary.columns.push(ColumnSummary { - name: TIME_COLUMN_NAME.to_string(), - influxdb_type: InfluxDbType::Timestamp, - stats: Statistics::I64(StatValues { - min: Some(min), - max: Some(max), - total_count, - null_count: None, - distinct_count: None, - }), - }); - } - } + let stats = self + .column_stats + .get_mut(TIME_COLUMN_NAME) + .expect("stats in sync w/ columns"); + + stats.min_value = Some(ScalarValue::TimestampNanosecond(Some(min), None)); + stats.max_value = Some(ScalarValue::TimestampNanosecond(Some(max), None)); + self } impl_with_column!(with_i64_field_column, Int64); - impl_with_column_no_stats!(with_i64_field_column_no_stats, Int64); impl_with_column_with_stats!(with_i64_field_column_with_stats, Int64, i64, I64); impl_with_column!(with_u64_column, UInt64); - impl_with_column_no_stats!(with_u64_field_column_no_stats, UInt64); impl_with_column_with_stats!(with_u64_field_column_with_stats, UInt64, u64, U64); impl_with_column!(with_f64_field_column, Float64); - impl_with_column_no_stats!(with_f64_field_column_no_stats, Float64); impl_with_column_with_stats!(with_f64_field_column_with_stats, Float64, f64, F64); impl_with_column!(with_bool_field_column, Boolean); - impl_with_column_no_stats!(with_bool_field_column_no_stats, Boolean); impl_with_column_with_stats!(with_bool_field_column_with_stats, Boolean, bool, Bool); /// Register a string field column with the test chunk @@ -692,13 +665,14 @@ impl TestChunk { .unwrap(); // Construct stats - let stats = Statistics::String(StatValues { - min: min.map(ToString::to_string), - max: max.map(ToString::to_string), - ..Default::default() - }); + let stats = ColumnStatistics { + null_count: None, + max_value: max.map(ScalarValue::from), + min_value: min.map(ScalarValue::from), + distinct_count: None, + }; - self.add_schema_to_table(new_column_schema, true, Some(stats)) + self.add_schema_to_table(new_column_schema, Some(stats)) } /// Adds the specified schema and optionally a column summary containing optional stats. 
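// Reviewer sketch (not part of this diff): with the `TestChunk` builder changes above, the
// per-column stats land in DataFusion's `Statistics` in schema order. An illustrative test of
// that shape; the module name, test name, and values are made up for the example.
#[cfg(test)]
mod stats_shape_sketch {
    use super::*;
    use datafusion::scalar::ScalarValue;

    #[test]
    fn time_range_lands_in_column_statistics() {
        // A chunk with only a time column, so its stats sit at index 0.
        let chunk = TestChunk::new("t")
            .with_time_column()
            .with_timestamp_min_max(1, 10);

        let stats = chunk.stats();
        let cols = stats.column_statistics.as_ref().expect("stats populated");
        assert_eq!(
            cols[0].min_value,
            Some(ScalarValue::TimestampNanosecond(Some(1), None))
        );
        assert_eq!(
            cols[0].max_value,
            Some(ScalarValue::TimestampNanosecond(Some(10), None))
        );
    }
}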
@@ -707,46 +681,18 @@ impl TestChunk { fn add_schema_to_table( mut self, new_column_schema: Schema, - add_column_summary: bool, - input_stats: Option, + input_stats: Option, ) -> Self { let mut merger = SchemaMerger::new(); merger = merger.merge(&new_column_schema).unwrap(); merger = merger.merge(&self.schema).expect("merging was successful"); self.schema = merger.build(); - for i in 0..new_column_schema.len() { - let (col_type, new_field) = new_column_schema.field(i); - if add_column_summary { - let influxdb_type = match col_type { - InfluxColumnType::Tag => InfluxDbType::Tag, - InfluxColumnType::Field(_) => InfluxDbType::Field, - InfluxColumnType::Timestamp => InfluxDbType::Timestamp, - }; - - let stats = input_stats.clone(); - let stats = stats.unwrap_or_else(|| match new_field.data_type() { - DataType::Boolean => Statistics::Bool(StatValues::default()), - DataType::Int64 => Statistics::I64(StatValues::default()), - DataType::UInt64 => Statistics::U64(StatValues::default()), - DataType::Utf8 => Statistics::String(StatValues::default()), - DataType::Dictionary(_, value_type) => { - assert!(matches!(**value_type, DataType::Utf8)); - Statistics::String(StatValues::default()) - } - DataType::Float64 => Statistics::F64(StatValues::default()), - DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()), - _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()), - }); - - let column_summary = ColumnSummary { - name: new_field.name().clone(), - influxdb_type, - stats, - }; - - self.table_summary.columns.push(column_summary); - } + for f in new_column_schema.inner().fields() { + self.column_stats.insert( + f.name().clone(), + input_stats.as_ref().cloned().unwrap_or_default(), + ); } self @@ -1197,10 +1143,22 @@ impl QueryChunk for TestChunk { } impl QueryChunkMeta for TestChunk { - fn summary(&self) -> Arc { + fn stats(&self) -> Arc { self.check_error().unwrap(); - Arc::new(self.table_summary.clone()) + Arc::new(DataFusionStatistics { + num_rows: self.num_rows, + total_byte_size: None, + column_statistics: Some( + self.schema + .inner() + .fields() + .iter() + .map(|f| self.column_stats.get(f.name()).cloned().unwrap_or_default()) + .collect(), + ), + is_exact: true, + }) } fn schema(&self) -> &Schema { diff --git a/iox_query/src/util.rs b/iox_query/src/util.rs index 521dbb26ca..42002a74fe 100644 --- a/iox_query/src/util.rs +++ b/iox_query/src/util.rs @@ -13,9 +13,7 @@ use arrow::{ record_batch::RecordBatch, }; -use data_types::{ - ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary, TimestampMinMax, -}; +use data_types::TimestampMinMax; use datafusion::{ self, common::{tree_node::TreeNodeRewriter, DFSchema, ToDFSchema}, @@ -27,7 +25,7 @@ use datafusion::{ physical_expr::create_physical_expr, physical_plan::{ expressions::{col as physical_col, PhysicalSortExpr}, - ExecutionPlan, PhysicalExpr, + ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics, }, prelude::{binary_expr, lit, Column, Expr}, scalar::ScalarValue, @@ -36,7 +34,7 @@ use datafusion::{ use itertools::Itertools; use observability_deps::tracing::trace; use predicate::rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME}; -use schema::{sort::SortKey, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME}; +use schema::{sort::SortKey, InfluxColumnType, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] @@ -290,78 +288,34 @@ pub fn compute_timenanosecond_min_max_for_one_record_batch( /// /// This contains: /// - correct column 
types -/// - [total count](StatValues::total_count) for all columns -/// - [min](StatValues::min)/[max](StatValues::max) for the timestamp column +/// - [total count](Statistics::num_rows) +/// - [min](ColumnStatistics::min_value)/[max](ColumnStatistics::max_value) for the timestamp column pub fn create_basic_summary( row_count: u64, schema: &Schema, ts_min_max: TimestampMinMax, -) -> TableSummary { +) -> Statistics { let mut columns = Vec::with_capacity(schema.len()); - for i in 0..schema.len() { - let (t, field) = schema.field(i); - - let influxdb_type = match t { - InfluxColumnType::Tag => InfluxDbType::Tag, - InfluxColumnType::Field(_) => InfluxDbType::Field, - InfluxColumnType::Timestamp => InfluxDbType::Timestamp, - }; + for (t, _field) in schema.iter() { let stats = match t { - InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => { - Statistics::String(StatValues { - min: None, - max: None, - total_count: row_count, - null_count: None, - distinct_count: None, - }) - } - InfluxColumnType::Timestamp => Statistics::I64(StatValues { - min: Some(ts_min_max.min), - max: Some(ts_min_max.max), - total_count: row_count, - null_count: None, + InfluxColumnType::Timestamp => ColumnStatistics { + null_count: Some(0), + max_value: Some(ScalarValue::TimestampNanosecond(Some(ts_min_max.max), None)), + min_value: Some(ScalarValue::TimestampNanosecond(Some(ts_min_max.min), None)), distinct_count: None, - }), - InfluxColumnType::Field(InfluxFieldType::Integer) => Statistics::I64(StatValues { - min: None, - max: None, - total_count: row_count, - null_count: None, - distinct_count: None, - }), - InfluxColumnType::Field(InfluxFieldType::UInteger) => Statistics::U64(StatValues { - min: None, - max: None, - total_count: row_count, - null_count: None, - distinct_count: None, - }), - InfluxColumnType::Field(InfluxFieldType::Float) => Statistics::F64(StatValues { - min: None, - max: None, - total_count: row_count, - null_count: None, - distinct_count: None, - }), - InfluxColumnType::Field(InfluxFieldType::Boolean) => Statistics::Bool(StatValues { - min: None, - max: None, - total_count: row_count, - null_count: None, - distinct_count: None, - }), + }, + _ => ColumnStatistics::default(), }; - - columns.push(ColumnSummary { - name: field.name().clone(), - influxdb_type, - stats, - }) + columns.push(stats) } - TableSummary { columns } + Statistics { + num_rows: Some(row_count as usize), + total_byte_size: None, + column_statistics: Some(columns), + is_exact: true, + } } #[cfg(test)] @@ -372,7 +326,7 @@ mod tests { prelude::{col, lit}, scalar::ScalarValue, }; - use schema::builder::SchemaBuilder; + use schema::{builder::SchemaBuilder, InfluxFieldType}; use super::*; @@ -494,7 +448,12 @@ mod tests { let ts_min_max = TimestampMinMax { min: 10, max: 20 }; let actual = create_basic_summary(row_count, &schema, ts_min_max); - let expected = TableSummary { columns: vec![] }; + let expected = Statistics { + num_rows: Some(row_count as usize), + total_byte_size: None, + column_statistics: Some(vec![]), + is_exact: true, + }; assert_eq!(actual, expected); } @@ -505,86 +464,24 @@ mod tests { let ts_min_max = TimestampMinMax { min: 10, max: 20 }; let actual = create_basic_summary(row_count, &schema, ts_min_max); - let expected = TableSummary { - columns: vec![ - ColumnSummary { - name: String::from("tag"), - influxdb_type: InfluxDbType::Tag, - stats: Statistics::String(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), + let expected = Statistics 
{ + num_rows: Some(0), + total_byte_size: None, + column_statistics: Some(vec![ + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics { + null_count: Some(0), + min_value: Some(ScalarValue::TimestampNanosecond(Some(10), None)), + max_value: Some(ScalarValue::TimestampNanosecond(Some(20), None)), + distinct_count: None, }, - ColumnSummary { - name: String::from("field_bool"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::Bool(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_float"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::F64(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_integer"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::I64(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_string"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::String(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_uinteger"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::U64(StatValues { - min: None, - max: None, - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("time"), - influxdb_type: InfluxDbType::Timestamp, - stats: Statistics::I64(StatValues { - min: Some(10), - max: Some(20), - total_count: 0, - null_count: None, - distinct_count: None, - }), - }, - ], + ]), + is_exact: true, }; assert_eq!(actual, expected); } @@ -596,86 +493,24 @@ mod tests { let ts_min_max = TimestampMinMax { min: 42, max: 42 }; let actual = create_basic_summary(row_count, &schema, ts_min_max); - let expected = TableSummary { - columns: vec![ - ColumnSummary { - name: String::from("tag"), - influxdb_type: InfluxDbType::Tag, - stats: Statistics::String(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), + let expected = Statistics { + num_rows: Some(3), + total_byte_size: None, + column_statistics: Some(vec![ + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics::default(), + ColumnStatistics { + null_count: Some(0), + min_value: Some(ScalarValue::TimestampNanosecond(Some(42), None)), + max_value: Some(ScalarValue::TimestampNanosecond(Some(42), None)), + distinct_count: None, }, - ColumnSummary { - name: String::from("field_bool"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::Bool(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_float"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::F64(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_integer"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::I64(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - 
ColumnSummary { - name: String::from("field_string"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::String(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("field_uinteger"), - influxdb_type: InfluxDbType::Field, - stats: Statistics::U64(StatValues { - min: None, - max: None, - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - ColumnSummary { - name: String::from("time"), - influxdb_type: InfluxDbType::Timestamp, - stats: Statistics::I64(StatValues { - min: Some(42), - max: Some(42), - total_count: 3, - null_count: None, - distinct_count: None, - }), - }, - ], + ]), + is_exact: true, }; assert_eq!(actual, expected); } diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 98be3a067d..710e3fd6c5 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -12,10 +12,8 @@ use arrow_flight::decode::DecodedPayload; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig, BackoffError}; use client_util::connection; -use data_types::{ - ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionId, TableSummary, TimestampMinMax, -}; -use datafusion::error::DataFusionError; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionId, TimestampMinMax}; +use datafusion::{error::DataFusionError, physical_plan::Statistics}; use futures::{stream::FuturesUnordered, TryStreamExt}; use ingester_query_grpc::{ encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata, @@ -824,7 +822,7 @@ impl IngesterPartition { let ts_min_max = compute_timenanosecond_min_max(&batches).expect("Should have time range"); let row_count = batches.iter().map(|batch| batch.num_rows()).sum::() as u64; - let summary = Arc::new(create_basic_summary( + let stats = Arc::new(create_basic_summary( row_count, &expected_schema, ts_min_max, @@ -837,7 +835,7 @@ impl IngesterPartition { partition_sort_key: self.partition_sort_key.clone(), batches, ts_min_max, - summary, + stats, delete_predicates: vec![], }; @@ -900,7 +898,7 @@ pub struct IngesterChunk { ts_min_max: TimestampMinMax, /// Summary Statistics - summary: Arc, + stats: Arc, delete_predicates: Vec>, } @@ -928,8 +926,8 @@ impl IngesterChunk { } impl QueryChunkMeta for IngesterChunk { - fn summary(&self) -> Arc { - Arc::clone(&self.summary) + fn stats(&self) -> Arc { + Arc::clone(&self.stats) } fn schema(&self) -> &Schema { diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs index b275f717a8..fb237f2e6f 100644 --- a/querier/src/ingester/test_util.rs +++ b/querier/src/ingester/test_util.rs @@ -86,7 +86,7 @@ impl IngesterConnection for MockIngesterConnection { let total_row_count = batches.iter().map(|b| b.num_rows()).sum::() as u64; - let summary = + let stats = create_basic_summary(total_row_count, &new_schema, ic.ts_min_max); super::IngesterChunk { @@ -96,7 +96,7 @@ impl IngesterConnection for MockIngesterConnection { partition_sort_key: ic.partition_sort_key, batches, ts_min_max: ic.ts_min_max, - summary: Arc::new(summary), + stats: Arc::new(stats), delete_predicates: vec![], } }) diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index 29a9f1990e..9112176f38 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -75,14 +75,17 @@ impl ChunkAdapter { files .iter() .map(|p| { - Arc::new(create_basic_summary( + let stats = Arc::new(create_basic_summary( 
p.row_count as u64, &cached_table.schema, TimestampMinMax { min: p.min_time.get(), max: p.max_time.get(), }, - )) + )); + let schema = Arc::clone(cached_table.schema.inner()); + + (stats, schema) }) .collect() }; diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index 9d76fdd47b..8900f3da10 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -1,8 +1,7 @@ //! Querier Chunks -use data_types::{ - ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, TableSummary, -}; +use data_types::{ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId}; +use datafusion::physical_plan::Statistics; use iox_query::util::create_basic_summary; use parquet_file::chunk::ParquetChunk; use schema::sort::SortKey; @@ -65,14 +64,14 @@ pub struct QuerierParquetChunk { /// Chunk of the Parquet file parquet_chunk: Arc, - /// Table summary - table_summary: Arc, + /// Stats + stats: Arc, } impl QuerierParquetChunk { /// Create new parquet-backed chunk (object store data). pub fn new(parquet_chunk: Arc, meta: Arc) -> Self { - let table_summary = Arc::new(create_basic_summary( + let stats = Arc::new(create_basic_summary( parquet_chunk.rows() as u64, parquet_chunk.schema(), parquet_chunk.timestamp_min_max(), @@ -82,7 +81,7 @@ impl QuerierParquetChunk { meta, delete_predicates: Vec::new(), parquet_chunk, - table_summary, + stats, } } @@ -152,8 +151,8 @@ pub mod tests { // check sort key assert_sort_key(&chunk); - // back up table summary - let table_summary_1 = chunk.summary(); + // back up stats + let stats_1 = chunk.stats(); // check if chunk can be queried assert_content(&chunk, &test_data).await; @@ -161,9 +160,9 @@ pub mod tests { // check state again assert_eq!(chunk.chunk_type(), "parquet"); - // summary has NOT changed - let table_summary_2 = chunk.summary(); - assert_eq!(table_summary_1, table_summary_2); + // stats have NOT changed + let stats_2 = chunk.stats(); + assert_eq!(stats_1, stats_2); // retrieving the chunk again should not require any catalog requests test_data.chunk(namespace_schema).await; diff --git a/querier/src/parquet/query_access.rs b/querier/src/parquet/query_access.rs index 1e12d2b82b..07485aa679 100644 --- a/querier/src/parquet/query_access.rs +++ b/querier/src/parquet/query_access.rs @@ -1,6 +1,6 @@ use crate::parquet::QuerierParquetChunk; -use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary}; -use datafusion::error::DataFusionError; +use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId}; +use datafusion::{error::DataFusionError, physical_plan::Statistics}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, QueryChunk, QueryChunkData, QueryChunkMeta, @@ -10,8 +10,8 @@ use schema::{sort::SortKey, Projection, Schema}; use std::{any::Any, sync::Arc}; impl QueryChunkMeta for QuerierParquetChunk { - fn summary(&self) -> Arc { - Arc::clone(&self.table_summary) + fn stats(&self) -> Arc { + Arc::clone(&self.stats) } fn schema(&self) -> &Schema {
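// Reviewer sketch (not part of this diff): downstream of the querier change above, pruning now
// consumes pairs of DataFusion statistics plus the matching Arrow schema instead of
// `TableSummary`s. A minimal illustration of building and pruning one such pair; the wrapper
// function name is made up, and the `iox_query::pruning` / `iox_query::util` module paths are
// assumed from this crate layout.
use std::sync::Arc;

use data_types::TimestampMinMax;
use iox_query::{
    pruning::{prune_summaries, NotPrunedReason},
    util::create_basic_summary,
};
use predicate::Predicate;
use schema::Schema;

/// Returns one `bool` per summary; `false` marks a chunk whose data is provably excluded.
fn keep_flags(
    schema: &Schema,
    row_count: u64,
    ts: TimestampMinMax,
    predicate: &Predicate,
) -> Result<Vec<bool>, NotPrunedReason> {
    // Each entry pairs the chunk's DataFusion statistics with the schema those stats describe.
    let summaries = vec![(
        Arc::new(create_basic_summary(row_count, schema, ts)),
        schema.as_arrow(),
    )];
    prune_summaries(schema, &summaries, predicate)
}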