From d1df95df87d0d520a62d84f12bac0642ef44d337 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 21 Mar 2022 12:47:54 +0100 Subject: [PATCH] refactor: dyn-dispatch chunks in query subsystem - this is what DataFusion is doing as well; it's also fast enough because the number of chunks in a query is not THAT massive (it's not like we are doing row-level dyn dispatching) - it simplifies abstracting over different databases - it allows us to drop our enum-based dispatching that we have for `DbChunk` and that we would also need for the querier (e.g. depending on whether a chunk is backed by a parquet file or ingester data) - it likely speeds up compile times because the `query` crate no longer contains massive amounts of generic code For #3934. --- compactor/src/compact.rs | 12 +- compactor/src/query.rs | 6 +- db/src/access.rs | 34 +- db/src/lib.rs | 5 +- db/src/lifecycle.rs | 5 +- db/src/lifecycle/compact.rs | 5 +- db/src/lifecycle/compact_object_store.rs | 8 +- db/src/lifecycle/load.rs | 4 +- db/src/lifecycle/persist.rs | 5 +- querier/src/namespace/query_access.rs | 7 +- query/src/frontend.rs | 8 +- query/src/frontend/influxrpc.rs | 73 ++-- query/src/frontend/reorg.rs | 46 +-- query/src/lib.rs | 14 +- query/src/provider.rs | 133 ++++---- query/src/provider/overlap.rs | 403 ++++++++++++++--------- query/src/provider/physical.rs | 12 +- query/src/pruning.rs | 88 +++-- query/src/test.rs | 8 +- query_tests/src/db.rs | 99 +----- query_tests/src/table_schema.rs | 2 +- server/tests/delete.rs | 5 +- 22 files changed, 459 insertions(+), 523 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 4d37357258..67c9c5503d 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -14,11 +14,11 @@ use iox_catalog::interface::Catalog; use object_store::DynObjectStore; use observability_deps::tracing::warn; use parquet_file::metadata::IoxMetadata; -use query::exec::Executor; use query::{ compute_sort_key_for_chunks, exec::ExecutorType, frontend::reorg::ReorgPlanner, util::compute_timenanosecond_min_max, }; +use query::{exec::Executor, QueryChunk}; use snafu::{ensure, ResultExt, Snafu}; use std::{ cmp::{max, min}, @@ -387,6 +387,10 @@ impl Compactor { } // Merge schema of the compacting chunks + let query_chunks: Vec<_> = query_chunks + .into_iter() + .map(|c| Arc::new(c) as Arc) + .collect(); let merged_schema = QueryableParquetChunk::merge_schemas(&query_chunks); // Compute the sorted output of the compacting result @@ -394,11 +398,7 @@ impl Compactor { // Build compact query plan let plan = ReorgPlanner::new() - .compact_plan( - Arc::clone(&merged_schema), - query_chunks.into_iter().map(Arc::new), - sort_key.clone(), - ) + .compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone()) .context(CompactLogicalPlanSnafu)?; let ctx = self.exec.new_context(ExecutorType::Reorg); let physical_plan = ctx diff --git a/compactor/src/query.rs b/compactor/src/query.rs index cf16567d5a..2fdc96344e 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -62,12 +62,10 @@ impl QueryableParquetChunk { } /// Merge schema of the given chunks - pub fn merge_schemas(chunks: &[Self]) -> Arc { + pub fn merge_schemas(chunks: &[Arc]) -> Arc { let mut merger = SchemaMerger::new(); for chunk in chunks { - merger = merger - .merge(&chunk.data.schema()) - .expect("schemas compatible"); + merger = merger.merge(&chunk.schema()).expect("schemas compatible"); } Arc::new(merger.build()) } diff --git a/db/src/access.rs b/db/src/access.rs index 2f983801ac..c790e0a69c 100644 ---
a/db/src/access.rs +++ b/db/src/access.rs @@ -14,11 +14,11 @@ use metric::{Attributes, DurationCounter, Metric, U64Counter}; use observability_deps::tracing::debug; use parking_lot::Mutex; use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; -use query::exec::IOxExecutionContext; +use query::{exec::IOxExecutionContext, QueryChunk}; use query::{ provider::{ChunkPruner, ProviderBuilder}, pruning::{prune_chunks, PruningObserver}, - QueryChunkMeta, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA, + QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA, }; use schema::Schema; use std::time::Instant; @@ -207,11 +207,15 @@ impl ChunkAccess { /// Returns all chunks from `table_name` that may have data that passes the /// specified predicates. The chunks are pruned as aggressively as /// possible based on metadata. - fn candidate_chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { + fn candidate_chunks( + &self, + table_name: &str, + predicate: &Predicate, + ) -> Vec> { let start = Instant::now(); // Get chunks and schema as a single transaction - let (mut chunks, schema) = { + let (chunks, schema) = { let table = match self.catalog.table(table_name).ok() { Some(table) => table, None => return vec![], @@ -227,6 +231,11 @@ impl ChunkAccess { (chunks, schema) }; + let mut chunks: Vec<_> = chunks + .into_iter() + .map(|c| c as Arc) + .collect(); + self.access_metrics.catalog_snapshot_count.inc(1); self.access_metrics .catalog_snapshot_duration @@ -249,14 +258,14 @@ impl ChunkAccess { } } -impl ChunkPruner for ChunkAccess { +impl ChunkPruner for ChunkAccess { fn prune_chunks( &self, table_name: &str, table_schema: Arc, - chunks: Vec>, + chunks: Vec>, predicate: &Predicate, - ) -> Vec> { + ) -> Vec> { let start = Instant::now(); debug!(num_chunks=chunks.len(), %predicate, "Attempting to prune chunks"); @@ -271,9 +280,7 @@ impl ChunkPruner for ChunkAccess { } impl PruningObserver for TableAccessMetrics { - type Observed = DbChunk; - - fn was_pruned(&self, chunk: &Self::Observed) { + fn was_pruned(&self, chunk: &dyn QueryChunk) { let chunk_summary = chunk.summary().expect("Chunk should have summary"); self.pruned_chunks.inc(1); self.pruned_rows.inc(chunk_summary.total_count()) @@ -282,10 +289,8 @@ impl PruningObserver for TableAccessMetrics { #[async_trait] impl QueryDatabase for QueryCatalogAccess { - type Chunk = DbChunk; - /// Return a covering set of chunks for a particular table and predicate - async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { + async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { self.chunk_access.candidate_chunks(table_name, predicate) } @@ -376,8 +381,7 @@ impl SchemaProvider for DbSchemaProvider { }; let mut builder = ProviderBuilder::new(table_name, schema); - builder = - builder.add_pruner(Arc::clone(&self.chunk_access) as Arc>); + builder = builder.add_pruner(Arc::clone(&self.chunk_access) as Arc); // TODO: Better chunk pruning (#3570) for chunk in self diff --git a/db/src/lib.rs b/db/src/lib.rs index 31aa51c98e..93d4ed80f8 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -44,6 +44,7 @@ use parquet_catalog::{ }; use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows}; use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; +use query::QueryChunk; use query::{ exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext}, QueryCompletedToken, QueryDatabase, QueryText, @@ -1228,9 +1229,7 @@ impl Db { /// can just use Db as a 
`Database` even though the implementation /// lives in `catalog_access` impl QueryDatabase for Db { - type Chunk = DbChunk; - - async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { + async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { self.catalog_access.chunks(table_name, predicate).await } diff --git a/db/src/lifecycle.rs b/db/src/lifecycle.rs index 5caadcc9ce..a18c5c8636 100644 --- a/db/src/lifecycle.rs +++ b/db/src/lifecycle.rs @@ -1,4 +1,3 @@ -use super::DbChunk; use crate::{ catalog::{chunk::CatalogChunk, partition::Partition}, Db, @@ -25,7 +24,7 @@ use lifecycle::{ use observability_deps::tracing::{info, trace, warn}; use parking_lot::Mutex; use persistence_windows::persistence_windows::FlushHandle; -use query::QueryChunkMeta; +use query::QueryChunk; use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME}; use std::{future::Future, sync::Arc}; use time::{Time, TimeProvider}; @@ -395,7 +394,7 @@ fn collect_rub( /// This is infallable because the schemas of chunks within a /// partition are assumed to be compatible because that schema was /// enforced as part of writing into the partition -fn merge_schemas(chunks: &[Arc]) -> Arc { +fn merge_schemas(chunks: &[Arc]) -> Arc { let mut merger = SchemaMerger::new(); for db_chunk in chunks { merger = merger diff --git a/db/src/lifecycle/compact.rs b/db/src/lifecycle/compact.rs index e522c71aef..e17557dbb6 100644 --- a/db/src/lifecycle/compact.rs +++ b/db/src/lifecycle/compact.rs @@ -10,7 +10,8 @@ use crate::{ use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate, job::Job}; use lifecycle::LifecycleWriteGuard; use observability_deps::tracing::*; -use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::QueryChunk; +use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner}; use std::{collections::HashSet, future::Future, sync::Arc}; use time::Time; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -73,7 +74,7 @@ pub(crate) fn compact_chunks( max_order = max_order.max(chunk.order()); chunk.set_compacting(®istration)?; - Ok(DbChunk::snapshot(&*chunk)) + Ok(DbChunk::snapshot(&*chunk) as Arc) }) .collect::>>()?; diff --git a/db/src/lifecycle/compact_object_store.rs b/db/src/lifecycle/compact_object_store.rs index 5c061cc3f5..8a6e0f6d35 100644 --- a/db/src/lifecycle/compact_object_store.rs +++ b/db/src/lifecycle/compact_object_store.rs @@ -32,7 +32,7 @@ use parquet_file::{ storage::Storage, }; use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint}; -use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunk}; use schema::sort::SortKey; use schema::Schema; use snafu::{OptionExt, ResultExt}; @@ -281,7 +281,7 @@ fn mark_chunks_to_compact( // Get the parquet dbchunk snapshot and also keep its file path to remove later let dbchunk = DbChunk::parquet_file_snapshot(&*chunk); compacted_parquet_file_paths.push(dbchunk.object_store_path().unwrap().clone()); - Ok(dbchunk) + Ok(dbchunk as Arc) }) .collect::>>()?; @@ -322,7 +322,7 @@ struct CompactingOsChunks { input_rows: u64, delete_predicates: HashSet>, compacted_parquet_file_paths: Vec, - os_chunks: Vec>, + os_chunks: Vec>, max_order: ChunkOrder, database_checkpoint: DatabaseCheckpoint, partition_checkpoint: PartitionCheckpoint, @@ -335,7 +335,7 @@ struct CompactingOsChunks { /// Deleted and 
duplicated data will be eliminated during the scan /// . Output schema of the compact plan /// . Sort Key of the output data -async fn compact_chunks(db: &Db, query_chunks: &[Arc]) -> Result { +async fn compact_chunks(db: &Db, query_chunks: &[Arc]) -> Result { // Tracking metric let ctx = db.exec.new_context(ExecutorType::Reorg); diff --git a/db/src/lifecycle/load.rs b/db/src/lifecycle/load.rs index 29dac2606d..23dd3b92d6 100644 --- a/db/src/lifecycle/load.rs +++ b/db/src/lifecycle/load.rs @@ -8,7 +8,7 @@ use crate::{catalog::chunk::CatalogChunk, lifecycle::collect_rub, DbChunk}; use data_types::job::Job; use lifecycle::LifecycleWriteGuard; use observability_deps::tracing::info; -use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunk}; use std::{future::Future, sync::Arc}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -30,7 +30,7 @@ pub fn load_chunk( chunk.set_loading_to_read_buffer(®istration)?; // Get queryable chunk - let db_chunk = DbChunk::snapshot(&*chunk); + let db_chunk = DbChunk::snapshot(&*chunk) as Arc; // Drop locks let chunk = chunk.into_data().chunk; diff --git a/db/src/lifecycle/persist.rs b/db/src/lifecycle/persist.rs index 71b13ee8c1..2997aefd87 100644 --- a/db/src/lifecycle/persist.rs +++ b/db/src/lifecycle/persist.rs @@ -11,7 +11,8 @@ use data_types::{chunk_metadata::ChunkOrder, delete_predicate::DeletePredicate, use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition}; use observability_deps::tracing::info; use persistence_windows::persistence_windows::FlushHandle; -use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::QueryChunk; +use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner}; use std::{collections::HashSet, future::Future, sync::Arc}; use time::Time; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -75,7 +76,7 @@ pub fn persist_chunks( max_order = max_order.max(chunk.order()); chunk.set_writing_to_object_store(®istration)?; - query_chunks.push(DbChunk::snapshot(&*chunk)); + query_chunks.push(DbChunk::snapshot(&*chunk) as Arc); } // drop partition lock guard diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index f0aa9d5dcd..5af675f9ed 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -2,11 +2,10 @@ use std::{any::Any, sync::Arc}; use async_trait::async_trait; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; -use db::chunk::DbChunk; use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; use query::{ exec::{ExecutionContextProvider, ExecutorType, IOxExecutionContext}, - QueryCompletedToken, QueryDatabase, QueryText, + QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, }; use schema::Schema; use trace::ctx::SpanContext; @@ -25,9 +24,7 @@ impl QueryDatabaseMeta for QuerierNamespace { #[async_trait] impl QueryDatabase for QuerierNamespace { - type Chunk = DbChunk; - - async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { + async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { self.catalog_access.chunks(table_name, predicate).await } diff --git a/query/src/frontend.rs b/query/src/frontend.rs index d33e015664..5d61c76ec2 100644 --- a/query/src/frontend.rs +++ b/query/src/frontend.rs @@ -19,7 +19,7 @@ mod test { frontend::reorg::ReorgPlanner, provider::{DeduplicateExec, 
IOxReadFilterNode}, test::TestChunk, - QueryChunkMeta, + QueryChunk, QueryChunkMeta, }; /// A macro to asserts the contents of the extracted metrics is reasonable @@ -89,9 +89,7 @@ mod test { // now validate metrics are good let extracted = extract_metrics(plan.as_ref(), |plan| { - plan.as_any() - .downcast_ref::>() - .is_some() + plan.as_any().downcast_ref::().is_some() }) .unwrap(); @@ -197,7 +195,7 @@ mod test { extractor.inner } - fn get_test_chunks() -> (Arc, Vec>) { + fn get_test_chunks() -> (Arc, Vec>) { let chunk1 = Arc::new( TestChunk::new("t") .with_time_column_with_stats(Some(50), Some(7000)) diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 73aa92867f..8a0b2934d3 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -37,7 +37,7 @@ use crate::{ }, provider::ProviderBuilder, util::MissingColumnsToNull, - QueryChunk, QueryChunkMeta, QueryDatabase, + QueryChunk, QueryDatabase, }; #[derive(Debug, Snafu)] @@ -914,17 +914,14 @@ impl InfluxRpcPlanner { /// Filter(predicate) /// TableScan (of chunks) /// ``` - fn tag_keys_plan( + fn tag_keys_plan( &self, ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { let scan_and_filter = self.scan_and_filter( ctx.child_ctx("scan_and_filter planning"), table_name, @@ -989,17 +986,14 @@ impl InfluxRpcPlanner { /// Filter(predicate) [optional] /// Scan /// ``` - fn field_columns_plan( + fn field_columns_plan( &self, ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { let scan_and_filter = self.scan_and_filter( ctx.child_ctx("scan_and_filter planning"), table_name, @@ -1054,16 +1048,13 @@ impl InfluxRpcPlanner { /// Filter(predicate) [optional] /// Scan /// ``` - fn table_name_plan( + fn table_name_plan( &self, table_name: &str, schema: Arc, predicate: &Predicate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { debug!(%table_name, "Creating table_name full plan"); let scan_and_filter = self.scan_and_filter( self.ctx.child_ctx("scan_and_filter planning"), @@ -1107,17 +1098,14 @@ impl InfluxRpcPlanner { /// Order by (tag_columns, timestamp_column) /// Filter(predicate) /// Scan - fn read_filter_plan( + fn read_filter_plan( &self, ctx: IOxExecutionContext, table_name: impl AsRef, schema: Arc, predicate: &Predicate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { let table_name = table_name.as_ref(); let scan_and_filter = self.scan_and_filter( ctx.child_ctx("scan_and_filter planning"), @@ -1223,18 +1211,15 @@ impl InfluxRpcPlanner { /// GroupBy(gby cols, aggs, time cols) /// Filter(predicate) /// Scan - fn read_group_plan( + fn read_group_plan( &self, ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, agg: Aggregate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { let scan_and_filter = self.scan_and_filter( ctx.child_ctx("scan_and_filter planning"), table_name, @@ -1341,7 +1326,7 @@ impl InfluxRpcPlanner { /// Filter(predicate) /// Scan #[allow(clippy::too_many_arguments)] - fn read_window_aggregate_plan( + fn read_window_aggregate_plan( &self, ctx: IOxExecutionContext, table_name: impl Into, @@ -1350,11 +1335,8 @@ impl InfluxRpcPlanner { 
agg: Aggregate, every: &WindowDuration, offset: &WindowDuration, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { let table_name = table_name.into(); let scan_and_filter = self.scan_and_filter( ctx.child_ctx("scan_and_filter planning"), @@ -1429,17 +1411,14 @@ impl InfluxRpcPlanner { /// Filter(predicate) [optional] /// Scan /// ``` - fn scan_and_filter( + fn scan_and_filter( &self, ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, - chunks: Vec>, - ) -> Result> - where - C: QueryChunk + 'static, - { + chunks: Vec>, + ) -> Result> { // Scan all columns to begin with (DataFusion projection // push-down optimization will prune out unneeded columns later) let projection = None; @@ -1505,10 +1484,10 @@ impl InfluxRpcPlanner { /// Prunes the provided list of chunks using [`QueryChunk::apply_predicate_to_metadata`] /// /// TODO: Should this logic live with the rest of the chunk pruning logic? -fn prune_chunks_metadata(chunks: Vec>, predicate: &Predicate) -> Result>> -where - C: QueryChunk + 'static, -{ +fn prune_chunks_metadata( + chunks: Vec>, + predicate: &Predicate, +) -> Result>> { let mut filtered = Vec::with_capacity(chunks.len()); for chunk in chunks { // Try and apply the predicate using only metadata diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index c9000a0156..5172a037f8 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -48,28 +48,22 @@ impl ReorgPlanner { /// Creates an execution plan for a full scan of a single chunk. /// This plan is primarilty used to load chunks from one storage medium to /// another. - pub fn scan_single_chunk_plan( + pub fn scan_single_chunk_plan( &self, schema: Arc, - chunk: Arc, - ) -> Result - where - C: QueryChunk + 'static, - { + chunk: Arc, + ) -> Result { self.scan_single_chunk_plan_with_filter(schema, chunk, None, vec![]) } /// Creates an execution plan for a scan and filter data of a single chunk - pub fn scan_single_chunk_plan_with_filter( + pub fn scan_single_chunk_plan_with_filter( &self, schema: Arc, - chunk: Arc, + chunk: Arc, projection: Option>, filters: Vec, - ) -> Result - where - C: QueryChunk + 'static, - { + ) -> Result { let table_name = chunk.table_name(); // Prepare the plan for the table let mut builder = ProviderBuilder::new(table_name, schema); @@ -108,15 +102,14 @@ impl ReorgPlanner { /// /// (Sort on output_sort) /// (Scan chunks) <-- any needed deduplication happens here - pub fn compact_plan( + pub fn compact_plan( &self, schema: Arc, chunks: I, sort_key: SortKey, ) -> Result where - C: QueryChunk + 'static, - I: IntoIterator>, + I: IntoIterator>, { let ScanPlan { plan_builder, @@ -174,7 +167,7 @@ impl ReorgPlanner { /// e | 3000 /// c | 4000 /// ``` - pub fn split_plan( + pub fn split_plan( &self, schema: Arc, chunks: I, @@ -182,8 +175,7 @@ impl ReorgPlanner { split_time: i64, ) -> Result where - C: QueryChunk + 'static, - I: IntoIterator>, + I: IntoIterator>, { let ScanPlan { plan_builder, @@ -209,15 +201,14 @@ impl ReorgPlanner { /// /// Refer to query::provider::build_scan_plan for the detail of the plan /// - fn sorted_scan_plan( + fn sorted_scan_plan( &self, schema: Arc, chunks: I, sort_key: SortKey, - ) -> Result> + ) -> Result where - C: QueryChunk + 'static, - I: IntoIterator>, + I: IntoIterator>, { let mut chunks = chunks.into_iter().peekable(); let table_name = match chunks.peek() { @@ -265,9 +256,9 @@ impl ReorgPlanner { } } -struct ScanPlan { +struct ScanPlan { plan_builder: 
LogicalPlanBuilder, - provider: Arc>, + provider: Arc, } #[cfg(test)] @@ -280,12 +271,11 @@ mod test { use crate::{ exec::{Executor, ExecutorType}, test::{raw_data, TestChunk}, - QueryChunkMeta, }; use super::*; - async fn get_test_chunks() -> (Arc, Vec>) { + async fn get_test_chunks() -> (Arc, Vec>) { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( TestChunk::new("t") @@ -293,7 +283,7 @@ mod test { .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 2 has an extra field, and only 4 fields let chunk2 = Arc::new( @@ -304,7 +294,7 @@ mod test { .with_i64_field_column("field_int2") .with_may_contain_pk_duplicates(true) .with_four_rows_of_data(), - ); + ) as Arc; let expected = vec![ "+-----------+------+--------------------------------+", diff --git a/query/src/lib.rs b/query/src/lib.rs index 4d89342b49..005fb704a4 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -137,12 +137,10 @@ pub type QueryText = Box; /// data in Chunks. #[async_trait] pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { - type Chunk: QueryChunk; - /// Returns a set of chunks within the partition with data that may match /// the provided predicate. If possible, chunks which have no rows that can /// possibly match the predicate may be omitted. - async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec>; + async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec>; /// Record that particular type of query was run / planned fn record_query( @@ -258,20 +256,14 @@ where } /// return true if all the chunks inlcude statistics -pub fn chunks_have_stats(chunks: &[C]) -> bool -where - C: QueryChunkMeta, -{ +pub fn chunks_have_stats(chunks: &[Arc]) -> bool { // If at least one of the provided chunk cannot provide stats, // do not need to compute potential duplicates. We will treat // as all of them have duplicates chunks.iter().all(|c| c.summary().is_some()) } -pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[C]) -> SortKey -where - C: QueryChunkMeta, -{ +pub fn compute_sort_key_for_chunks(schema: &Schema, chunks: &[Arc]) -> SortKey { if !chunks_have_stats(chunks) { // chunks have not enough stats, return its pk that is // sorted lexicographically but time column always last diff --git a/query/src/provider.rs b/query/src/provider.rs index 6dbd75566b..ca5e39bdc2 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -25,7 +25,7 @@ use crate::{ chunks_have_stats, compute_sort_key_for_chunks, exec::IOxExecutionContext, util::{arrow_sort_key_exprs, df_physical_expr}, - QueryChunk, QueryChunkMeta, + QueryChunk, }; use snafu::{ResultExt, Snafu}; @@ -86,33 +86,33 @@ impl From for DataFusionError { } /// Something that can prune chunks based on their metadata -pub trait ChunkPruner: Sync + Send + std::fmt::Debug { +pub trait ChunkPruner: Sync + Send + std::fmt::Debug { /// prune `chunks`, if possible, based on predicate. fn prune_chunks( &self, table_name: &str, table_schema: Arc, - chunks: Vec>, + chunks: Vec>, predicate: &Predicate, - ) -> Vec>; + ) -> Vec>; } /// Builds a `ChunkTableProvider` from a series of `QueryChunk`s /// and ensures the schema across the chunks is compatible and /// consistent. 
#[derive(Debug)] -pub struct ProviderBuilder { +pub struct ProviderBuilder { table_name: Arc, schema: Arc, - chunk_pruner: Option>>, - chunks: Vec>, + chunk_pruner: Option>, + chunks: Vec>, sort_key: Option, // execution context used for tracing ctx: IOxExecutionContext, } -impl ProviderBuilder { +impl ProviderBuilder { pub fn new(table_name: impl AsRef, schema: Arc) -> Self { Self { table_name: Arc::from(table_name.as_ref()), @@ -137,14 +137,14 @@ impl ProviderBuilder { } /// Add a new chunk to this provider - pub fn add_chunk(mut self, chunk: Arc) -> Self { + pub fn add_chunk(mut self, chunk: Arc) -> Self { self.chunks.push(chunk); self } /// Specify a `ChunkPruner` for the provider that will apply /// additional chunk level pruning based on pushed down predicates - pub fn add_pruner(mut self, chunk_pruner: Arc>) -> Self { + pub fn add_pruner(mut self, chunk_pruner: Arc) -> Self { assert!( self.chunk_pruner.is_none(), "Chunk pruner already specified" @@ -165,7 +165,7 @@ impl ProviderBuilder { } /// Create the Provider - pub fn build(self) -> Result> { + pub fn build(self) -> Result { let chunk_pruner = match self.chunk_pruner { Some(chunk_pruner) => chunk_pruner, None => { @@ -192,14 +192,14 @@ impl ProviderBuilder { /// This allows DataFusion to see data from Chunks as a single table, as well as /// push predicates and selections down to chunks #[derive(Debug)] -pub struct ChunkTableProvider { +pub struct ChunkTableProvider { table_name: Arc, /// The IOx schema (wrapper around Arrow Schemaref) for this table iox_schema: Arc, /// Something that can prune chunks - chunk_pruner: Arc>, + chunk_pruner: Arc, /// The chunks - chunks: Vec>, + chunks: Vec>, /// The sort key if any sort_key: Option, @@ -207,7 +207,7 @@ pub struct ChunkTableProvider { ctx: IOxExecutionContext, } -impl ChunkTableProvider { +impl ChunkTableProvider { /// Return the IOx schema view for the data provided by this provider pub fn iox_schema(&self) -> Arc { Arc::clone(&self.iox_schema) @@ -225,7 +225,7 @@ impl ChunkTableProvider { } #[async_trait] -impl TableProvider for ChunkTableProvider { +impl TableProvider for ChunkTableProvider { fn as_any(&self) -> &dyn std::any::Any { self } @@ -252,7 +252,7 @@ impl TableProvider for ChunkTableProvider { // Now we have a second attempt to prune out chunks based on // metadata using the pushed down predicate (e.g. in SQL). 
- let chunks: Vec> = self.chunks.to_vec(); + let chunks: Vec> = self.chunks.to_vec(); let num_initial_chunks = chunks.len(); let chunks = self.chunk_pruner.prune_chunks( self.table_name(), @@ -301,21 +301,21 @@ impl TableProvider for ChunkTableProvider { #[derive(Debug)] /// A deduplicater that deduplicate the duplicated data during scan execution -pub(crate) struct Deduplicater { +pub(crate) struct Deduplicater { /// a vector of a vector of overlapped chunks - pub overlapped_chunks_set: Vec>>, + pub overlapped_chunks_set: Vec>>, /// a vector of non-overlapped chunks each have duplicates in itself - pub in_chunk_duplicates_chunks: Vec>, + pub in_chunk_duplicates_chunks: Vec>, /// a vector of non-overlapped and non-duplicates chunks - pub no_duplicates_chunks: Vec>, + pub no_duplicates_chunks: Vec>, // execution context ctx: IOxExecutionContext, } -impl Deduplicater { +impl Deduplicater { pub(crate) fn new() -> Self { Self { overlapped_chunks_set: vec![], @@ -405,7 +405,7 @@ impl Deduplicater { &mut self, table_name: Arc, output_schema: Arc, - chunks: Vec>, + chunks: Vec>, predicate: Predicate, output_sort_key: Option, ) -> Result> { @@ -518,7 +518,7 @@ impl Deduplicater { /// 1. vector of vector of overlapped chunks /// 2. vector of non-overlapped chunks, each have duplicates in itself /// 3. vectors of non-overlapped chunks without duplicates - fn split_overlapped_chunks(&mut self, chunks: Vec>) -> Result<()> { + fn split_overlapped_chunks(&mut self, chunks: Vec>) -> Result<()> { if !chunks_have_stats(&chunks) { // no statistics, consider all chunks overlap self.overlapped_chunks_set.push(chunks); @@ -590,7 +590,7 @@ impl Deduplicater { ctx: IOxExecutionContext, table_name: Arc, output_schema: Arc, - chunks: Vec>, // These chunks are identified overlapped + chunks: Vec>, // These chunks are identified overlapped predicate: Predicate, sort_key: &SortKey, ) -> Result> { @@ -679,7 +679,7 @@ impl Deduplicater { ctx: IOxExecutionContext, table_name: Arc, output_schema: Arc, - chunk: Arc, // This chunk is identified having duplicates + chunk: Arc, // This chunk is identified having duplicates predicate: Predicate, sort_key: &SortKey, ) -> Result> { @@ -807,8 +807,8 @@ impl Deduplicater { ctx: IOxExecutionContext, table_name: Arc, output_schema: Arc, - chunk: Arc, // This chunk is identified having duplicates - predicate: Predicate, // This is the select predicate of the query + chunk: Arc, // This chunk is identified having duplicates + predicate: Predicate, // This is the select predicate of the query sort_key: Option<&SortKey>, ) -> Result> { // Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode @@ -898,7 +898,7 @@ impl Deduplicater { /// Add SortExec operator on top of the input plan of the given chunk /// The plan will be sorted on the chunk's primary key fn build_sort_plan( - chunk: Arc, + chunk: Arc, input: Arc, output_sort_key: &SortKey, ) -> Result> { @@ -952,7 +952,7 @@ impl Deduplicater { ctx: IOxExecutionContext, table_name: Arc, output_schema: Arc, - chunk: Arc, // This chunk is identified having no duplicates + chunk: Arc, // This chunk is identified having no duplicates predicate: Predicate, sort_key: Option<&SortKey>, ) -> Result> { @@ -1001,7 +1001,7 @@ impl Deduplicater { ctx: IOxExecutionContext, table_name: Arc, output_schema: Arc, - chunks: Vec>, // These chunks is identified having no duplicates + chunks: Vec>, // These chunks is identified having no duplicates predicate: Predicate, output_sort_key: Option<&SortKey>, ) -> Result>> 
{ @@ -1039,14 +1039,14 @@ impl Deduplicater { sorted_chunk_plans } - fn no_delete_predicates(chunks: &[Arc]) -> bool { + fn no_delete_predicates(chunks: &[Arc]) -> bool { chunks .iter() .all(|chunk| chunk.delete_predicates().is_empty()) } /// Find the columns needed in chunks' primary keys across schemas - fn compute_pk_schema(chunks: &[Arc]) -> Arc { + fn compute_pk_schema(chunks: &[Arc]) -> Arc { let mut schema_merger = SchemaMerger::new(); for chunk in chunks { let chunk_schema = chunk.schema(); @@ -1081,14 +1081,14 @@ impl Deduplicater { #[derive(Debug)] /// A pruner that does not do pruning (suitable if no additional pruning is possible) struct NoOpPruner {} -impl ChunkPruner for NoOpPruner { +impl ChunkPruner for NoOpPruner { fn prune_chunks( &self, _table_name: &str, _table_schema: Arc, - chunks: Vec>, + chunks: Vec>, _predicate: &Predicate, - ) -> Vec> { + ) -> Vec> { chunks } } @@ -1102,10 +1102,7 @@ mod test { use datafusion_util::test_collect; use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; - use crate::{ - test::{raw_data, TestChunk}, - QueryChunkMeta, - }; + use crate::test::{raw_data, TestChunk}; use super::*; @@ -1174,7 +1171,7 @@ mod test { .with_tag_column("tag1") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; let sort_key = SortKey::from_columns(vec!["tag1", TIME_COLUMN_NAME]); @@ -1248,7 +1245,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; let sort_key = SortKey::from_columns(vec!["tag1", "tag2", "tag3", TIME_COLUMN_NAME]); @@ -1322,7 +1319,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; let sort_key = SortKey::from_columns(vec!["tag1", "tag2", TIME_COLUMN_NAME]); @@ -1367,7 +1364,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 2 exactly the same with Chunk 1 let chunk2 = Arc::new( @@ -1378,7 +1375,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Datafusion schema of the chunk // the same for 2 chunks let schema = chunk1.schema(); @@ -1443,7 +1440,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 2 exactly the same with Chunk 1 let chunk2 = Arc::new( @@ -1454,7 +1451,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; let chunks = vec![chunk1, chunk2]; // data in its original form @@ -1523,7 +1520,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 2 same tags, but different fields let chunk2 = Arc::new( @@ -1533,7 +1530,7 @@ mod test { .with_tag_column("tag1") .with_i64_field_column("other_field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 3 exactly the same with Chunk 2 let chunk3 = Arc::new( @@ -1543,7 +1540,7 @@ mod test { .with_tag_column("tag1") .with_i64_field_column("other_field_int") .with_five_rows_of_data(), - ); + ) as Arc; let chunks = vec![chunk1, chunk2, chunk3]; // data in its original form @@ -1621,7 +1618,7 @@ mod test { .with_tag_column("tag2") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 2 has two different tags let chunk2 = Arc::new( @@ -1632,7 +1629,7 @@ mod test { .with_tag_column("tag1") .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Chunk 
3 has just tag3 let chunk3 = Arc::new( @@ -1643,7 +1640,7 @@ mod test { .with_i64_field_column("field_int") .with_i64_field_column("field_int2") .with_five_rows_of_data(), - ); + ) as Arc; // With provided stats, the computed key will be (tag2, tag1, tag3, time) // Requested output schema == the schema for all three @@ -1739,7 +1736,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Datafusion schema of the chunk let schema = chunk.schema(); @@ -1791,7 +1788,7 @@ mod test { .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) .with_ten_rows_of_data_some_duplicates(), - ); + ) as Arc; // Datafusion schema of the chunk let schema = chunk.schema(); @@ -1861,7 +1858,7 @@ mod test { .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) .with_ten_rows_of_data_some_duplicates(), - ); + ) as Arc; let chunks = vec![chunk]; // data in its original form @@ -1941,7 +1938,7 @@ mod test { ) .with_i64_field_column("field_int") .with_ten_rows_of_data_some_duplicates(), - ); + ) as Arc; let chunk2 = Arc::new( TestChunk::new("t") @@ -1960,7 +1957,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // Datafusion schema of the chunk let schema = chunk1.schema(); @@ -2036,7 +2033,7 @@ mod test { ) .with_i64_field_column("field_int") .with_ten_rows_of_data_some_duplicates(), - ); + ) as Arc; // chunk2 overlaps with chunk 1 let chunk2 = Arc::new( @@ -2057,7 +2054,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // chunk3 no overlap, no duplicates within let chunk3 = Arc::new( @@ -2078,7 +2075,7 @@ mod test { ) .with_i64_field_column("field_int") .with_three_rows_of_data(), - ); + ) as Arc; // chunk4 no overlap, duplicates within let chunk4 = Arc::new( @@ -2100,7 +2097,7 @@ mod test { .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) .with_four_rows_of_data(), - ); + ) as Arc; // Datafusion schema of the chunk let schema = chunk1.schema(); @@ -2195,7 +2192,7 @@ mod test { ) .with_i64_field_column("field_int") .with_ten_rows_of_data_some_duplicates(), - ); + ) as Arc; // chunk2 overlaps with chunk 1 let chunk2 = Arc::new( @@ -2216,7 +2213,7 @@ mod test { ) .with_i64_field_column("field_int") .with_five_rows_of_data(), - ); + ) as Arc; // chunk3 no overlap, no duplicates within let chunk3 = Arc::new( @@ -2237,7 +2234,7 @@ mod test { ) .with_i64_field_column("field_int") .with_three_rows_of_data(), - ); + ) as Arc; // chunk3 no overlap, duplicates within let chunk4 = Arc::new( @@ -2259,7 +2256,7 @@ mod test { .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) .with_four_rows_of_data(), - ); + ) as Arc; // Datafusion schema of the chunk let schema = chunk1.schema(); @@ -2333,7 +2330,7 @@ mod test { assert_batches_eq!(&expected, &batch); } - fn chunk_ids(group: &[Arc]) -> String { + fn chunk_ids(group: &[Arc]) -> String { let ids = group .iter() .map(|c| c.id().get().to_string()) @@ -2341,7 +2338,7 @@ mod test { ids.join(", ") } - fn chunk_group_ids(groups: &[Vec>]) -> Vec { + fn chunk_group_ids(groups: &[Vec>]) -> Vec { groups .iter() .enumerate() diff --git a/query/src/provider/overlap.rs b/query/src/provider/overlap.rs index f0694f19b2..3626674b2e 100644 --- a/query/src/provider/overlap.rs +++ b/query/src/provider/overlap.rs @@ -7,9 +7,9 @@ use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics}; use schema::TIME_COLUMN_NAME; use snafu::Snafu; -use 
std::cmp::Ordering; +use std::{cmp::Ordering, sync::Arc}; -use crate::QueryChunkMeta; +use crate::QueryChunk; #[derive(Debug, Snafu)] pub enum Error { @@ -32,7 +32,7 @@ pub enum Error { pub type Result = std::result::Result; -/// Groups [`QueryChunkMeta`] objects into disjoint sets using values of +/// Groups [`QueryChunk`] objects into disjoint sets using values of /// min/max statistics. The groups are formed such that each group /// *may* contain InfluxDB data model primary key duplicates with /// others in that set. @@ -48,17 +48,16 @@ pub type Result = std::result::Result; /// /// Note 2: this algorithm is O(n^2) worst case (when no chunks have /// any overlap) -pub fn group_potential_duplicates(chunks: Vec) -> Result>> -where - C: QueryChunkMeta, -{ - let mut groups: Vec>> = vec![]; +pub fn group_potential_duplicates( + chunks: Vec>, +) -> Result>>> { + let mut groups: Vec>> = vec![]; // Step 1: find the up groups using references to `chunks` stored // in KeyStats views for (idx, chunk) in chunks.iter().enumerate() { // try to find a place to put this chunk - let mut key_stats = Some(KeyStats::new(idx, chunk)); + let mut key_stats = Some(KeyStats::new(idx, chunk.as_ref())); 'outer: for group in &mut groups { // If this chunk overlaps any existing chunk in group add @@ -89,7 +88,7 @@ where .map(|group| group.into_iter().map(|key_stats| key_stats.index).collect()) .collect(); - let mut chunks: Vec> = chunks.into_iter().map(Some).collect(); + let mut chunks: Vec>> = chunks.into_iter().map(Some).collect(); let groups = groups .into_iter() @@ -101,9 +100,9 @@ where .take() .expect("Internal mismatch while gathering into groups") }) - .collect::>() + .collect::>() }) - .collect::>>(); + .collect::>>(); Ok(groups) } @@ -111,29 +110,23 @@ where /// Holds a view to a chunk along with information about its columns /// in an easy to compare form #[derive(Debug)] -struct KeyStats<'a, C> -where - C: QueryChunkMeta, -{ +struct KeyStats<'a> { /// The index of the chunk index: usize, /// The underlying chunk #[allow(dead_code)] - chunk: &'a C, + chunk: &'a dyn QueryChunk, /// the ColumnSummaries for the chunk's 'primary_key' columns, in /// "lexographical" order (aka sorted by name) key_summaries: Vec<&'a ColumnSummary>, } -impl<'a, C> KeyStats<'a, C> -where - C: QueryChunkMeta, -{ +impl<'a> KeyStats<'a> { /// Create a new view for the specified chunk at index `index`, /// computing the columns to be used in the primary key comparison - pub fn new(index: usize, chunk: &'a C) -> Self { + pub fn new(index: usize, chunk: &'a dyn QueryChunk) -> Self { // find summaries for each primary key column: let key_summaries = chunk .schema() @@ -297,17 +290,17 @@ mod test { #[test] fn one_column_no_overlap() { - let c1 = TestChunk::new("chunk1").with_tag_column_with_stats( + let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats( "tag1", Some("boston"), Some("mumbai"), - ); + )); - let c2 = TestChunk::new("chunk2").with_tag_column_with_stats( + let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats( "tag1", Some("new york"), Some("zoo york"), - ); + )); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -317,17 +310,17 @@ mod test { #[test] fn one_column_overlap() { - let c1 = TestChunk::new("chunk1").with_tag_column_with_stats( + let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats( "tag1", Some("boston"), Some("new york"), - ); + )); - let c2 = TestChunk::new("chunk2").with_tag_column_with_stats( + let c2 = 
Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats( "tag1", Some("denver"), Some("zoo york"), - ); + )); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -337,9 +330,11 @@ mod test { #[test] fn one_time_column_overlap() { - let c1 = TestChunk::new("chunk1").with_time_column_with_stats(Some(100), Some(1000)); + let c1 = + Arc::new(TestChunk::new("chunk1").with_time_column_with_stats(Some(100), Some(1000))); - let c2 = TestChunk::new("chunk2").with_time_column_with_stats(Some(200), Some(500)); + let c2 = + Arc::new(TestChunk::new("chunk2").with_time_column_with_stats(Some(200), Some(500))); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -349,24 +344,32 @@ mod test { #[test] fn multi_columns() { - let c1 = TestChunk::new("chunk1") - .with_time_column_with_stats(Some(0), Some(1000)) - .with_tag_column_with_stats("tag1", Some("boston"), Some("new york")); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column_with_stats(Some(0), Some(1000)) + .with_tag_column_with_stats("tag1", Some("boston"), Some("new york")), + ); // Overlaps in tag1, but not in time - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york")) - .with_time_column_with_stats(Some(2000), Some(3000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york")) + .with_time_column_with_stats(Some(2000), Some(3000)), + ); // Overlaps in time, but not in tag1 - let c3 = TestChunk::new("chunk3") - .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy")) - .with_time_column_with_stats(Some(500), Some(1500)); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy")) + .with_time_column_with_stats(Some(500), Some(1500)), + ); // Overlaps in time, and in tag1 - let c4 = TestChunk::new("chunk4") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz")) - .with_time_column_with_stats(Some(500), Some(1500)); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz")) + .with_time_column_with_stats(Some(500), Some(1500)), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded"); @@ -380,37 +383,49 @@ mod test { #[test] fn missing_columns() { - let c1 = TestChunk::new("chunk1") - .with_time_column_with_stats(Some(0), Some(1000)) - .with_tag_column_with_stats("tag1", Some("boston"), Some("new york")) - .with_tag_column_with_stats("tag2", Some("boston"), Some("new york")) - .with_tag_column_with_stats("z", Some("a"), Some("b")); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column_with_stats(Some(0), Some(1000)) + .with_tag_column_with_stats("tag1", Some("boston"), Some("new york")) + .with_tag_column_with_stats("tag2", Some("boston"), Some("new york")) + .with_tag_column_with_stats("z", Some("a"), Some("b")), + ); // Overlaps in tag1, but not in time - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york")) - .with_time_column_with_stats(Some(2000), Some(3000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york")) + .with_time_column_with_stats(Some(2000), Some(3000)), + ); // Overlaps in time and z, but not in tag2 - let c3 = TestChunk::new("chunk3") - .with_tag_column_with_stats("tag2", Some("zzx"), Some("zzy")) - .with_tag_column_with_stats("z", 
Some("a"), Some("b")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_tag_column_with_stats("tag2", Some("zzx"), Some("zzy")) + .with_tag_column_with_stats("z", Some("a"), Some("b")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); // Overlaps in time, but not in tag1 - let c4 = TestChunk::new("chunk4") - .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy")) - .with_time_column_with_stats(Some(2000), Some(3000)); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy")) + .with_time_column_with_stats(Some(2000), Some(3000)), + ); // Overlaps in time, but not z - let c5 = TestChunk::new("chunk5") - .with_tag_column_with_stats("z", Some("c"), Some("d")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c5 = Arc::new( + TestChunk::new("chunk5") + .with_tag_column_with_stats("z", Some("c"), Some("d")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); // Overlaps in z, but not in time - let c6 = TestChunk::new("chunk6") - .with_tag_column_with_stats("z", Some("a"), Some("b")) - .with_time_column_with_stats(Some(4000), Some(5000)); + let c6 = Arc::new( + TestChunk::new("chunk6") + .with_tag_column_with_stats("z", Some("a"), Some("b")) + .with_time_column_with_stats(Some(4000), Some(5000)), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4, c5, c6]).expect("grouping succeeded"); @@ -436,24 +451,32 @@ mod test { // Even "time" column is stored in front of "url", the primary_key function // invoked inside potential_overlap invoked by group_potential_duplicates // will return "url", "time" - let c1 = TestChunk::new("chunk1") - .with_time_column_with_stats(Some(0), Some(1000)) - .with_tag_column_with_stats("url", Some("boston"), Some("new york")); // "url" > "time" + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_time_column_with_stats(Some(0), Some(1000)) + .with_tag_column_with_stats("url", Some("boston"), Some("new york")), + ); // "url" > "time" // Overlaps in tag1, but not in time - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("url", Some("denver"), Some("zoo york")) - .with_time_column_with_stats(Some(2000), Some(3000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("url", Some("denver"), Some("zoo york")) + .with_time_column_with_stats(Some(2000), Some(3000)), + ); // Overlaps in time, but not in tag1 - let c3 = TestChunk::new("chunk3") - .with_tag_column_with_stats("url", Some("zzx"), Some("zzy")) - .with_time_column_with_stats(Some(500), Some(1500)); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_tag_column_with_stats("url", Some("zzx"), Some("zzy")) + .with_time_column_with_stats(Some(500), Some(1500)), + ); // Overlaps in time, and in tag1 - let c4 = TestChunk::new("chunk4") - .with_tag_column_with_stats("url", Some("aaa"), Some("zzz")) - .with_time_column_with_stats(Some(500), Some(1500)); + let c4 = Arc::new( + TestChunk::new("chunk4") + .with_tag_column_with_stats("url", Some("aaa"), Some("zzz")) + .with_time_column_with_stats(Some(500), Some(1500)), + ); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded"); @@ -468,10 +491,16 @@ mod test { #[test] fn boundary() { // check that overlap calculations include the bound - let c1 = - TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); - let c2 = - TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("bbb"), Some("ccc")); + let c1 = 
Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats( + "tag1", + Some("aaa"), + Some("bbb"), + )); + let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats( + "tag1", + Some("bbb"), + Some("ccc"), + )); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -482,10 +511,16 @@ mod test { #[test] fn same() { // check that if chunks overlap exactly on the boundaries they are still grouped - let c1 = - TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); - let c2 = - TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); + let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats( + "tag1", + Some("aaa"), + Some("bbb"), + )); + let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats( + "tag1", + Some("aaa"), + Some("bbb"), + )); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -496,10 +531,16 @@ mod test { #[test] fn different_tag_names() { // check that if chunks overlap but in different tag names - let c1 = - TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); - let c2 = - TestChunk::new("chunk2").with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")); + let c1 = Arc::new(TestChunk::new("chunk1").with_tag_column_with_stats( + "tag1", + Some("aaa"), + Some("bbb"), + )); + let c2 = Arc::new(TestChunk::new("chunk2").with_tag_column_with_stats( + "tag2", + Some("aaa"), + Some("bbb"), + )); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -512,13 +553,17 @@ mod test { #[test] fn different_tag_names_multi_tags() { // check that if chunks overlap but in different tag names - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")), + ); - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb")); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb")), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -529,22 +574,28 @@ mod test { #[test] fn three_column() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) - // Timestamp doesn't overlap, but the two tags do - .with_time_column_with_stats(Some(2001), Some(3000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + // Timestamp doesn't overlap, but the two tags do + 
.with_time_column_with_stats(Some(2001), Some(3000)), + ); - let c3 = TestChunk::new("chunk3") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) - // all three overlap - .with_time_column_with_stats(Some(1000), Some(2000)); + let c3 = Arc::new( + TestChunk::new("chunk3") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) + // all three overlap + .with_time_column_with_stats(Some(1000), Some(2000)), + ); let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded"); @@ -554,16 +605,20 @@ mod test { #[test] fn tag_order() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - // all three overlap, but tags in different order - .with_time_column_with_stats(Some(500), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + // all three overlap, but tags in different order + .with_time_column_with_stats(Some(500), Some(1000)), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -573,16 +628,20 @@ mod test { #[test] fn tag_order_no_tags() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - // tag1 and timestamp overlap, but no tag2 (aka it is all null) - // so it could overlap if there was a null tag2 value in chunk1 - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_time_column_with_stats(Some(500), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + // tag1 and timestamp overlap, but no tag2 (aka it is all null) + // so it could overlap if there was a null tag2 value in chunk1 + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(500), Some(1000)), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -592,17 +651,21 @@ mod test { #[test] fn tag_order_null_stats() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - // 
tag1 and timestamp overlap, tag2 has no stats (is all null) - // so they might overlap if chunk1 had a null in tag 2 - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_tag_column_with_stats("tag2", None, None) - .with_time_column_with_stats(Some(500), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + // tag1 and timestamp overlap, tag2 has no stats (is all null) + // so they might overlap if chunk1 had a null in tag 2 + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", None, None) + .with_time_column_with_stats(Some(500), Some(1000)), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -612,14 +675,18 @@ mod test { #[test] fn tag_order_partial_stats() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - // tag1 has a min but not a max. Should result in error - .with_tag_column_with_stats("tag1", Some("aaa"), None) - .with_time_column_with_stats(Some(500), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + // tag1 has a min but not a max. Should result in error + .with_tag_column_with_stats("tag1", Some("aaa"), None) + .with_time_column_with_stats(Some(500), Some(1000)), + ); let result = group_potential_duplicates(vec![c1, c2]).unwrap_err(); @@ -636,17 +703,21 @@ mod test { #[test] fn tag_fields_not_counted() { - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_i64_field_column_with_stats("field", Some(0), Some(2)) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_i64_field_column_with_stats("field", Some(0), Some(2)) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - // tag1 and timestamp overlap, but field value does not - // should still overlap - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_i64_field_column_with_stats("field", Some(100), Some(200)) - .with_time_column_with_stats(Some(500), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + // tag1 and timestamp overlap, but field value does not + // should still overlap + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_i64_field_column_with_stats("field", Some(100), Some(200)) + .with_time_column_with_stats(Some(500), Some(1000)), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -659,16 +730,20 @@ mod test { // When the same column has different types in different // chunks; this will likely cause errors elsewhere in practice // as the schemas are incompatible (and can't be merged) - let c1 = TestChunk::new("chunk1") - .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) - .with_time_column_with_stats(Some(0), Some(1000)); + let c1 = Arc::new( + TestChunk::new("chunk1") + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(0), Some(1000)), + ); - let c2 = TestChunk::new("chunk2") - // tag1 column is actually a field is different in chunk - // 2, so since the timestamps overlap these chunks - // might also have duplicates 
(if tag1 was null in c1) - .with_i64_field_column_with_stats("tag1", Some(100), Some(200)) - .with_time_column_with_stats(Some(0), Some(1000)); + let c2 = Arc::new( + TestChunk::new("chunk2") + // tag1 column is actually a field is different in chunk + // 2, so since the timestamps overlap these chunks + // might also have duplicates (if tag1 was null in c1) + .with_i64_field_column_with_stats("tag1", Some(100), Some(200)) + .with_time_column_with_stats(Some(0), Some(1000)), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -678,7 +753,7 @@ mod test { // --- Test infrastructure -- - fn to_string(groups: Vec>) -> Vec { + fn to_string(groups: Vec>>) -> Vec { let mut s = vec![]; for (idx, group) in groups.iter().enumerate() { let names = group.iter().map(|c| c.table_name()).collect::>(); diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs index a2e69a9943..8426d3d525 100644 --- a/query/src/provider/physical.rs +++ b/query/src/provider/physical.rs @@ -25,12 +25,12 @@ use super::adapter::SchemaAdapterStream; /// Implements the DataFusion physical plan interface #[derive(Debug)] -pub(crate) struct IOxReadFilterNode { +pub(crate) struct IOxReadFilterNode { table_name: Arc, /// The desired output schema (includes selection) /// note that the chunk may not have all these columns. iox_schema: Arc, - chunks: Vec>, + chunks: Vec>, predicate: Predicate, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -39,7 +39,7 @@ pub(crate) struct IOxReadFilterNode { ctx: IOxExecutionContext, } -impl IOxReadFilterNode { +impl IOxReadFilterNode { /// Create a execution plan node that reads data from `chunks` producing /// output according to schema, while applying `predicate` and /// returns @@ -47,7 +47,7 @@ impl IOxReadFilterNode { ctx: IOxExecutionContext, table_name: Arc, iox_schema: Arc, - chunks: Vec>, + chunks: Vec>, predicate: Predicate, ) -> Self { Self { @@ -62,7 +62,7 @@ impl IOxReadFilterNode { } #[async_trait] -impl ExecutionPlan for IOxReadFilterNode { +impl ExecutionPlan for IOxReadFilterNode { fn as_any(&self) -> &dyn std::any::Any { self } @@ -91,7 +91,7 @@ impl ExecutionPlan for IOxReadFilterNode { ) -> datafusion::error::Result> { assert!(children.is_empty(), "no children expected in iox plan"); - let chunks: Vec> = self.chunks.to_vec(); + let chunks: Vec> = self.chunks.to_vec(); // For some reason when I used an automatically derived `Clone` implementation // the compiler didn't recognize the trait implementation diff --git a/query/src/pruning.rs b/query/src/pruning.rs index fc8b769a16..ccac64bd85 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -16,14 +16,13 @@ use observability_deps::tracing::{debug, trace}; use predicate::Predicate; use schema::Schema; -use crate::{group_by::Aggregate, QueryChunkMeta}; +use crate::group_by::Aggregate; +use crate::QueryChunk; /// Something that cares to be notified when pruning of chunks occurs pub trait PruningObserver { - type Observed; - /// Called when the specified chunk was pruned from observation - fn was_pruned(&self, _chunk: &Self::Observed) {} + fn was_pruned(&self, _chunk: &dyn QueryChunk) {} /// Called when no pruning can happen at all for some reason fn could_not_prune(&self, _reason: &str) {} @@ -35,15 +34,14 @@ pub trait PruningObserver { /// /// TODO(raphael): Perhaps this should return `Result>` instead of /// the [`PruningObserver`] plumbing -pub fn prune_chunks( +pub fn prune_chunks( observer: &O, table_schema: Arc, - chunks: Vec>, + chunks: Vec>, 
predicate: &Predicate, -) -> Vec> +) -> Vec> where - C: QueryChunkMeta, - O: PruningObserver, + O: PruningObserver, { let num_chunks = chunks.len(); trace!(num_chunks, %predicate, "Pruning chunks"); @@ -104,14 +102,14 @@ where pruned_chunks } -/// Wraps a collection of [`QueryChunkMeta`] and implements the [`PruningStatistics`] +/// Wraps a collection of [`QueryChunk`] and implements the [`PruningStatistics`] /// interface required by [`PruningPredicate`] -struct ChunkPruningStatistics<'a, C> { +struct ChunkPruningStatistics<'a> { table_schema: &'a Schema, - chunks: &'a [Arc], + chunks: &'a [Arc], } -impl<'a, C: QueryChunkMeta> ChunkPruningStatistics<'a, C> { +impl<'a> ChunkPruningStatistics<'a> { /// Returns the [`DataType`] for `column` fn column_type(&self, column: &Column) -> Option<&DataType> { let index = self.table_schema.find_index_of(&column.name)?; @@ -130,10 +128,7 @@ impl<'a, C: QueryChunkMeta> ChunkPruningStatistics<'a, C> { } } -impl<'a, C> PruningStatistics for ChunkPruningStatistics<'a, C> -where - C: QueryChunkMeta, -{ +impl<'a> PruningStatistics for ChunkPruningStatistics<'a> { fn min_values(&self, column: &Column) -> Option { let data_type = self.column_type(column)?; let summaries = self.column_summaries(column); @@ -232,7 +227,7 @@ mod test { use predicate::PredicateBuilder; use schema::merge::SchemaMerger; - use crate::{test::TestChunk, QueryChunk}; + use crate::{test::TestChunk, QueryChunk, QueryChunkMeta}; use super::*; @@ -478,7 +473,7 @@ mod test { assert_eq!(names(&pruned), vec!["chunk1"]); } - fn merge_schema(chunks: &[Arc]) -> Arc { + fn merge_schema(chunks: &[Arc]) -> Arc { let mut merger = SchemaMerger::new(); for chunk in chunks { merger = merger.merge(chunk.schema().as_ref()).unwrap(); @@ -500,19 +495,20 @@ mod test { "column1", None, Some(10), - )); + )) as Arc; let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats( "column1", Some(0), None, - )); + )) as Arc; let c3 = Arc::new( TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None), - ); + ) as Arc; - let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1")); + let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1")) + as Arc; let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -543,35 +539,35 @@ mod test { "column1", Some(0), Some(10), - )); + )) as Arc; let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats( "column1", Some(0), Some(1000), - )); + )) as Arc; let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( "column1", Some(10), Some(20), - )); + )) as Arc; let c4 = Arc::new( TestChunk::new("chunk4").with_i64_field_column_with_stats("column1", None, None), - ); + ) as Arc; let c5 = Arc::new(TestChunk::new("chunk5").with_i64_field_column_with_stats( "column1", Some(10), None, - )); + )) as Arc; let c6 = Arc::new(TestChunk::new("chunk6").with_i64_field_column_with_stats( "column1", None, Some(20), - )); + )) as Arc; let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -601,19 +597,19 @@ mod test { TestChunk::new("chunk1") .with_i64_field_column_with_stats("column1", Some(0), Some(100)) .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let c2 = Arc::new( TestChunk::new("chunk2") .with_i64_field_column_with_stats("column1", Some(0), Some(1000)) .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let c3 = 
Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( "column2", Some(0), Some(4), - )); + )) as Arc; let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -645,7 +641,7 @@ mod test { None, 0, ), - ); + ) as Arc; // Has no nulls, can prune it out based on statistics alone let c2 = Arc::new( @@ -657,7 +653,7 @@ mod test { None, 0, ), - ); + ) as Arc; // Has nulls, can still can prune it out based on statistics alone let c3 = Arc::new( @@ -669,7 +665,7 @@ mod test { None, 1, // that one peksy null! ), - ); + ) as Arc; let predicate = PredicateBuilder::new() .add_expr( @@ -705,37 +701,37 @@ mod test { TestChunk::new("chunk1") .with_i64_field_column_with_stats("column1", Some(0), Some(1000)) .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let c2 = Arc::new( TestChunk::new("chunk2") .with_i64_field_column_with_stats("column1", Some(0), Some(10)) .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let c3 = Arc::new( TestChunk::new("chunk3") .with_i64_field_column_with_stats("column1", Some(0), Some(10)) .with_i64_field_column_with_stats("column2", Some(5), Some(10)), - ); + ) as Arc; let c4 = Arc::new( TestChunk::new("chunk4") .with_i64_field_column_with_stats("column1", Some(1000), Some(2000)) .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let c5 = Arc::new( TestChunk::new("chunk5") .with_i64_field_column_with_stats("column1", Some(0), Some(10)) .with_i64_field_column_no_stats("column2"), - ); + ) as Arc; let c6 = Arc::new( TestChunk::new("chunk6") .with_i64_field_column_no_stats("column1") .with_i64_field_column_with_stats("column2", Some(0), Some(4)), - ); + ) as Arc; let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100)).and(col("column2").lt(lit(5)))) @@ -753,7 +749,7 @@ mod test { assert_eq!(names(&pruned), vec!["chunk1", "chunk4", "chunk6"]); } - fn names(pruned: &[Arc]) -> Vec<&str> { + fn names(pruned: &[Arc]) -> Vec<&str> { pruned.iter().map(|p| p.table_name()).collect() } @@ -773,10 +769,10 @@ mod test { } impl PruningObserver for TestObserver { - type Observed = TestChunk; - - fn was_pruned(&self, chunk: &Self::Observed) { - self.events.borrow_mut().push(format!("{}: Pruned", chunk)) + fn was_pruned(&self, chunk: &dyn QueryChunk) { + self.events + .borrow_mut() + .push(format!("{}: Pruned", chunk.table_name())) } fn could_not_prune(&self, reason: &str) { diff --git a/query/src/test.rs b/query/src/test.rs index 913387a87d..5c2a4e021f 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -102,9 +102,7 @@ impl TestDatabase { #[async_trait] impl QueryDatabase for TestDatabase { - type Chunk = TestChunk; - - async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { + async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { // save last predicate *self.chunks_predicate.lock() = predicate.clone(); @@ -113,7 +111,7 @@ impl QueryDatabase for TestDatabase { .values() .flat_map(|x| x.values()) .filter(|x| x.table_name == table_name) - .cloned() + .map(|x| Arc::clone(x) as _) .collect() } @@ -975,7 +973,7 @@ impl QueryChunkMeta for TestChunk { } /// Return the raw data from the list of chunks -pub async fn raw_data(chunks: &[Arc]) -> Vec { +pub async fn raw_data(chunks: &[Arc]) -> Vec { let mut batches = vec![]; for c in chunks { let pred = Predicate::default(); diff --git a/query_tests/src/db.rs b/query_tests/src/db.rs index 42388e82bf..2094e061f8 100644 --- a/query_tests/src/db.rs +++ 
b/query_tests/src/db.rs @@ -1,13 +1,12 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::chunk_metadata::{ChunkAddr, ChunkId, ChunkOrder}; use datafusion::catalog::catalog::CatalogProvider; use db::Db; use predicate::rpc_predicate::QueryDatabaseMeta; use query::{ exec::{ExecutionContextProvider, IOxExecutionContext}, - QueryChunk, QueryChunkError, QueryChunkMeta, QueryDatabase, + QueryChunk, QueryDatabase, }; use self::sealed::AbstractDbInterface; @@ -53,13 +52,11 @@ impl CatalogProvider for AbstractDb { #[async_trait] impl QueryDatabase for AbstractDb { - type Chunk = AbstractChunk; - async fn chunks( &self, table_name: &str, predicate: &predicate::Predicate, - ) -> Vec> { + ) -> Vec> { self.0.chunks(table_name, predicate).await } @@ -83,87 +80,6 @@ impl QueryDatabaseMeta for AbstractDb { } } -#[derive(Debug)] -pub struct AbstractChunk(Arc); - -impl QueryChunk for AbstractChunk { - fn id(&self) -> ChunkId { - self.0.id() - } - - fn addr(&self) -> ChunkAddr { - self.0.addr() - } - - fn table_name(&self) -> &str { - self.0.table_name() - } - - fn may_contain_pk_duplicates(&self) -> bool { - self.0.may_contain_pk_duplicates() - } - - fn apply_predicate_to_metadata( - &self, - predicate: &predicate::Predicate, - ) -> Result { - self.0.apply_predicate_to_metadata(predicate) - } - - fn column_names( - &self, - ctx: IOxExecutionContext, - predicate: &predicate::Predicate, - columns: schema::selection::Selection<'_>, - ) -> Result, QueryChunkError> { - self.0.column_names(ctx, predicate, columns) - } - - fn column_values( - &self, - ctx: IOxExecutionContext, - column_name: &str, - predicate: &predicate::Predicate, - ) -> Result, QueryChunkError> { - self.0.column_values(ctx, column_name, predicate) - } - - fn read_filter( - &self, - ctx: IOxExecutionContext, - predicate: &predicate::Predicate, - selection: schema::selection::Selection<'_>, - ) -> Result { - self.0.read_filter(ctx, predicate, selection) - } - - fn chunk_type(&self) -> &str { - self.0.chunk_type() - } - - fn order(&self) -> ChunkOrder { - self.0.order() - } -} - -impl QueryChunkMeta for AbstractChunk { - fn summary(&self) -> Option<&data_types::partition_metadata::TableSummary> { - self.0.summary() - } - - fn schema(&self) -> Arc { - self.0.schema() - } - - fn sort_key(&self) -> Option<&schema::sort::SortKey> { - self.0.sort_key() - } - - fn delete_predicates(&self) -> &[Arc] { - self.0.delete_predicates() - } -} - mod sealed { use super::*; @@ -182,7 +98,7 @@ mod sealed { &self, table_name: &str, predicate: &predicate::Predicate, - ) -> Vec>; + ) -> Vec>; fn record_query( &self, @@ -218,13 +134,8 @@ impl AbstractDbInterface for OldDb { &self, table_name: &str, predicate: &predicate::Predicate, - ) -> Vec> { - self.0 - .chunks(table_name, predicate) - .await - .into_iter() - .map(|c| Arc::new(AbstractChunk(c as _))) - .collect() + ) -> Vec> { + self.0.chunks(table_name, predicate).await } fn record_query( diff --git a/query_tests/src/table_schema.rs b/query_tests/src/table_schema.rs index d3aaff780b..79865f2a6e 100644 --- a/query_tests/src/table_schema.rs +++ b/query_tests/src/table_schema.rs @@ -1,7 +1,7 @@ //! 
Tests for the table_names implementation use arrow::datatypes::DataType; -use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; +use query::QueryDatabase; use schema::selection::Selection; use schema::{builder::SchemaBuilder, sort::SortKey, Schema, TIME_COLUMN_NAME}; diff --git a/server/tests/delete.rs b/server/tests/delete.rs index 39a6c2c18e..d44dbe9d4e 100644 --- a/server/tests/delete.rs +++ b/server/tests/delete.rs @@ -12,7 +12,7 @@ use db::{ Db, }; use futures::TryStreamExt; -use query::{QueryChunk, QueryChunkMeta, QueryDatabase}; +use query::{QueryChunk, QueryDatabase}; use server::{ rules::ProvidedDatabaseRules, test_utils::{make_application, make_initialized_server}, @@ -143,7 +143,8 @@ async fn delete_predicate_preservation() { async move { for chunk in db.chunks(table_name, &Default::default()).await { - let partition_key = chunk.addr().partition_key.as_ref(); + let addr = chunk.addr(); + let partition_key = addr.partition_key.as_ref(); if partition_key == "part_b" { // Strictly speaking not required because the chunk was persisted AFTER the delete predicate was // registered so we can get away with materializing it during persistence.