refactor: migrate `iox_query` to use DataFusion statistics (#7908)

This is the major part of #7470. Additional clean-ups (e.g. removing the
actual statistics types from `data_types`) will follow.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-06-02 11:18:59 +02:00 committed by GitHub
parent 0962ba32e7
commit fa5011197c
15 changed files with 433 additions and 825 deletions
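The core of the change is visible in the first hunks below: `QueryChunkMeta::summary()`, which returned the IOx `TableSummary`, becomes `QueryChunkMeta::stats()` returning DataFusion's `Statistics`. Here is a minimal, self-contained sketch of the new shape, assuming the Option-based `Statistics`/`ColumnStatistics` structs of the DataFusion release this PR targets; `ExampleChunk` and the values are hypothetical:

```rust
use std::sync::Arc;

use datafusion::physical_plan::{ColumnStatistics, Statistics};
use datafusion::scalar::ScalarValue;

// Trimmed-down stand-in for the iox_query trait: only the method that changes.
trait QueryChunkMeta {
    /// Was `fn summary(&self) -> Arc<TableSummary>` before this PR.
    fn stats(&self) -> Arc<Statistics>;
}

// Hypothetical chunk that pre-computes its statistics once.
struct ExampleChunk {
    stats: Arc<Statistics>,
}

impl QueryChunkMeta for ExampleChunk {
    fn stats(&self) -> Arc<Statistics> {
        Arc::clone(&self.stats)
    }
}

fn main() {
    // A single `time` column with a known range, similar to what
    // `create_basic_summary` produces further down in this diff.
    let chunk = ExampleChunk {
        stats: Arc::new(Statistics {
            num_rows: Some(3),
            total_byte_size: None,
            column_statistics: Some(vec![ColumnStatistics {
                null_count: Some(0),
                min_value: Some(ScalarValue::TimestampNanosecond(Some(10), None)),
                max_value: Some(ScalarValue::TimestampNanosecond(Some(20), None)),
                distinct_count: None,
            }]),
            is_exact: true,
        }),
    };
    assert_eq!(chunk.stats().num_rows, Some(3));
}
```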


@ -1,8 +1,8 @@
//! QueryableParquetChunk for building query plan
use std::{any::Any, sync::Arc};
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::error::DataFusionError;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::create_basic_summary,
@ -26,7 +26,7 @@ pub struct QueryableParquetChunk {
partition_id: PartitionId,
sort_key: Option<SortKey>,
order: ChunkOrder,
summary: Arc<TableSummary>,
stats: Arc<Statistics>,
}
impl QueryableParquetChunk {
@ -37,7 +37,7 @@ impl QueryableParquetChunk {
sort_key: Option<SortKey>,
order: ChunkOrder,
) -> Self {
let summary = Arc::new(create_basic_summary(
let stats = Arc::new(create_basic_summary(
data.rows() as u64,
data.schema(),
data.timestamp_min_max(),
@ -48,7 +48,7 @@ impl QueryableParquetChunk {
partition_id,
sort_key,
order,
summary,
stats,
}
}
@ -68,8 +68,8 @@ impl QueryableParquetChunk {
}
impl QueryChunkMeta for QueryableParquetChunk {
fn summary(&self) -> Arc<TableSummary> {
Arc::clone(&self.summary)
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {


@ -5,8 +5,8 @@ use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::error::DataFusionError;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::{compute_timenanosecond_min_max, create_basic_summary},
@ -41,8 +41,8 @@ pub struct QueryAdaptor {
/// An interned schema for all [`RecordBatch`] in data.
schema: Schema,
/// An interned table summary.
summary: OnceCell<Arc<TableSummary>>,
/// Interned statistics.
stats: OnceCell<Arc<Statistics>>,
}
impl QueryAdaptor {
@ -67,7 +67,7 @@ impl QueryAdaptor {
// use Uuid for this. Draw this UUID during chunk generation so that it is stable during the whole query process.
id: ChunkId::new(),
schema,
summary: OnceCell::default(),
stats: OnceCell::default(),
}
}
@ -110,8 +110,8 @@ impl QueryAdaptor {
}
impl QueryChunkMeta for QueryAdaptor {
fn summary(&self) -> Arc<TableSummary> {
Arc::clone(self.summary.get_or_init(|| {
fn stats(&self) -> Arc<Statistics> {
Arc::clone(self.stats.get_or_init(|| {
let ts_min_max = compute_timenanosecond_min_max(self.data.iter().map(|b| b.as_ref()))
.expect("Should have time range");


@ -20,8 +20,8 @@ use arrow::{
record_batch::RecordBatch,
};
use async_trait::async_trait;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary};
use datafusion::{error::DataFusionError, prelude::SessionContext};
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics, prelude::SessionContext};
use exec::{stringset::StringSet, IOxSessionContext};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, trace};
@ -30,7 +30,7 @@ use parquet_file::storage::ParquetExecInput;
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
use schema::{
sort::{SortKey, SortKeyBuilder},
Projection, Schema, TIME_COLUMN_NAME,
InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME,
};
use std::{any::Any, collections::BTreeSet, fmt::Debug, iter::FromIterator, sync::Arc};
@ -62,8 +62,8 @@ pub fn chunk_order_field() -> Arc<Field> {
/// Trait for an object (designed to be a Chunk) which can provide
/// metadata
pub trait QueryChunkMeta {
/// Return a summary of the data
fn summary(&self) -> Arc<TableSummary>;
/// Return statistics of the data
fn stats(&self) -> Arc<Statistics>;
/// Return a reference to the schema of the data held in this chunk
fn schema(&self) -> &Schema;
@ -283,8 +283,8 @@ impl<P> QueryChunkMeta for Arc<P>
where
P: QueryChunkMeta,
{
fn summary(&self) -> Arc<TableSummary> {
self.as_ref().summary()
fn stats(&self) -> Arc<Statistics> {
self.as_ref().stats()
}
fn schema(&self) -> &Schema {
@ -308,8 +308,8 @@ where
/// Implement `ChunkMeta` for `Arc<dyn QueryChunk>`
impl QueryChunkMeta for Arc<dyn QueryChunk> {
fn summary(&self) -> Arc<TableSummary> {
self.as_ref().summary()
fn stats(&self) -> Arc<Statistics> {
self.as_ref().stats()
}
fn schema(&self) -> &Schema {
@ -339,11 +339,10 @@ pub fn chunks_have_distinct_counts<'a>(
// do not need to compute potential duplicates. We will treat
// them all as having duplicates
chunks.into_iter().all(|chunk| {
chunk
.summary()
.columns
.iter()
.all(|col| col.stats.distinct_count().is_some())
let Some(col_stats) = &chunk
.stats()
.column_statistics else {return false};
col_stats.iter().all(|col| col.distinct_count.is_some())
})
}
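A standalone sketch of the per-chunk check above, assuming only DataFusion's statistics types; absent column statistics are treated as "no distinct counts known":

```rust
use datafusion::physical_plan::Statistics;

// Returns true only if every column reports a distinct count; missing column
// statistics conservatively count as unknown.
fn all_distinct_counts_known(stats: &Statistics) -> bool {
    let Some(col_stats) = &stats.column_statistics else {
        return false;
    };
    col_stats.iter().all(|col| col.distinct_count.is_some())
}
```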
@ -356,8 +355,7 @@ pub fn compute_sort_key_for_chunks<'a>(
// sorted lexicographically but time column always last
SortKey::from_columns(schema.primary_key())
} else {
let summaries = chunks.into_iter().map(|x| x.summary());
compute_sort_key(summaries)
compute_sort_key(chunks.into_iter())
}
}
@ -368,19 +366,18 @@ pub fn compute_sort_key_for_chunks<'a>(
///
/// The cardinality is estimated by the sum of unique counts over all summaries. This may overestimate cardinality since
/// it does not account for shared/repeated values.
fn compute_sort_key(summaries: impl Iterator<Item = Arc<TableSummary>>) -> SortKey {
fn compute_sort_key<'a>(chunks: impl Iterator<Item = &'a Arc<dyn QueryChunk>>) -> SortKey {
let mut cardinalities: HashMap<String, u64> = Default::default();
for summary in summaries {
for column in &summary.columns {
if column.influxdb_type != InfluxDbType::Tag {
for chunk in chunks {
let stats = chunk.stats();
let Some(col_stats) = stats.column_statistics.as_ref() else {continue};
for ((influxdb_type, field), stats) in chunk.schema().iter().zip(col_stats) {
if influxdb_type != InfluxColumnType::Tag {
continue;
}
let mut cnt = 0;
if let Some(count) = column.stats.distinct_count() {
cnt = count.get();
}
*cardinalities.entry_ref(column.name.as_str()).or_default() += cnt;
let cnt = stats.distinct_count.unwrap_or_default() as u64;
*cardinalities.entry_ref(field.name().as_str()).or_default() += cnt;
}
}
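A hedged sketch of the cardinality estimate used above, decoupled from the chunk/schema plumbing: sum each tag column's `distinct_count` across chunks, keyed by column name (missing counts contribute zero, mirroring `unwrap_or_default()`):

```rust
use std::collections::HashMap;

use datafusion::physical_plan::ColumnStatistics;

// One inner Vec per chunk: (tag column name, its statistics).
fn tag_cardinalities(chunks: &[Vec<(&str, ColumnStatistics)>]) -> HashMap<String, u64> {
    let mut cardinalities: HashMap<String, u64> = HashMap::new();
    for chunk in chunks {
        for (name, stats) in chunk {
            // unknown distinct counts contribute nothing to the estimate
            let cnt = stats.distinct_count.unwrap_or_default() as u64;
            *cardinalities.entry((*name).to_owned()).or_default() += cnt;
        }
    }
    cardinalities
}
```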


@ -5,7 +5,9 @@
use crate::QueryChunk;
use data_types::TimestampMinMax;
use datafusion::scalar::ScalarValue;
use observability_deps::tracing::debug;
use schema::TIME_COLUMN_NAME;
use std::sync::Arc;
/// Groups query chunks into disjoint sets of overlapped time range.
@ -90,7 +92,27 @@ pub fn group_potential_duplicates(
}
fn timestamp_min_max(chunk: &dyn QueryChunk) -> Option<TimestampMinMax> {
chunk.summary().time_range()
chunk
.stats()
.column_statistics
.as_ref()
.and_then(|stats| {
chunk
.schema()
.find_index_of(TIME_COLUMN_NAME)
.map(|idx| &stats[idx])
})
.and_then(|stats| {
if let (
Some(ScalarValue::TimestampNanosecond(Some(min), None)),
Some(ScalarValue::TimestampNanosecond(Some(max), None)),
) = (&stats.min_value, &stats.max_value)
{
Some(TimestampMinMax::new(*min, *max))
} else {
None
}
})
}
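The same bound extraction in isolation, as a sketch: a time range is only reported when both bounds are concrete `TimestampNanosecond` values; `TimeRange` stands in for `data_types::TimestampMinMax`:

```rust
use datafusion::physical_plan::ColumnStatistics;
use datafusion::scalar::ScalarValue;

// Stand-in for data_types::TimestampMinMax.
struct TimeRange {
    min: i64,
    max: i64,
}

// Both bounds must be concrete timestamp values, otherwise no range is known.
fn time_range(stats: &ColumnStatistics) -> Option<TimeRange> {
    match (&stats.min_value, &stats.max_value) {
        (
            Some(ScalarValue::TimestampNanosecond(Some(min), None)),
            Some(ScalarValue::TimestampNanosecond(Some(max), None)),
        ) => Some(TimeRange { min: *min, max: *max }),
        _ => None,
    }
}
```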
#[cfg(test)]
@ -118,8 +140,16 @@ mod test {
#[test]
fn one_time_column_overlap_same_min_max() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 1));
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(1, 1));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 1),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(1, 1),
);
let groups = group_potential_duplicates(vec![c1, c2]);
@ -129,10 +159,26 @@ mod test {
#[test]
fn one_time_column_overlap_bad_case() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(15, 30));
let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(7, 20));
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(15, 30),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(7, 20),
);
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]);
@ -142,10 +188,26 @@ mod test {
#[test]
fn one_time_column_overlap_contiguous() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20));
let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(15, 30));
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(7, 20),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(15, 30),
);
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]);
@ -155,10 +217,26 @@ mod test {
#[test]
fn one_time_column_overlap_2_groups() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20));
let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(21, 30));
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(7, 20),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(21, 30),
);
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]);
@ -168,10 +246,26 @@ mod test {
#[test]
fn one_time_column_overlap_3_groups() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20));
let c3 = Arc::new(TestChunk::new("chunk3").with_timestamp_min_max(21, 24));
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(7, 20),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(21, 24),
);
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c4, c3, c2]);
@ -185,7 +279,11 @@ mod test {
#[test]
fn one_time_column_overlap_1_chunk() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let groups = group_potential_duplicates(vec![c1]);
@ -202,18 +300,28 @@ mod test {
#[test]
fn multi_columns_overlap_bad_case() {
let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10),
);
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(15, 30)
.with_i64_field_column("field1"),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(7, 20)
.with_tag_column("tag1"),
);
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c2, c3, c4]);
@ -225,6 +333,7 @@ mod test {
fn multi_columns_overlap_1_chunk() {
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10)
.with_tag_column("tag1"),
);
@ -239,16 +348,26 @@ mod test {
fn multi_columns_overlap_3_groups() {
let c1 = Arc::new(
TestChunk::new("chunk1")
.with_time_column()
.with_timestamp_min_max(1, 10)
.with_tag_column("tag1"),
);
let c2 = Arc::new(TestChunk::new("chunk2").with_timestamp_min_max(7, 20));
let c2 = Arc::new(
TestChunk::new("chunk2")
.with_time_column()
.with_timestamp_min_max(7, 20),
);
let c3 = Arc::new(
TestChunk::new("chunk3")
.with_time_column()
.with_timestamp_min_max(21, 24)
.with_tag_column("tag2"),
);
let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));
let c4 = Arc::new(
TestChunk::new("chunk4")
.with_time_column()
.with_timestamp_min_max(25, 35),
);
let groups = group_potential_duplicates(vec![c1, c4, c3, c2]);


@ -198,10 +198,13 @@ pub fn chunks_to_physical_nodes(
// ensure that chunks are actually ordered by chunk order
chunks.sort_by_key(|(_meta, c)| c.order());
let num_rows = chunks
.iter()
.map(|(_meta, c)| c.summary().total_count() as usize)
.sum::<usize>();
let num_rows = chunks.iter().map(|(_meta, c)| c.stats().num_rows).fold(
Some(0usize),
|accu, x| match (accu, x) {
(Some(accu), Some(x)) => Some(accu + x),
_ => None,
},
);
let chunk_order_min = chunks
.iter()
.map(|(_meta, c)| c.order().get())
@ -265,7 +268,7 @@ pub fn chunks_to_physical_nodes(
};
let statistics = Statistics {
num_rows: Some(num_rows),
num_rows,
total_byte_size: None,
column_statistics: Some(
schema
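The row-count fold above, isolated: `Option<usize>` counts are summed such that a single unknown chunk makes the total unknown (equivalent to collecting with `Iterator::sum::<Option<usize>>()`):

```rust
fn total_rows(counts: impl IntoIterator<Item = Option<usize>>) -> Option<usize> {
    counts
        .into_iter()
        .fold(Some(0usize), |accu, x| match (accu, x) {
            // only keep summing while every chunk's row count is known
            (Some(accu), Some(x)) => Some(accu + x),
            _ => None,
        })
}

fn main() {
    assert_eq!(total_rows([Some(1), Some(2)]), Some(3));
    assert_eq!(total_rows([Some(1), None]), None);
}
```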


@ -1,10 +1,12 @@
//! Implementation of a DataFusion PhysicalPlan node across partition chunks
use crate::{QueryChunk, CHUNK_ORDER_COLUMN_NAME};
use crate::{statistics::DFStatsAggregator, QueryChunk, CHUNK_ORDER_COLUMN_NAME};
use super::adapter::SchemaAdapterStream;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use data_types::{ColumnSummary, InfluxDbType, TableSummary};
use arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion::{
error::DataFusionError,
execution::context::TaskContext,
@ -12,17 +14,16 @@ use datafusion::{
expressions::{Column, PhysicalSortExpr},
memory::MemoryStream,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream, Statistics,
},
scalar::ScalarValue,
};
use observability_deps::tracing::trace;
use schema::sort::SortKey;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fmt,
num::NonZeroU64,
sync::Arc,
};
@ -59,7 +60,9 @@ impl RecordBatchesExec {
schema: SchemaRef,
output_sort_key_memo: Option<SortKey>,
) -> Self {
let has_chunk_order_col = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).is_ok();
let chunk_order_field = schema.field_with_name(CHUNK_ORDER_COLUMN_NAME).ok();
let chunk_order_only_schema =
chunk_order_field.map(|field| Schema::new(vec![field.clone()]));
let chunks: Vec<_> = chunks
.into_iter()
@ -76,53 +79,35 @@ impl RecordBatchesExec {
let statistics = chunks
.iter()
.fold(
None,
|mut combined_summary: Option<TableSummary>, (chunk, _batches)| {
let summary = chunk.summary();
DFStatsAggregator::new(&schema),
|mut agg, (chunk, _batches)| {
agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref());
let summary = if has_chunk_order_col {
// add chunk order column
if let Some(schema) = chunk_order_only_schema.as_ref() {
let order = chunk.order().get();
let summary = TableSummary {
columns: summary
.columns
.iter()
.cloned()
.chain(std::iter::once(ColumnSummary {
name: CHUNK_ORDER_COLUMN_NAME.to_owned(),
influxdb_type: InfluxDbType::Field,
stats: data_types::Statistics::I64(data_types::StatValues {
min: Some(order),
max: Some(order),
total_count: summary.total_count(),
null_count: Some(0),
distinct_count: Some(NonZeroU64::new(1).unwrap()),
}),
}))
.collect(),
};
Cow::Owned(summary)
} else {
Cow::Borrowed(summary.as_ref())
};
match combined_summary.as_mut() {
None => {
combined_summary = Some(summary.into_owned());
}
Some(combined_summary) => {
combined_summary.update_from(&summary);
}
let order = ScalarValue::from(order);
agg.update(
&Statistics {
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: Some(vec![ColumnStatistics {
null_count: Some(0),
max_value: Some(order.clone()),
min_value: Some(order),
distinct_count: Some(1),
}]),
is_exact: true,
},
schema,
);
}
combined_summary
agg
},
)
.map(|combined_summary| crate::statistics::df_from_iox(&schema, &combined_summary))
.unwrap_or_default();
.build();
let output_ordering = if has_chunk_order_col {
let output_ordering = if chunk_order_field.is_some() {
Some(vec![
// every chunk gets its own partition, so we can claim that the output is ordered
PhysicalSortExpr {


@ -2,15 +2,15 @@
use crate::{QueryChunk, QueryChunkMeta};
use arrow::{
array::{
ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt64Array,
},
datatypes::{DataType, Int32Type, TimeUnit},
array::{ArrayRef, UInt64Array},
datatypes::{DataType, SchemaRef},
};
use data_types::{StatValues, Statistics, TableSummary};
use datafusion::{
physical_expr::execution_props::ExecutionProps, physical_optimizer::pruning::PruningStatistics,
physical_expr::execution_props::ExecutionProps,
physical_optimizer::pruning::PruningStatistics,
physical_plan::{ColumnStatistics, Statistics},
prelude::Column,
scalar::ScalarValue,
};
use datafusion_util::create_pruning_predicate;
use observability_deps::tracing::{debug, trace, warn};
@ -79,7 +79,10 @@ pub fn prune_chunks(
) -> Result<Vec<bool>, NotPrunedReason> {
let num_chunks = chunks.len();
debug!(num_chunks, %predicate, "Pruning chunks");
let summaries: Vec<_> = chunks.iter().map(|c| c.summary()).collect();
let summaries: Vec<_> = chunks
.iter()
.map(|c| (c.stats(), c.schema().as_arrow()))
.collect();
prune_summaries(table_schema, &summaries, predicate)
}
@ -87,7 +90,7 @@ pub fn prune_chunks(
/// predicate can be proven to evaluate to `false` for every single row.
pub fn prune_summaries(
table_schema: &Schema,
summaries: &[Arc<TableSummary>],
summaries: &[(Arc<Statistics>, SchemaRef)],
predicate: &Predicate,
) -> Result<Vec<bool>, NotPrunedReason> {
let filter_expr = match predicate.filter_expr() {
@ -129,7 +132,7 @@ pub fn prune_summaries(
/// interface required for pruning
struct ChunkPruningStatistics<'a> {
table_schema: &'a Schema,
summaries: &'a [Arc<TableSummary>],
summaries: &'a [(Arc<Statistics>, SchemaRef)],
}
impl<'a> ChunkPruningStatistics<'a> {
@ -144,10 +147,12 @@ impl<'a> ChunkPruningStatistics<'a> {
fn column_summaries<'b: 'a, 'c: 'a>(
&'c self,
column: &'b Column,
) -> impl Iterator<Item = Option<Statistics>> + 'a {
self.summaries
.iter()
.map(|summary| Some(summary.column(&column.name)?.stats.clone()))
) -> impl Iterator<Item = Option<&'a ColumnStatistics>> + 'a {
self.summaries.iter().map(|(stats, schema)| {
let stats = stats.column_statistics.as_ref()?;
let idx = schema.index_of(&column.name).ok()?;
Some(&stats[idx])
})
}
}
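The pruning summaries are now `(Arc<Statistics>, SchemaRef)` pairs, so a column's statistics are found by resolving its index in the Arrow schema. A self-contained sketch of that lookup:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_plan::{ColumnStatistics, Statistics};

// Look up one column's statistics in a (statistics, schema) pair, the shape
// `prune_summaries` now receives; None if the column or the stats are missing.
fn column_stats<'a>(
    summary: &'a (Arc<Statistics>, SchemaRef),
    column_name: &str,
) -> Option<&'a ColumnStatistics> {
    let (stats, schema) = summary;
    let col_stats = stats.column_statistics.as_ref()?;
    let idx = schema.index_of(column_name).ok()?;
    Some(&col_stats[idx])
}
```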
@ -171,7 +176,7 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> {
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let null_counts = self
.column_summaries(column)
.map(|x| x.and_then(|s| s.null_count()));
.map(|x| x.and_then(|s| s.null_count.map(|x| x as u64)));
Some(Arc::new(UInt64Array::from_iter(null_counts)))
}
@ -179,70 +184,26 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> {
/// Collects an [`ArrayRef`] containing the aggregate statistic corresponding to
/// `aggregate` for each of the provided [`Statistics`]
fn collect_pruning_stats(
fn collect_pruning_stats<'a>(
data_type: &DataType,
statistics: impl Iterator<Item = Option<Statistics>>,
statistics: impl Iterator<Item = Option<&'a ColumnStatistics>>,
aggregate: Aggregate,
) -> Option<ArrayRef> {
match data_type {
DataType::Int64 | DataType::Timestamp(TimeUnit::Nanosecond, None) => {
let values = statistics.map(|s| match s {
Some(Statistics::I64(v)) => get_aggregate(v, aggregate),
_ => None,
});
Some(Arc::new(Int64Array::from_iter(values)))
}
DataType::UInt64 => {
let values = statistics.map(|s| match s {
Some(Statistics::U64(v)) => get_aggregate(v, aggregate),
_ => None,
});
Some(Arc::new(UInt64Array::from_iter(values)))
}
DataType::Float64 => {
let values = statistics.map(|s| match s {
Some(Statistics::F64(v)) => get_aggregate(v, aggregate),
_ => None,
});
Some(Arc::new(Float64Array::from_iter(values)))
}
DataType::Boolean => {
let values = statistics.map(|s| match s {
Some(Statistics::Bool(v)) => get_aggregate(v, aggregate),
_ => None,
});
Some(Arc::new(BooleanArray::from_iter(values)))
}
DataType::Utf8 => {
let values = statistics.map(|s| match s {
Some(Statistics::String(v)) => get_aggregate(v, aggregate),
_ => None,
});
Some(Arc::new(StringArray::from_iter(values)))
}
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
let values = statistics.map(|s| match s {
Some(Statistics::String(v)) => get_aggregate(v, aggregate),
_ => None,
});
let null = ScalarValue::try_from(data_type).ok()?;
// DictionaryArray can only be built from string references (`str`), not from owned strings (`String`), so
// we need to collect the strings first
let values: Vec<_> = values.collect();
let values = values.iter().map(|s| s.as_deref());
Some(Arc::new(DictionaryArray::<Int32Type>::from_iter(values)))
}
_ => None,
}
ScalarValue::iter_to_array(statistics.map(|stats| {
stats
.and_then(|stats| get_aggregate(stats, aggregate).cloned())
.unwrap_or_else(|| null.clone())
}))
.ok()
}
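This is why the long per-datatype match could be dropped: `ScalarValue::iter_to_array` builds an Arrow array of the appropriate type from any sequence of scalars, so min/max values can be collected generically. A small sketch with made-up values:

```rust
use datafusion::arrow::array::Array;
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    let mins = vec![
        ScalarValue::Int64(Some(1)),
        ScalarValue::Int64(None), // a chunk with no known minimum becomes a null
        ScalarValue::Int64(Some(7)),
    ];
    let array = ScalarValue::iter_to_array(mins)?;
    assert_eq!(array.len(), 3);
    assert_eq!(array.null_count(), 1);
    Ok(())
}
```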
/// Returns the aggregate statistic corresponding to `aggregate` from `stats`
fn get_aggregate<T>(stats: StatValues<T>, aggregate: Aggregate) -> Option<T> {
fn get_aggregate(stats: &ColumnStatistics, aggregate: Aggregate) -> Option<&ScalarValue> {
match aggregate {
Aggregate::Min => stats.min,
Aggregate::Max => stats.max,
Aggregate::Min => stats.min_value.as_ref(),
Aggregate::Max => stats.max_value.as_ref(),
_ => None,
}
}
@ -486,7 +447,7 @@ mod test {
TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None),
) as Arc<dyn QueryChunk>;
let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1"))
let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column("column1"))
as Arc<dyn QueryChunk>;
let predicate = Predicate::new().with_expr(col("column1").gt(lit(100i64)));
@ -691,12 +652,12 @@ mod test {
let c5 = Arc::new(
TestChunk::new("chunk5")
.with_i64_field_column_with_stats("column1", Some(0), Some(10))
.with_i64_field_column_no_stats("column2"),
.with_i64_field_column("column2"),
) as Arc<dyn QueryChunk>;
let c6 = Arc::new(
TestChunk::new("chunk6")
.with_i64_field_column_no_stats("column1")
.with_i64_field_column("column1")
.with_i64_field_column_with_stats("column2", Some(0), Some(4)),
) as Arc<dyn QueryChunk>;


@ -3,106 +3,11 @@
use std::{cmp::Ordering, collections::HashMap};
use arrow::datatypes::Schema;
use data_types::{ColumnSummary, InfluxDbType, Statistics as IOxStatistics, TableSummary};
use datafusion::{
physical_plan::{ColumnStatistics, Statistics as DFStatistics},
scalar::ScalarValue,
};
/// Converts stats.min and an appropriate `ScalarValue`
pub(crate) fn min_to_scalar(
influx_type: &InfluxDbType,
stats: &IOxStatistics,
) -> Option<ScalarValue> {
match stats {
IOxStatistics::I64(v) => {
if InfluxDbType::Timestamp == *influx_type {
v.min
.map(|x| ScalarValue::TimestampNanosecond(Some(x), None))
} else {
v.min.map(ScalarValue::from)
}
}
IOxStatistics::U64(v) => v.min.map(ScalarValue::from),
IOxStatistics::F64(v) => v.min.map(ScalarValue::from),
IOxStatistics::Bool(v) => v.min.map(ScalarValue::from),
IOxStatistics::String(v) => v.min.as_deref().map(ScalarValue::from),
}
}
/// Converts stats.max to an appropriate `ScalarValue`
pub(crate) fn max_to_scalar(
influx_type: &InfluxDbType,
stats: &IOxStatistics,
) -> Option<ScalarValue> {
match stats {
IOxStatistics::I64(v) => {
if InfluxDbType::Timestamp == *influx_type {
v.max
.map(|x| ScalarValue::TimestampNanosecond(Some(x), None))
} else {
v.max.map(ScalarValue::from)
}
}
IOxStatistics::U64(v) => v.max.map(ScalarValue::from),
IOxStatistics::F64(v) => v.max.map(ScalarValue::from),
IOxStatistics::Bool(v) => v.max.map(ScalarValue::from),
IOxStatistics::String(v) => v.max.as_deref().map(ScalarValue::from),
}
}
/// Creates a DataFusion `Statistics` object from an IOx `TableSummary`
pub(crate) fn df_from_iox(
schema: &arrow::datatypes::Schema,
summary: &TableSummary,
) -> DFStatistics {
let column_by_name = summary
.columns
.iter()
.map(|c| (&c.name, c))
.collect::<hashbrown::HashMap<_, _>>();
// compute statistics for all columns in the schema, in order
let column_statistics = schema
.fields()
.iter()
.map(|field| {
column_by_name
.get(field.name())
.map(|c| df_from_iox_col(c))
// use default statistics if none are available for this column
.unwrap_or_default()
})
.collect::<Vec<_>>();
DFStatistics {
num_rows: Some(summary.total_count() as usize),
total_byte_size: Some(summary.size()),
column_statistics: Some(column_statistics),
is_exact: true,
}
}
/// Convert IOx `ColumnSummary` to DataFusion's `ColumnStatistics`
fn df_from_iox_col(col: &ColumnSummary) -> ColumnStatistics {
let stats = &col.stats;
let col_data_type = &col.influxdb_type;
let distinct_count = stats.distinct_count().map(|v| {
let v: u64 = v.into();
v as usize
});
let null_count = stats.null_count().map(|x| x as usize);
ColumnStatistics {
null_count,
max_value: max_to_scalar(col_data_type, stats),
min_value: min_to_scalar(col_data_type, stats),
distinct_count,
}
}
/// Aggregates DataFusion [statistics](DFStatistics).
#[derive(Debug)]
pub struct DFStatsAggregator<'a> {
@ -325,161 +230,6 @@ impl TriStateScalar {
mod test {
use super::*;
use arrow::datatypes::{DataType, Field};
use data_types::{InfluxDbType, StatValues};
use schema::{builder::SchemaBuilder, InfluxFieldType};
use std::num::NonZeroU64;
macro_rules! assert_nice_eq {
($actual:ident, $expected:ident) => {
assert_eq!(
$actual, $expected,
"\n\nactual:\n\n{:#?}\n\nexpected:\n\n{:#?}",
$actual, $expected,
);
};
}
#[test]
fn convert() {
let c1_stats = StatValues {
min: Some(11),
max: Some(11),
total_count: 3,
null_count: Some(1),
distinct_count: None,
};
let c1_summary = ColumnSummary {
name: "c1".to_string(),
influxdb_type: InfluxDbType::Tag,
stats: IOxStatistics::I64(c1_stats),
};
let c2_stats = StatValues {
min: Some(-5),
max: Some(6),
total_count: 3,
null_count: Some(0),
distinct_count: Some(NonZeroU64::new(33).unwrap()),
};
let c2_summary = ColumnSummary {
name: "c2".to_string(),
influxdb_type: InfluxDbType::Field,
stats: IOxStatistics::I64(c2_stats),
};
let table_summary = TableSummary {
columns: vec![c1_summary, c2_summary],
};
let df_c1_stats = ColumnStatistics {
null_count: Some(1),
max_value: Some(ScalarValue::Int64(Some(11))),
min_value: Some(ScalarValue::Int64(Some(11))),
distinct_count: None,
};
let df_c2_stats = ColumnStatistics {
null_count: Some(0),
max_value: Some(ScalarValue::Int64(Some(6))),
min_value: Some(ScalarValue::Int64(Some(-5))),
distinct_count: Some(33),
};
// test 1: columns in c1, c2 order
let schema = SchemaBuilder::new()
.tag("c1")
.influx_field("c2", InfluxFieldType::Integer)
.build()
.unwrap();
let expected = DFStatistics {
num_rows: Some(3),
total_byte_size: Some(412),
column_statistics: Some(vec![df_c1_stats.clone(), df_c2_stats.clone()]),
is_exact: true,
};
let actual = df_from_iox(schema.inner(), &table_summary);
assert_nice_eq!(actual, expected);
// test 2: columns in c2, c1 order in schema (c1, c2 order in table_summary)
let schema = SchemaBuilder::new()
.tag("c2")
.influx_field("c1", InfluxFieldType::Integer)
.build()
.unwrap();
let expected = DFStatistics {
// in c2, c1 order
column_statistics: Some(vec![df_c2_stats.clone(), df_c1_stats.clone()]),
// other fields the same
..expected
};
let actual = df_from_iox(schema.inner(), &table_summary);
assert_nice_eq!(actual, expected);
// test 3: columns c2 (tag with stats), c1 (field with stats), and c3 (tag without stats)
let schema = SchemaBuilder::new()
.tag("c2")
.influx_field("c1", InfluxFieldType::Integer)
.tag("c3")
.build()
.unwrap();
let expected = DFStatistics {
// in c2, c1, c3 w/ default stats
column_statistics: Some(vec![df_c2_stats, df_c1_stats, ColumnStatistics::default()]),
// other fields the same
..expected
};
let actual = df_from_iox(schema.inner(), &table_summary);
assert_nice_eq!(actual, expected);
}
#[test]
fn null_ts() {
let c_stats = StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
};
let c_summary = ColumnSummary {
name: "time".to_string(),
influxdb_type: InfluxDbType::Timestamp,
stats: IOxStatistics::I64(c_stats),
};
let table_summary = TableSummary {
columns: vec![c_summary],
};
let df_c_stats = ColumnStatistics {
null_count: None,
// Note min/max values should be `None` (not known)
// NOT `Some(None)` (known to be null)
max_value: None,
min_value: None,
distinct_count: None,
};
let schema = SchemaBuilder::new().timestamp().build().unwrap();
let expected = DFStatistics {
num_rows: Some(3),
total_byte_size: Some(220),
column_statistics: Some(vec![df_c_stats]),
is_exact: true,
};
let actual = df_from_iox(schema.inner(), &table_summary);
assert_nice_eq!(actual, expected);
}
#[test]
fn test_df_stats_agg_no_cols_no_updates() {


@ -21,17 +21,18 @@ use arrow::{
record_batch::RecordBatch,
};
use async_trait::async_trait;
use data_types::{
ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
Statistics, TableSummary,
};
use datafusion::datasource::{object_store::ObjectStoreUrl, TableProvider, TableType};
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::{catalog::catalog::CatalogProvider, physical_plan::displayable};
use datafusion::{catalog::schema::SchemaProvider, logical_expr::LogicalPlan};
use datafusion::{
datasource::{object_store::ObjectStoreUrl, TableProvider, TableType},
physical_plan::{ColumnStatistics, Statistics as DataFusionStatistics},
scalar::ScalarValue,
};
use hashbrown::HashSet;
use itertools::Itertools;
use object_store::{path::Path, ObjectMeta};
@ -40,10 +41,16 @@ use parking_lot::Mutex;
use parquet_file::storage::ParquetExecInput;
use predicate::rpc_predicate::QueryNamespaceMeta;
use schema::{
builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Projection,
Schema, TIME_COLUMN_NAME,
builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, Projection, Schema,
TIME_COLUMN_NAME,
};
use std::{
any::Any,
collections::{BTreeMap, HashMap},
fmt,
num::NonZeroU64,
sync::Arc,
};
use std::{any::Any, collections::BTreeMap, fmt, num::NonZeroU64, sync::Arc};
use trace::ctx::SpanContext;
#[derive(Debug)]
@ -313,8 +320,9 @@ pub struct TestChunk {
/// Schema of the table
schema: Schema,
/// Return value for summary()
table_summary: TableSummary,
/// Values for stats()
column_stats: HashMap<String, ColumnStatistics>,
num_rows: Option<usize>,
id: ChunkId,
@ -353,24 +361,7 @@ macro_rules! impl_with_column {
.unwrap()
.build()
.unwrap();
self.add_schema_to_table(new_column_schema, true, None)
}
};
}
/// Implements a method for adding a column without any stats
macro_rules! impl_with_column_no_stats {
($NAME:ident, $DATA_TYPE:ident) => {
pub fn $NAME(self, column_name: impl Into<String>) -> Self {
let column_name = column_name.into();
let new_column_schema = SchemaBuilder::new()
.field(&column_name, DataType::$DATA_TYPE)
.unwrap()
.build()
.unwrap();
self.add_schema_to_table(new_column_schema, false, None)
self.add_schema_to_table(new_column_schema, None)
}
};
}
@ -392,13 +383,14 @@ macro_rules! impl_with_column_with_stats {
.build()
.unwrap();
let stats = Statistics::$STAT_TYPE(StatValues {
min,
max,
..Default::default()
});
let stats = ColumnStatistics {
null_count: None,
max_value: max.map(|s| ScalarValue::from(s)),
min_value: min.map(|s| ScalarValue::from(s)),
distinct_count: None,
};
self.add_schema_to_table(new_column_schema, true, Some(stats))
self.add_schema_to_table(new_column_schema, Some(stats))
}
};
}
@ -409,7 +401,8 @@ impl TestChunk {
Self {
table_name,
schema: SchemaBuilder::new().build().unwrap(),
table_summary: TableSummary::default(),
column_stats: Default::default(),
num_rows: None,
id: ChunkId::new_test(0),
may_contain_pk_duplicates: Default::default(),
table_data: QueryChunkData::RecordBatches(vec![]),
@ -523,7 +516,7 @@ impl TestChunk {
// merge it into any existing schema
let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap();
self.add_schema_to_table(new_column_schema, true, None)
self.add_schema_to_table(new_column_schema, None)
}
/// Register a tag column with stats with the test chunk
@ -556,9 +549,16 @@ impl TestChunk {
)
}
fn update_count(&mut self, count: usize) {
match self.num_rows {
Some(existing) => assert_eq!(existing, count),
None => self.num_rows = Some(count),
}
}
/// Register a tag column with stats with the test chunk
pub fn with_tag_column_with_nulls_and_full_stats(
self,
mut self,
column_name: impl Into<String>,
min: Option<&str>,
max: Option<&str>,
@ -573,15 +573,15 @@ impl TestChunk {
let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap();
// Construct stats
let stats = Statistics::String(StatValues {
min: min.map(ToString::to_string),
max: max.map(ToString::to_string),
total_count: count,
null_count: Some(null_count),
distinct_count,
});
let stats = ColumnStatistics {
null_count: Some(null_count as usize),
max_value: max.map(ScalarValue::from),
min_value: min.map(ScalarValue::from),
distinct_count: distinct_count.map(|c| c.get() as usize),
};
self.add_schema_to_table(new_column_schema, true, Some(stats))
self.update_count(count as usize);
self.add_schema_to_table(new_column_schema, Some(stats))
}
/// Register a timestamp column with the test chunk with default stats
@ -590,7 +590,7 @@ impl TestChunk {
// merge it into any existing schema
let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap();
self.add_schema_to_table(new_column_schema, true, None)
self.add_schema_to_table(new_column_schema, None)
}
/// Register a timestamp column with the test chunk
@ -600,7 +600,7 @@ impl TestChunk {
/// Register a timestamp column with full stats with the test chunk
pub fn with_time_column_with_full_stats(
self,
mut self,
min: Option<i64>,
max: Option<i64>,
count: u64,
@ -612,66 +612,39 @@ impl TestChunk {
let null_count = 0;
// Construct stats
let stats = Statistics::I64(StatValues {
min,
max,
total_count: count,
null_count: Some(null_count),
distinct_count,
});
let stats = ColumnStatistics {
null_count: Some(null_count as usize),
max_value: max.map(|v| ScalarValue::TimestampNanosecond(Some(v), None)),
min_value: min.map(|v| ScalarValue::TimestampNanosecond(Some(v), None)),
distinct_count: distinct_count.map(|c| c.get() as usize),
};
self.add_schema_to_table(new_column_schema, true, Some(stats))
self.update_count(count as usize);
self.add_schema_to_table(new_column_schema, Some(stats))
}
pub fn with_timestamp_min_max(mut self, min: i64, max: i64) -> Self {
match self
.table_summary
.columns
.iter_mut()
.find(|c| c.name == TIME_COLUMN_NAME)
{
Some(col) => {
let stats = &mut col.stats;
*stats = Statistics::I64(StatValues {
min: Some(min),
max: Some(max),
total_count: stats.total_count(),
null_count: stats.null_count(),
distinct_count: stats.distinct_count(),
});
}
None => {
let total_count = self.table_summary.total_count();
self.table_summary.columns.push(ColumnSummary {
name: TIME_COLUMN_NAME.to_string(),
influxdb_type: InfluxDbType::Timestamp,
stats: Statistics::I64(StatValues {
min: Some(min),
max: Some(max),
total_count,
null_count: None,
distinct_count: None,
}),
});
}
}
let stats = self
.column_stats
.get_mut(TIME_COLUMN_NAME)
.expect("stats in sync w/ columns");
stats.min_value = Some(ScalarValue::TimestampNanosecond(Some(min), None));
stats.max_value = Some(ScalarValue::TimestampNanosecond(Some(max), None));
self
}
impl_with_column!(with_i64_field_column, Int64);
impl_with_column_no_stats!(with_i64_field_column_no_stats, Int64);
impl_with_column_with_stats!(with_i64_field_column_with_stats, Int64, i64, I64);
impl_with_column!(with_u64_column, UInt64);
impl_with_column_no_stats!(with_u64_field_column_no_stats, UInt64);
impl_with_column_with_stats!(with_u64_field_column_with_stats, UInt64, u64, U64);
impl_with_column!(with_f64_field_column, Float64);
impl_with_column_no_stats!(with_f64_field_column_no_stats, Float64);
impl_with_column_with_stats!(with_f64_field_column_with_stats, Float64, f64, F64);
impl_with_column!(with_bool_field_column, Boolean);
impl_with_column_no_stats!(with_bool_field_column_no_stats, Boolean);
impl_with_column_with_stats!(with_bool_field_column_with_stats, Boolean, bool, Bool);
/// Register a string field column with the test chunk
@ -692,13 +665,14 @@ impl TestChunk {
.unwrap();
// Construct stats
let stats = Statistics::String(StatValues {
min: min.map(ToString::to_string),
max: max.map(ToString::to_string),
..Default::default()
});
let stats = ColumnStatistics {
null_count: None,
max_value: max.map(ScalarValue::from),
min_value: min.map(ScalarValue::from),
distinct_count: None,
};
self.add_schema_to_table(new_column_schema, true, Some(stats))
self.add_schema_to_table(new_column_schema, Some(stats))
}
/// Adds the specified schema and optionally a column summary containing optional stats.
@ -707,46 +681,18 @@ impl TestChunk {
fn add_schema_to_table(
mut self,
new_column_schema: Schema,
add_column_summary: bool,
input_stats: Option<Statistics>,
input_stats: Option<ColumnStatistics>,
) -> Self {
let mut merger = SchemaMerger::new();
merger = merger.merge(&new_column_schema).unwrap();
merger = merger.merge(&self.schema).expect("merging was successful");
self.schema = merger.build();
for i in 0..new_column_schema.len() {
let (col_type, new_field) = new_column_schema.field(i);
if add_column_summary {
let influxdb_type = match col_type {
InfluxColumnType::Tag => InfluxDbType::Tag,
InfluxColumnType::Field(_) => InfluxDbType::Field,
InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
};
let stats = input_stats.clone();
let stats = stats.unwrap_or_else(|| match new_field.data_type() {
DataType::Boolean => Statistics::Bool(StatValues::default()),
DataType::Int64 => Statistics::I64(StatValues::default()),
DataType::UInt64 => Statistics::U64(StatValues::default()),
DataType::Utf8 => Statistics::String(StatValues::default()),
DataType::Dictionary(_, value_type) => {
assert!(matches!(**value_type, DataType::Utf8));
Statistics::String(StatValues::default())
}
DataType::Float64 => Statistics::F64(StatValues::default()),
DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
_ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
});
let column_summary = ColumnSummary {
name: new_field.name().clone(),
influxdb_type,
stats,
};
self.table_summary.columns.push(column_summary);
}
for f in new_column_schema.inner().fields() {
self.column_stats.insert(
f.name().clone(),
input_stats.as_ref().cloned().unwrap_or_default(),
);
}
self
@ -1197,10 +1143,22 @@ impl QueryChunk for TestChunk {
}
impl QueryChunkMeta for TestChunk {
fn summary(&self) -> Arc<TableSummary> {
fn stats(&self) -> Arc<DataFusionStatistics> {
self.check_error().unwrap();
Arc::new(self.table_summary.clone())
Arc::new(DataFusionStatistics {
num_rows: self.num_rows,
total_byte_size: None,
column_statistics: Some(
self.schema
.inner()
.fields()
.iter()
.map(|f| self.column_stats.get(f.name()).cloned().unwrap_or_default())
.collect(),
),
is_exact: true,
})
}
fn schema(&self) -> &Schema {


@ -13,9 +13,7 @@ use arrow::{
record_batch::RecordBatch,
};
use data_types::{
ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary, TimestampMinMax,
};
use data_types::TimestampMinMax;
use datafusion::{
self,
common::{tree_node::TreeNodeRewriter, DFSchema, ToDFSchema},
@ -27,7 +25,7 @@ use datafusion::{
physical_expr::create_physical_expr,
physical_plan::{
expressions::{col as physical_col, PhysicalSortExpr},
ExecutionPlan, PhysicalExpr,
ColumnStatistics, ExecutionPlan, PhysicalExpr, Statistics,
},
prelude::{binary_expr, lit, Column, Expr},
scalar::ScalarValue,
@ -36,7 +34,7 @@ use datafusion::{
use itertools::Itertools;
use observability_deps::tracing::trace;
use predicate::rpc_predicate::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME};
use schema::{sort::SortKey, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME};
use schema::{sort::SortKey, InfluxColumnType, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
@ -290,78 +288,34 @@ pub fn compute_timenanosecond_min_max_for_one_record_batch(
///
/// This contains:
/// - correct column types
/// - [total count](StatValues::total_count) for all columns
/// - [min](StatValues::min)/[max](StatValues::max) for the timestamp column
/// - [total count](Statistics::num_rows)
/// - [min](ColumnStatistics::min_value)/[max](ColumnStatistics::max_value) for the timestamp column
pub fn create_basic_summary(
row_count: u64,
schema: &Schema,
ts_min_max: TimestampMinMax,
) -> TableSummary {
) -> Statistics {
let mut columns = Vec::with_capacity(schema.len());
for i in 0..schema.len() {
let (t, field) = schema.field(i);
let influxdb_type = match t {
InfluxColumnType::Tag => InfluxDbType::Tag,
InfluxColumnType::Field(_) => InfluxDbType::Field,
InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
};
for (t, _field) in schema.iter() {
let stats = match t {
InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => {
Statistics::String(StatValues {
min: None,
max: None,
total_count: row_count,
null_count: None,
distinct_count: None,
})
}
InfluxColumnType::Timestamp => Statistics::I64(StatValues {
min: Some(ts_min_max.min),
max: Some(ts_min_max.max),
total_count: row_count,
null_count: None,
InfluxColumnType::Timestamp => ColumnStatistics {
null_count: Some(0),
max_value: Some(ScalarValue::TimestampNanosecond(Some(ts_min_max.max), None)),
min_value: Some(ScalarValue::TimestampNanosecond(Some(ts_min_max.min), None)),
distinct_count: None,
}),
InfluxColumnType::Field(InfluxFieldType::Integer) => Statistics::I64(StatValues {
min: None,
max: None,
total_count: row_count,
null_count: None,
distinct_count: None,
}),
InfluxColumnType::Field(InfluxFieldType::UInteger) => Statistics::U64(StatValues {
min: None,
max: None,
total_count: row_count,
null_count: None,
distinct_count: None,
}),
InfluxColumnType::Field(InfluxFieldType::Float) => Statistics::F64(StatValues {
min: None,
max: None,
total_count: row_count,
null_count: None,
distinct_count: None,
}),
InfluxColumnType::Field(InfluxFieldType::Boolean) => Statistics::Bool(StatValues {
min: None,
max: None,
total_count: row_count,
null_count: None,
distinct_count: None,
}),
},
_ => ColumnStatistics::default(),
};
columns.push(ColumnSummary {
name: field.name().clone(),
influxdb_type,
stats,
})
columns.push(stats)
}
TableSummary { columns }
Statistics {
num_rows: Some(row_count as usize),
total_byte_size: None,
column_statistics: Some(columns),
is_exact: true,
}
}
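A usage sketch, assuming the IOx workspace crates `schema`, `data_types`, and `iox_query`: the basic summary knows only the row count and the time range, so every non-time column gets default (all-unknown) column statistics:

```rust
use data_types::TimestampMinMax;
use iox_query::util::create_basic_summary;
use schema::builder::SchemaBuilder;

fn main() {
    let schema = SchemaBuilder::new().tag("tag").timestamp().build().unwrap();
    let stats = create_basic_summary(3, &schema, TimestampMinMax { min: 10, max: 20 });

    assert_eq!(stats.num_rows, Some(3));
    // two columns: "tag" (default stats) and "time" (min/max filled in)
    assert_eq!(stats.column_statistics.as_ref().unwrap().len(), 2);
}
```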
#[cfg(test)]
@ -372,7 +326,7 @@ mod tests {
prelude::{col, lit},
scalar::ScalarValue,
};
use schema::builder::SchemaBuilder;
use schema::{builder::SchemaBuilder, InfluxFieldType};
use super::*;
@ -494,7 +448,12 @@ mod tests {
let ts_min_max = TimestampMinMax { min: 10, max: 20 };
let actual = create_basic_summary(row_count, &schema, ts_min_max);
let expected = TableSummary { columns: vec![] };
let expected = Statistics {
num_rows: Some(row_count as usize),
total_byte_size: None,
column_statistics: Some(vec![]),
is_exact: true,
};
assert_eq!(actual, expected);
}
@ -505,86 +464,24 @@ mod tests {
let ts_min_max = TimestampMinMax { min: 10, max: 20 };
let actual = create_basic_summary(row_count, &schema, ts_min_max);
let expected = TableSummary {
columns: vec![
ColumnSummary {
name: String::from("tag"),
influxdb_type: InfluxDbType::Tag,
stats: Statistics::String(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
let expected = Statistics {
num_rows: Some(0),
total_byte_size: None,
column_statistics: Some(vec![
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics {
null_count: Some(0),
min_value: Some(ScalarValue::TimestampNanosecond(Some(10), None)),
max_value: Some(ScalarValue::TimestampNanosecond(Some(20), None)),
distinct_count: None,
},
ColumnSummary {
name: String::from("field_bool"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::Bool(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_float"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::F64(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_integer"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::I64(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_string"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::String(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_uinteger"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::U64(StatValues {
min: None,
max: None,
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("time"),
influxdb_type: InfluxDbType::Timestamp,
stats: Statistics::I64(StatValues {
min: Some(10),
max: Some(20),
total_count: 0,
null_count: None,
distinct_count: None,
}),
},
],
]),
is_exact: true,
};
assert_eq!(actual, expected);
}
@ -596,86 +493,24 @@ mod tests {
let ts_min_max = TimestampMinMax { min: 42, max: 42 };
let actual = create_basic_summary(row_count, &schema, ts_min_max);
let expected = TableSummary {
columns: vec![
ColumnSummary {
name: String::from("tag"),
influxdb_type: InfluxDbType::Tag,
stats: Statistics::String(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
let expected = Statistics {
num_rows: Some(3),
total_byte_size: None,
column_statistics: Some(vec![
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics::default(),
ColumnStatistics {
null_count: Some(0),
min_value: Some(ScalarValue::TimestampNanosecond(Some(42), None)),
max_value: Some(ScalarValue::TimestampNanosecond(Some(42), None)),
distinct_count: None,
},
ColumnSummary {
name: String::from("field_bool"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::Bool(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_float"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::F64(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_integer"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::I64(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_string"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::String(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("field_uinteger"),
influxdb_type: InfluxDbType::Field,
stats: Statistics::U64(StatValues {
min: None,
max: None,
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
ColumnSummary {
name: String::from("time"),
influxdb_type: InfluxDbType::Timestamp,
stats: Statistics::I64(StatValues {
min: Some(42),
max: Some(42),
total_count: 3,
null_count: None,
distinct_count: None,
}),
},
],
]),
is_exact: true,
};
assert_eq!(actual, expected);
}


@ -12,10 +12,8 @@ use arrow_flight::decode::DecodedPayload;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig, BackoffError};
use client_util::connection;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionId, TableSummary, TimestampMinMax,
};
use datafusion::error::DataFusionError;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, NamespaceId, PartitionId, TimestampMinMax};
use datafusion::{error::DataFusionError, physical_plan::Statistics};
use futures::{stream::FuturesUnordered, TryStreamExt};
use ingester_query_grpc::{
encode_proto_predicate_as_base64, influxdata::iox::ingester::v1::IngesterQueryResponseMetadata,
@ -824,7 +822,7 @@ impl IngesterPartition {
let ts_min_max = compute_timenanosecond_min_max(&batches).expect("Should have time range");
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>() as u64;
let summary = Arc::new(create_basic_summary(
let stats = Arc::new(create_basic_summary(
row_count,
&expected_schema,
ts_min_max,
@ -837,7 +835,7 @@ impl IngesterPartition {
partition_sort_key: self.partition_sort_key.clone(),
batches,
ts_min_max,
summary,
stats,
delete_predicates: vec![],
};
@ -900,7 +898,7 @@ pub struct IngesterChunk {
ts_min_max: TimestampMinMax,
/// Summary Statistics
summary: Arc<TableSummary>,
stats: Arc<Statistics>,
delete_predicates: Vec<Arc<DeletePredicate>>,
}
@ -928,8 +926,8 @@ impl IngesterChunk {
}
impl QueryChunkMeta for IngesterChunk {
fn summary(&self) -> Arc<TableSummary> {
Arc::clone(&self.summary)
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {


@ -86,7 +86,7 @@ impl IngesterConnection for MockIngesterConnection {
let total_row_count =
batches.iter().map(|b| b.num_rows()).sum::<usize>() as u64;
let summary =
let stats =
create_basic_summary(total_row_count, &new_schema, ic.ts_min_max);
super::IngesterChunk {
@ -96,7 +96,7 @@ impl IngesterConnection for MockIngesterConnection {
partition_sort_key: ic.partition_sort_key,
batches,
ts_min_max: ic.ts_min_max,
summary: Arc::new(summary),
stats: Arc::new(stats),
delete_predicates: vec![],
}
})


@ -75,14 +75,17 @@ impl ChunkAdapter {
files
.iter()
.map(|p| {
Arc::new(create_basic_summary(
let stats = Arc::new(create_basic_summary(
p.row_count as u64,
&cached_table.schema,
TimestampMinMax {
min: p.min_time.get(),
max: p.max_time.get(),
},
))
));
let schema = Arc::clone(cached_table.schema.inner());
(stats, schema)
})
.collect()
};


@ -1,8 +1,7 @@
//! Querier Chunks
use data_types::{
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, TableSummary,
};
use data_types::{ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId};
use datafusion::physical_plan::Statistics;
use iox_query::util::create_basic_summary;
use parquet_file::chunk::ParquetChunk;
use schema::sort::SortKey;
@ -65,14 +64,14 @@ pub struct QuerierParquetChunk {
/// Chunk of the Parquet file
parquet_chunk: Arc<ParquetChunk>,
/// Table summary
table_summary: Arc<TableSummary>,
/// Stats
stats: Arc<Statistics>,
}
impl QuerierParquetChunk {
/// Create new parquet-backed chunk (object store data).
pub fn new(parquet_chunk: Arc<ParquetChunk>, meta: Arc<QuerierParquetChunkMeta>) -> Self {
let table_summary = Arc::new(create_basic_summary(
let stats = Arc::new(create_basic_summary(
parquet_chunk.rows() as u64,
parquet_chunk.schema(),
parquet_chunk.timestamp_min_max(),
@ -82,7 +81,7 @@ impl QuerierParquetChunk {
meta,
delete_predicates: Vec::new(),
parquet_chunk,
table_summary,
stats,
}
}
@ -152,8 +151,8 @@ pub mod tests {
// check sort key
assert_sort_key(&chunk);
// back up table summary
let table_summary_1 = chunk.summary();
// back up stats
let stats_1 = chunk.stats();
// check if chunk can be queried
assert_content(&chunk, &test_data).await;
@ -161,9 +160,9 @@ pub mod tests {
// check state again
assert_eq!(chunk.chunk_type(), "parquet");
// summary has NOT changed
let table_summary_2 = chunk.summary();
assert_eq!(table_summary_1, table_summary_2);
// stats have NOT changed
let stats_2 = chunk.stats();
assert_eq!(stats_1, stats_2);
// retrieving the chunk again should not require any catalog requests
test_data.chunk(namespace_schema).await;


@ -1,6 +1,6 @@
use crate::parquet::QuerierParquetChunk;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::error::DataFusionError;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId};
use datafusion::{error::DataFusionError, physical_plan::Statistics};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkData, QueryChunkMeta,
@ -10,8 +10,8 @@ use schema::{sort::SortKey, Projection, Schema};
use std::{any::Any, sync::Arc};
impl QueryChunkMeta for QuerierParquetChunk {
fn summary(&self) -> Arc<TableSummary> {
Arc::clone(&self.table_summary)
fn stats(&self) -> Arc<Statistics> {
Arc::clone(&self.stats)
}
fn schema(&self) -> &Schema {