From 2a2c5889c313b24cac1295e3374af12c7224ddfa Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Wed, 8 Dec 2021 11:59:16 +0000
Subject: [PATCH] perf: loading chunks doesn't require sort

---
 query/src/frontend/reorg.rs     | 91 +++++++++++++++++++++++++++++----
 server/src/db/lifecycle/load.rs | 12 ++---
 2 files changed, 85 insertions(+), 18 deletions(-)

diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs
index e4aeec6616..1f02c4dada 100644
--- a/query/src/frontend/reorg.rs
+++ b/query/src/frontend/reorg.rs
@@ -48,6 +48,38 @@ impl ReorgPlanner {
         Self::default()
     }
 
+    /// Creates an execution plan for a full scan of a single chunk.
+    /// This plan is primarily used to load chunks from one storage medium to
+    /// another.
+    pub fn scan_single_chunk_plan<C>(
+        &self,
+        schema: Arc<Schema>,
+        chunk: Arc<C>,
+    ) -> Result<LogicalPlan>
+    where
+        C: QueryChunk + 'static,
+    {
+        let table_name = chunk.table_name();
+        // Prepare the plan for the table
+        let mut builder = ProviderBuilder::new(table_name, schema);
+
+        // There are no predicates in these plans, so no need to prune them
+        builder = builder.add_no_op_pruner();
+        builder = builder.add_chunk(Arc::clone(&chunk));
+
+        let provider = builder.build().context(CreatingProvider { table_name })?;
+
+        // Logical plan to scan all columns with no predicates
+        let plan = LogicalPlanBuilder::scan(table_name, Arc::new(provider) as _, None)
+            .context(BuildingPlan)?
+            .build()
+            .context(BuildingPlan)?;
+
+        debug!(%table_name, plan=%plan.display_indent_schema(),
+               "created single chunk scan plan");
+        Ok(plan)
+    }
+
     /// Creates an execution plan for the COMPACT operations which does the following:
     ///
     /// 1. Merges chunks together into a single stream
@@ -71,7 +103,7 @@ impl ReorgPlanner {
         let ScanPlan {
             plan_builder,
             provider,
-        } = self.sorted_scan_plan(schema, chunks)?;
+        } = self.scan_plan(schema, chunks, true)?;
 
         let mut schema = provider.iox_schema();
 
@@ -154,7 +186,7 @@ impl ReorgPlanner {
         let ScanPlan {
             plan_builder,
             provider,
-        } = self.sorted_scan_plan(schema, chunks)?;
+        } = self.scan_plan(schema, chunks, true)?;
 
         let mut schema = provider.iox_schema();
 
@@ -186,14 +218,15 @@ impl ReorgPlanner {
     }
     /// Creates a scan plan for the given set of chunks.
-    /// Output data of the scan will be deduplicated and sorted
-    /// on the optimal sort order of the chunks' PK columns (tags and time).
+    /// Output data of the scan will be deduplicated and sorted, if `sort=true`,
+    /// on the optimal sort order of the chunks' PK columns (tags and time).
+    ///
     /// The optimal sort order is computed based on the PK columns cardinality
     /// that will be best for RLE encoding.
     ///
-    /// Prefer to query::provider::build_scan_plan for the detail of the plan
+    /// Refer to query::provider::build_scan_plan for the details of the plan
     ///
-    fn sorted_scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I) -> Result<ScanPlan<C>>
+    fn scan_plan<C, I>(&self, schema: Arc<Schema>, chunks: I, sort: bool) -> Result<ScanPlan<C>>
     where
         C: QueryChunk + 'static,
         I: IntoIterator<Item = Arc<C>>,
     {
@@ -207,9 +240,10 @@ impl ReorgPlanner {
 
         // Prepare the plan for the table
         let mut builder = ProviderBuilder::new(table_name, schema);
-
-        // Tell the scan of this provider to sort its output on the chunks' PK
-        builder.ensure_pk_sort();
+        if sort {
+            // Tell the scan of this provider to sort its output on the chunks' PK
+            builder.ensure_pk_sort();
+        }
 
         // There are no predicates in these plans, so no need to prune them
         builder = builder.add_no_op_pruner();
@@ -318,6 +352,45 @@ mod test {
         (Arc::new(schema), vec![chunk1, chunk2])
     }
 
+    #[tokio::test]
+    async fn test_scan_plan() {
+        test_helpers::maybe_start_logging();
+
+        let (schema, chunks) = get_test_chunks().await;
+        let scan_plan = ReorgPlanner::new()
+            .scan_single_chunk_plan(schema, chunks.into_iter().next().unwrap())
+            .expect("created scan plan");
+
+        let executor = Executor::new(1);
+        let physical_plan = executor
+            .new_context(ExecutorType::Reorg)
+            .prepare_plan(&scan_plan)
+            .await
+            .unwrap();
+
+        // single chunk processed
+        assert_eq!(physical_plan.output_partitioning().partition_count(), 1);
+
+        let batches = datafusion::physical_plan::collect(physical_plan)
+            .await
+            .unwrap();
+
+        // all data from chunk
+        let expected = vec![
+            "+-----------+------------+------+--------------------------------+",
+            "| field_int | field_int2 | tag1 | time                           |",
+            "+-----------+------------+------+--------------------------------+",
+            "| 1000      |            | MT   | 1970-01-01T00:00:00.000001Z    |",
+            "| 10        |            | MT   | 1970-01-01T00:00:00.000007Z    |",
+            "| 70        |            | CT   | 1970-01-01T00:00:00.000000100Z |",
+            "| 100       |            | AL   | 1970-01-01T00:00:00.000000050Z |",
+            "| 5         |            | MT   | 1970-01-01T00:00:00.000005Z    |",
+            "+-----------+------------+------+--------------------------------+",
+        ];
+
+        assert_batches_eq!(&expected, &batches);
+    }
+
     #[tokio::test]
     async fn test_compact_plan() {
         test_helpers::maybe_start_logging();
diff --git a/server/src/db/lifecycle/load.rs b/server/src/db/lifecycle/load.rs
index fce489dc5a..a35cffa0a1 100644
--- a/server/src/db/lifecycle/load.rs
+++ b/server/src/db/lifecycle/load.rs
@@ -8,7 +8,7 @@ use lifecycle::LifecycleWriteGuard;
 use observability_deps::tracing::info;
 use query::exec::ExecutorType;
 use query::frontend::reorg::ReorgPlanner;
-use query::{compute_sort_key, QueryChunkMeta};
+use query::QueryChunkMeta;
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};
 
 use crate::db::lifecycle::collect_rub;
@@ -43,14 +43,8 @@ pub fn load_chunk(
     let ctx = db.exec.new_context(ExecutorType::Reorg);
 
     let fut = async move {
-        let key = compute_sort_key(std::iter::once(db_chunk.summary()));
-
-        // Cannot move query_chunks as the sort key borrows the column names
-        let (_, plan) = ReorgPlanner::new().compact_plan(
-            db_chunk.schema(),
-            std::iter::once(Arc::clone(&db_chunk)),
-            key,
-        )?;
+        let plan =
+            ReorgPlanner::new().scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?;
 
         let physical_plan = ctx.prepare_plan(&plan).await?;
         let stream = ctx.execute_stream(physical_plan).await?;
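
Reviewer note (not part of the patch itself): a before/after sketch of the load
path changed in load.rs, restated outside the diff for readability. It uses
only names that appear in this patch, with the surrounding db_chunk/ctx setup
as in load_chunk; it is illustrative rather than a verbatim excerpt:

    // Before: loading a chunk built a compact plan, which required
    // computing a sort key from the chunk's summary so the provider
    // would sort its output on the chunks' PK columns.
    let key = compute_sort_key(std::iter::once(db_chunk.summary()));
    let (_, plan) = ReorgPlanner::new().compact_plan(
        db_chunk.schema(),
        std::iter::once(Arc::clone(&db_chunk)),
        key,
    )?;

    // After: a plain scan of the single chunk. scan_single_chunk_plan
    // builds its provider without calling ensure_pk_sort(), so no sort
    // key is computed and no sort is performed while moving the chunk
    // from one storage medium to another.
    let plan =
        ReorgPlanner::new().scan_single_chunk_plan(db_chunk.schema(), Arc::clone(&db_chunk))?;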