From aacdeaca52e3d993583eb67b2a52ee82b6c8fa2e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 7 Jul 2022 15:21:48 +0200 Subject: [PATCH] refactor: prep work for #5032 (#5060) * refactor: move parquet chunk ID to `ChunkMeta` * refactor: return `Arc` from `QueryChunk::summary` This is similar to how we handle other chunk data like schemas. This allows a chunk to change/refine its "belief" over its own payload while it is passed around in the query stack. Helps w/ #5032. --- compactor/src/query.rs | 2 +- ingester/src/query.rs | 2 +- iox_query/src/lib.rs | 26 +++++++++------- iox_query/src/provider/overlap.rs | 2 +- iox_query/src/provider/physical.rs | 10 ++++-- iox_query/src/pruning.rs | 45 +++++++++++++++------------ iox_query/src/test.rs | 4 +-- querier/src/chunk/mod.rs | 43 +++++++++---------------- querier/src/chunk/query_access.rs | 8 ++--- querier/src/ingester/mod.rs | 12 ++++--- querier/src/table/state_reconciler.rs | 2 +- 11 files changed, 80 insertions(+), 76 deletions(-) diff --git a/compactor/src/query.rs b/compactor/src/query.rs index a1843804d0..eef37f3795 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -118,7 +118,7 @@ impl QueryableParquetChunk { } impl QueryChunkMeta for QueryableParquetChunk { - fn summary(&self) -> Option<&TableSummary> { + fn summary(&self) -> Option> { None } diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 50cee18c4a..8ff0b2498b 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -94,7 +94,7 @@ impl QueryableBatch { } impl QueryChunkMeta for QueryableBatch { - fn summary(&self) -> Option<&TableSummary> { + fn summary(&self) -> Option> { None } diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 7eb93ccd72..c83146dfcd 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -39,8 +39,8 @@ pub use query_functions::group_by::{Aggregate, WindowDuration}; /// Trait for an object (designed to be a Chunk) which can provide /// metadata pub trait QueryChunkMeta 
{ - /// Return a reference to the summary of the data - fn summary(&self) -> Option<&TableSummary>; + /// Return a summary of the data + fn summary(&self) -> Option>; /// return a reference to the summary of the data held in this chunk fn schema(&self) -> Arc; @@ -200,7 +200,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { ) -> Result { Ok(self .summary() - .map(|summary| predicate.apply_to_table_summary(summary, self.schema().as_arrow())) + .map(|summary| predicate.apply_to_table_summary(&summary, self.schema().as_arrow())) .unwrap_or(PredicateMatch::Unknown)) } @@ -259,7 +259,7 @@ impl

QueryChunkMeta for Arc

where P: QueryChunkMeta, { - fn summary(&self) -> Option<&TableSummary> { + fn summary(&self) -> Option> { self.as_ref().summary() } @@ -292,7 +292,7 @@ where /// Implement ChunkMeta for Arc impl QueryChunkMeta for Arc { - fn summary(&self) -> Option<&TableSummary> { + fn summary(&self) -> Option> { self.as_ref().summary() } @@ -344,12 +344,15 @@ pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[Arc(summaries: impl Iterator) -> SortKey { - let mut cardinalities: HashMap<&str, u64> = Default::default(); +/// good ordering for RLE compression. +/// +/// The cardinality is estimated by the sum of unique counts over all summaries. This may overestimate cardinality since +/// it does not account for shared/repeated values. +fn compute_sort_key(summaries: impl Iterator>) -> SortKey { + let mut cardinalities: HashMap = Default::default(); for summary in summaries { for column in &summary.columns { if column.influxdb_type != Some(InfluxDbType::Tag) { @@ -360,7 +363,7 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator) - if let Some(count) = column.stats.distinct_count() { cnt = count.get(); } - *cardinalities.entry(column.name.as_str()).or_default() += cnt; + *cardinalities.entry_ref(column.name.as_str()).or_default() += cnt; } } @@ -368,7 +371,8 @@ pub fn compute_sort_key<'a>(summaries: impl Iterator) - let mut cardinalities: Vec<_> = cardinalities.into_iter().collect(); // Sort by (cardinality, column_name) to have deterministic order if same cardinality - cardinalities.sort_by_key(|x| (x.1, x.0)); + cardinalities + .sort_by(|(name_1, card_1), (name_2, card_2)| (card_1, name_1).cmp(&(card_2, name_2))); let mut builder = SortKeyBuilder::with_capacity(cardinalities.len() + 1); for (col, _) in cardinalities { diff --git a/iox_query/src/provider/overlap.rs b/iox_query/src/provider/overlap.rs index b8c627b8a4..c716f3e2c3 100644 --- a/iox_query/src/provider/overlap.rs +++ b/iox_query/src/provider/overlap.rs @@ -37,7 +37,7 @@ pub type Result = 
std::result::Result; // work on ParquetFileWithMetadata. Since group_potential_duplicates only needs 2 functions // partition_id and timestamp_min_max, other functions are left `umimplemneted` on purpose impl QueryChunkMeta for ParquetFile { - fn summary(&self) -> Option<&TableSummary> { + fn summary(&self) -> Option> { unimplemented!() } diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 6991be619d..3e6a526e33 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -181,10 +181,16 @@ impl ExecutionPlan for IOxReadFilterNode { } combined_summary_option = match combined_summary_option { - None => Some(chunk.summary().expect("Chunk should have summary").clone()), + None => Some( + chunk + .summary() + .expect("Chunk should have summary") + .as_ref() + .clone(), + ), Some(mut combined_summary) => { combined_summary - .update_from(chunk.summary().expect("Chunk should have summary")); + .update_from(&chunk.summary().expect("Chunk should have summary")); Some(combined_summary) } } diff --git a/iox_query/src/pruning.rs b/iox_query/src/pruning.rs index add166ccea..6da4c069e7 100644 --- a/iox_query/src/pruning.rs +++ b/iox_query/src/pruning.rs @@ -120,10 +120,10 @@ impl<'a> ChunkPruningStatistics<'a> { fn column_summaries<'b: 'a>( &self, column: &'b Column, - ) -> impl Iterator> + 'a { + ) -> impl Iterator> + 'a { self.chunks .iter() - .map(|chunk| Some(&chunk.summary()?.column(&column.name)?.stats)) + .map(|chunk| Some(chunk.summary()?.column(&column.name)?.stats.clone())) } } @@ -155,54 +155,59 @@ impl<'a> PruningStatistics for ChunkPruningStatistics<'a> { /// Collects an [`ArrayRef`] containing the aggregate statistic corresponding to /// `aggregate` for each of the provided [`Statistics`] -fn collect_pruning_stats<'a>( +fn collect_pruning_stats( data_type: &DataType, - statistics: impl Iterator>, + statistics: impl Iterator>, aggregate: Aggregate, ) -> Option { match data_type { DataType::Int64 
| DataType::Timestamp(TimeUnit::Nanosecond, None) => { - let values = statistics.map(|s| match &s { + let values = statistics.map(|s| match s { Some(Statistics::I64(v)) => get_aggregate(v, aggregate), - _ => &None, + _ => None, }); Some(Arc::new(Int64Array::from_iter(values))) } DataType::UInt64 => { - let values = statistics.map(|s| match &s { + let values = statistics.map(|s| match s { Some(Statistics::U64(v)) => get_aggregate(v, aggregate), - _ => &None, + _ => None, }); Some(Arc::new(UInt64Array::from_iter(values))) } DataType::Float64 => { - let values = statistics.map(|s| match &s { + let values = statistics.map(|s| match s { Some(Statistics::F64(v)) => get_aggregate(v, aggregate), - _ => &None, + _ => None, }); Some(Arc::new(Float64Array::from_iter(values))) } DataType::Boolean => { - let values = statistics.map(|s| match &s { + let values = statistics.map(|s| match s { Some(Statistics::Bool(v)) => get_aggregate(v, aggregate), - _ => &None, + _ => None, }); Some(Arc::new(BooleanArray::from_iter(values))) } DataType::Utf8 => { - let values = statistics.map(|s| match &s { + let values = statistics.map(|s| match s { Some(Statistics::String(v)) => get_aggregate(v, aggregate), - _ => &None, + _ => None, }); Some(Arc::new(StringArray::from_iter(values))) } DataType::Dictionary(key, value) if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => { - let values = statistics.map(|s| match &s { - Some(Statistics::String(v)) => get_aggregate(v, aggregate).as_deref(), + let values = statistics.map(|s| match s { + Some(Statistics::String(v)) => get_aggregate(v, aggregate), _ => None, }); + + // DictionaryArray can only be built from string references (`str`), not from owned strings (`String`), so + // we need to collect the strings first + let values: Vec<_> = values.collect(); + let values = values.iter().map(|s| s.as_deref()); Some(Arc::new(DictionaryArray::::from_iter(values))) } _ => None, @@ -210,11 +215,11 @@ fn collect_pruning_stats<'a>( } /// 
Returns the aggregate statistic corresponding to `aggregate` from `stats` -fn get_aggregate(stats: &StatValues, aggregate: Aggregate) -> &Option { +fn get_aggregate(stats: StatValues, aggregate: Aggregate) -> Option { match aggregate { - Aggregate::Min => &stats.min, - Aggregate::Max => &stats.max, - _ => &None, + Aggregate::Min => stats.min, + Aggregate::Max => stats.max, + _ => None, } } diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 1a775bec3b..28d0d09632 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -986,8 +986,8 @@ impl QueryChunk for TestChunk { } impl QueryChunkMeta for TestChunk { - fn summary(&self) -> Option<&TableSummary> { - Some(&self.table_summary) + fn summary(&self) -> Option> { + Some(Arc::new(self.table_summary.clone())) } fn schema(&self) -> Arc { diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 1d44da0159..e0a28c304f 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -21,6 +21,9 @@ pub(crate) mod util; /// Immutable metadata attached to a [`QuerierParquetChunk`]. #[derive(Debug)] pub struct ChunkMeta { + /// ID of the Parquet file of the chunk + parquet_file_id: ParquetFileId, + /// The ID of the chunk chunk_id: ChunkId, @@ -47,6 +50,11 @@ pub struct ChunkMeta { } impl ChunkMeta { + /// ID of the Parquet file of the chunk + pub fn parquet_file_id(&self) -> ParquetFileId { + self.parquet_file_id + } + /// Chunk order. pub fn order(&self) -> ChunkOrder { self.order @@ -81,14 +89,11 @@ impl ChunkMeta { /// Chunk representation of `read_buffer::RBChunk`s for the querier. 
#[derive(Debug)] pub struct QuerierRBChunk { - /// ID of the Parquet file of the chunk - parquet_file_id: ParquetFileId, - /// Underlying read buffer chunk rb_chunk: Arc, /// Table summary - table_summary: TableSummary, + table_summary: Arc, /// min/max time range of this table (extracted from TableSummary), if known timestamp_min_max: Option, @@ -109,17 +114,15 @@ pub struct QuerierRBChunk { impl QuerierRBChunk { /// Create new read-buffer-backed chunk pub fn new( - parquet_file_id: ParquetFileId, rb_chunk: Arc, meta: Arc, schema: Arc, partition_sort_key: Arc>, ) -> Self { - let table_summary = rb_chunk.table_summary(); + let table_summary = Arc::new(rb_chunk.table_summary()); let timestamp_min_max = table_summary.time_range(); Self { - parquet_file_id, rb_chunk, table_summary, timestamp_min_max, @@ -143,11 +146,6 @@ impl QuerierRBChunk { self.meta.as_ref() } - /// Parquet file ID - pub fn parquet_file_id(&self) -> ParquetFileId { - self.parquet_file_id - } - /// Set partition sort key pub fn with_partition_sort_key(self, partition_sort_key: Arc>) -> Self { Self { @@ -165,9 +163,6 @@ impl QuerierRBChunk { /// the query engine (DataFusion and InfluxRPC) expect. #[derive(Debug)] pub struct QuerierParquetChunk { - /// ID of the Parquet file of the chunk - parquet_file_id: ParquetFileId, - /// Chunk of the Parquet file parquet_chunk: Arc, @@ -181,24 +176,22 @@ pub struct QuerierParquetChunk { partition_sort_key: Arc>, /// Table summary - table_summary: TableSummary, + table_summary: Arc, } impl QuerierParquetChunk { /// Create new parquet-backed chunk (object store data). 
pub fn new( - parquet_file_id: ParquetFileId, parquet_chunk: Arc, meta: Arc, partition_sort_key: Arc>, ) -> Self { - let table_summary = create_basic_summary( + let table_summary = Arc::new(create_basic_summary( parquet_chunk.rows() as u64, &parquet_chunk.schema(), parquet_chunk.timestamp_min_max(), - ); + )); Self { - parquet_file_id, parquet_chunk, meta, delete_predicates: Vec::new(), @@ -233,11 +226,6 @@ impl QuerierParquetChunk { self.meta.as_ref() } - /// Parquet file ID - pub fn parquet_file_id(&self) -> ParquetFileId { - self.parquet_file_id - } - /// Return time range pub fn timestamp_min_max(&self) -> Option { Some(self.parquet_chunk.timestamp_min_max()) @@ -321,7 +309,6 @@ impl ChunkAdapter { )); Some(QuerierParquetChunk::new( - parts.parquet_file_id, chunk, parts.meta, parts.partition_sort_key, @@ -345,7 +332,6 @@ impl ChunkAdapter { .await; Some(QuerierRBChunk::new( - parts.parquet_file_id, rb_chunk, parts.meta, parts.schema, @@ -440,6 +426,7 @@ impl ChunkAdapter { let order = ChunkOrder::new(parquet_file.min_sequence_number.get()); let meta = Arc::new(ChunkMeta { + parquet_file_id: parquet_file.id, chunk_id, table_name, order, @@ -451,7 +438,6 @@ impl ChunkAdapter { }); Some(ChunkParts { - parquet_file_id: parquet_file.id, meta, schema, partition_sort_key, @@ -460,7 +446,6 @@ impl ChunkAdapter { } struct ChunkParts { - parquet_file_id: ParquetFileId, meta: Arc, schema: Arc, partition_sort_key: Arc>, diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 4f1b534b19..51934ae12b 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -41,8 +41,8 @@ pub enum Error { } impl QueryChunkMeta for QuerierParquetChunk { - fn summary(&self) -> Option<&TableSummary> { - Some(&self.table_summary) + fn summary(&self) -> Option> { + Some(Arc::clone(&self.table_summary)) } fn schema(&self) -> Arc { @@ -143,8 +143,8 @@ impl QueryChunk for QuerierParquetChunk { } impl QueryChunkMeta for QuerierRBChunk { 
- fn summary(&self) -> Option<&TableSummary> { - Some(&self.table_summary) + fn summary(&self) -> Option> { + Some(Arc::clone(&self.table_summary)) } fn schema(&self) -> Arc { diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index d87a1a7919..8ded2c2b63 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -920,7 +920,11 @@ impl IngesterPartition { let ts_min_max = compute_timenanosecond_min_max(&batches).expect("Should have time range"); let row_count = batches.iter().map(|batch| batch.num_rows()).sum::() as u64; - let summary = create_basic_summary(row_count, &expected_schema, ts_min_max); + let summary = Arc::new(create_basic_summary( + row_count, + &expected_schema, + ts_min_max, + )); let chunk = IngesterChunk { chunk_id, @@ -997,7 +1001,7 @@ pub struct IngesterChunk { ts_min_max: TimestampMinMax, /// Summary Statistics - summary: TableSummary, + summary: Arc, } impl IngesterChunk { @@ -1010,8 +1014,8 @@ impl IngesterChunk { } impl QueryChunkMeta for IngesterChunk { - fn summary(&self) -> Option<&TableSummary> { - Some(&self.summary) + fn summary(&self) -> Option> { + Some(Arc::clone(&self.summary)) } fn schema(&self) -> Arc { diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index b4d8384a0c..d334f5fe93 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -183,7 +183,7 @@ impl Reconciler { .chunk_adapter .catalog_cache() .processed_tombstones() - .exists(chunk.parquet_file_id(), tombstone.tombstone_id()) + .exists(chunk.meta().parquet_file_id(), tombstone.tombstone_id()) .await { continue;