From de148337e8cf7a8043e0b95d39f076e73ba4e2e8 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 6 Oct 2021 17:43:48 -0400 Subject: [PATCH 1/8] fix: half way fix the bug to include schema of column in delete predicate into the schema of IOx scan to avoid missing reading columns --- nga.out | 6 ++ query/src/exec/context.rs | 2 + query/src/provider.rs | 37 +++++++- query/src/pruning.rs | 1 + query/src/util.rs | 5 ++ query_tests/src/sql.rs | 184 +++++++++++++++++++++++++++++++++++++- 6 files changed, 232 insertions(+), 3 deletions(-) create mode 100644 nga.out diff --git a/nga.out b/nga.out new file mode 100644 index 0000000000..b22fd7eaea --- /dev/null +++ b/nga.out @@ -0,0 +1,6 @@ + +running 1 test +test sql::sql_select_with_delete_from_multi_exprs_count ... ignored + +test result: ok. 0 passed; 0 failed; 1 ignored; 0 measured; 139 filtered out; finished in 0.00s + diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index 4183280896..55290192e8 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -254,6 +254,8 @@ impl IOxExecutionContext { let ctx = self.child_ctx("prepare_sql"); debug!(text=%sql, "planning SQL query"); let logical_plan = ctx.inner.create_logical_plan(sql)?; + let s = logical_plan.display_graphviz().to_string(); + debug!(?s, "logical plan"); ctx.prepare_plan(&logical_plan).await } diff --git a/query/src/provider.rs b/query/src/provider.rs index afbd411b1c..c6904af751 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -250,6 +250,8 @@ impl TableProvider for ChunkTableProvider { Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)), None => Arc::clone(&self.iox_schema), }; + println!(" ===== scan_schema (-> output_schema) in scan: {:#?}", scan_schema); + // This debug shows the self.arrow_schema() includes all columns in all chunks // which means the schema of all chunks are merged before invoking this scan @@ -378,6 +380,8 @@ impl Deduplicater { predicate: Predicate, sort_output: 
bool, ) -> Result> { + println!(" ===== output_schema in build_scan_plan: {:#?}", output_schema); + // Initialize an empty sort key let mut output_sort_key = SortKey::with_capacity(0); if sort_output { @@ -540,6 +544,9 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { + + println!(" ===== output_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", output_schema); + // Note that we may need to sort/deduplicate based on tag // columns which do not appear in the output @@ -553,6 +560,7 @@ impl Deduplicater { let pk_schema = Self::compute_pk_schema(&chunks); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); + println!(" ===== input_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", input_schema); // Compute the output sort key for these chunks let sort_key = if !output_sort_key.is_empty() { @@ -636,8 +644,10 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { + println!(" ===== output_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", output_schema); let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); + println!(" ===== input_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", input_schema); // Compute the output sort key for this chunk let mut sort_key = if !output_sort_key.is_empty() { @@ -764,13 +774,26 @@ impl Deduplicater { predicate: Predicate, // This is the select predicate of the query output_sort_key: &SortKey<'_>, ) -> Result> { + println!(" ===== output_schema in build_sort_plan_for_read_filter {:#?}", output_schema); + + // There are delete predicate, needs to add all of its column into the schema of the data + let schema = output_schema; + if chunk.has_delete_predicates() { + // NGA todo: + // build schema of columns in delete expression + // merge that column into the schema + } + // Create the bottom node 
IOxReadFilterNode for this chunk + // NGA: correct for MUB but incorrect for RUB (not include everything) + println!(" ========== predicate: {:#?}", predicate); let mut input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), - output_schema, + schema, vec![Arc::clone(&chunk)], predicate, )); + println!(" ========== input plan in build_sort_plan_for_read_filter: {:#?}", input); // Add Filter operator, FilterExec, if the chunk has delete predicates let del_preds = chunk.delete_predicates(); @@ -875,9 +898,19 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { + println!(" ===== output_schema in build_plan_for_non_duplicates_chunk: {:#?}", output_schema); + + let mut input_schema = output_schema; + // There is sort key, must add columns of the sort keys (which is a part of the primary key) + if !output_sort_key.is_empty() { + let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); + input_schema = Self::compute_input_schema(&input_schema, &pk_schema); + } + println!(" ===== input_schema in build_plan_for_non_duplicates_chunk: {:#?}", input_schema); + Self::build_sort_plan_for_read_filter( table_name, - output_schema, + input_schema, chunk, predicate, output_sort_key, diff --git a/query/src/pruning.rs b/query/src/pruning.rs index ba75364e4d..e4c3444de3 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -46,6 +46,7 @@ where return summaries; } }; + trace!(%filter_expr, "Filter_expr of pruning chunks"); // TODO: performance optimization: batch the chunk pruning by // grouping the chunks with the same types for all columns diff --git a/query/src/util.rs b/query/src/util.rs index 938fa74a96..67638a4e5b 100644 --- a/query/src/util.rs +++ b/query/src/util.rs @@ -15,6 +15,7 @@ use datafusion::{ }, }; use internal_types::schema::{sort::SortKey, Schema}; +use observability_deps::tracing::trace; /// Create a logical plan that produces the record batch pub fn make_scan_plan(batch: RecordBatch) -> 
std::result::Result { @@ -88,6 +89,10 @@ pub fn df_physical_expr( let input_physical_schema = input.schema(); let input_logical_schema: DFSchema = input_physical_schema.as_ref().clone().try_into()?; + trace!(%expr, "logical expression"); + trace!(%input_logical_schema, "input logical schema"); + trace!(%input_physical_schema, "input physical schema"); + physical_planner.create_physical_expr( &expr, &input_logical_schema, diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index c308d7dd3b..847f2132d6 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -870,14 +870,53 @@ async fn sql_select_all_different_tags_chunks() { // Delete tests #[tokio::test] async fn sql_select_with_delete_from_one_expr_delete_all() { - let expected = vec!["++", "++"]; + // select * + let expected = vec!["++", "++"]; run_sql_test_case( scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {}, "SELECT * from cpu", &expected, ) .await; + + // select a specific column + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {}, + "SELECT time from cpu", + &expected, + ) + .await; + + // Count + let expected = vec![ + "+-----------------+----------------+-----------------+", + "| COUNT(UInt8(1)) | COUNT(cpu.bar) | COUNT(cpu.time) |", + "+-----------------+----------------+-----------------+", + "| 0 | 0 | 0 |", + "+-----------------+----------------+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll {}, + "SELECT count(*), count(bar), count(time) from cpu", + &expected, + ) + .await; + + // Min & Max + let expected = vec![ + "+--------------+--------------+---------------+---------------+", + "| MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + "+--------------+--------------+---------------+---------------+", + "| | | | |", + "+--------------+--------------+---------------+---------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunkDeleteAll 
{}, + "SELECT min(bar), max(bar), min(time), max(time) from cpu", + &expected, + ) + .await; } #[tokio::test] @@ -896,6 +935,51 @@ async fn sql_select_with_delete_from_one_expr() { &expected, ) .await; + + // + let expected = vec![ + "+--------------------------------+-----+", + "| time | bar |", + "+--------------------------------+-----+", + "| 1970-01-01T00:00:00.000000020Z | 2 |", + "+--------------------------------+-----+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT time, bar from cpu", + &expected, + ) + .await; + + // Count + let expected = vec![ + "+-----------------+-----------------+----------------+", + "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) |", + "+-----------------+-----------------+----------------+", + "| 1 | 1 | 1 |", + "+-----------------+-----------------+----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT count(time), count(*), count(bar) from cpu", + &expected, + ) + .await; + + // Min & Max + let expected = vec![ + "+--------------+--------------+--------------------------------+--------------------------------+", + "| MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + "+--------------+--------------+--------------------------------+--------------------------------+", + "| 2 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000020Z |", + "+--------------+--------------+--------------------------------+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT min(bar), max(bar), min(time), max(time) from cpu", + &expected, + ) + .await; } #[tokio::test] @@ -925,6 +1009,21 @@ async fn sql_select_with_delete_from_one_expr_with_select_predicate() { &expected, ) .await; + + // Count, min and max + let expected = vec![ + 
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| 1 | 1 | 1 | 2 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000020Z |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu", + &expected, + ) + .await; } #[tokio::test] @@ -944,8 +1043,61 @@ async fn sql_select_with_delete_from_multi_exprs() { &expected, ) .await; + + // + let expected = vec![ + "+-----+", + "| bar |", + "+-----+", + "| 1 |", + "| 2 |", + "+-----+", + ]; + + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT bar from cpu", + &expected, + ) + .await; + + // Count, min and max + // BUG when data in RUB but delete happens at open MUB + // let expected = vec![ + // "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + // "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + // "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + // "| 2 | 2 | 2 | 1 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000040Z |", + // 
"+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + // ]; + // run_sql_test_case( + // scenarios::delete::OneDeleteMultiExprsOneChunk {}, + // "SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu", + // &expected, + // ) + // .await; } +#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_count() { + + let expected = vec![ + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| 2 | 2 | 2 | 1 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000040Z |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT count(foo) from cpu", + &expected, + ) + .await; +} + + #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_with_select_predicate() { // not eliminate extra row @@ -1174,4 +1326,34 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() &expected, ) .await; + + // ---- + // min, max & count + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT min(bar) from cpu;", + &expected, + ) + .await; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT max(foo) from cpu;", + &expected, + ) + .await; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(bar) from cpu;", + &expected, + ) + .await; + + 
run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(*) from cpu;", + &expected, + ) + .await; } From 22d6f11bea1ffe81e4070082f532101f96f18693 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 7 Oct 2021 17:37:34 -0400 Subject: [PATCH 2/8] fix: add cols of delete predicates into the schema of scanning columns --- Cargo.lock | 1 + predicate/Cargo.toml | 1 + predicate/src/delete_predicate.rs | 18 ++ query/src/lib.rs | 36 ++- query/src/provider.rs | 74 ++++-- query_tests/src/sql.rs | 416 +++++++++++++++++++++++++----- 6 files changed, 454 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d0107e218..972eb88632 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,6 +2963,7 @@ dependencies = [ "datafusion 0.1.0", "datafusion_util", "generated_types", + "hashbrown 0.11.2", "internal_types", "observability_deps", "ordered-float 2.8.0", diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index e3f8514293..07e08d5077 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -10,6 +10,7 @@ data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } generated_types = { path = "../generated_types" } +hashbrown = "0.11" internal_types = { path = "../internal_types" } observability_deps = { path = "../observability_deps" } ordered-float = "2" diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index 56e9b6b421..d7bad8c546 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -3,6 +3,7 @@ use std::convert::TryInto; use chrono::DateTime; use data_types::timestamp::TimestampRange; use datafusion::logical_plan::{lit, Column, Expr, Operator}; +use internal_types::schema::TIME_COLUMN_NAME; use snafu::{ResultExt, Snafu}; use sqlparser::{ ast::{BinaryOperator, Expr as SqlParserExpr, Ident, Statement, Value}, @@ -99,6 +100,23 @@ impl DeletePredicate { exprs: delete_exprs, }) } + + /// Return 
all columns participating in the expressions of delete predicate minus the time column if any + pub fn all_column_names_but_time(&self) -> hashbrown::HashSet<&str> { + // println!(" ===== del exprs: {:#?}", self.exprs); + + // Get column names of the predicate expressions + let cols = self + .exprs + .iter() + .map(|e| e.column()) + .filter(|col_name| col_name != &TIME_COLUMN_NAME) + .collect::>(); + + // println!(" ===== cols of del exprs: {:#?}", cols); + + cols + } } impl From for crate::predicate::Predicate { diff --git a/query/src/lib.rs b/query/src/lib.rs index f842af55c7..a07f93751f 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -24,8 +24,8 @@ use predicate::{ predicate::{Predicate, PredicateMatch}, }; -use hashbrown::HashMap; -use std::{fmt::Debug, sync::Arc}; +use hashbrown::{HashMap, HashSet}; +use std::{fmt::Debug, iter::FromIterator, sync::Arc}; pub mod exec; pub mod frontend; @@ -55,6 +55,38 @@ pub trait QueryChunkMeta: Sized { fn has_delete_predicates(&self) -> bool { !self.delete_predicates().is_empty() } + + /// return column names participating in the all delete predicates + /// in lexicographical order with one exception that time column is last + /// This order is to be consistent with Schema::primary_key + #[warn(clippy::stable_sort_primitive)] + fn delete_predicate_columns(&self) -> Vec<&str> { + // get all column names but time + let mut col_names: HashSet<&str> = hashbrown::HashSet::new(); + for pred in self.delete_predicates() { + let cols = pred.all_column_names_but_time(); + // println!(" ===== in loop - all_column_names_but_time: {:#?}", cols); + for col in cols { + col_names.insert(col); + } + } + // println!(" ===== col_names in pred exprs: {:#?}", col_names); + + // convert to vector + let mut column_names: Vec<&str> = Vec::from_iter(col_names); + // println!(" ===== vector column_names: {:#?}", column_names); + + // Sort it + column_names.sort_unstable(); + // println!(" ===== vector column_names after sorting: {:#?}", 
column_names); + + // Now add time column to the end of the vector + // Since time range is a must in the delete predicate, time column must be in this list + column_names.push(TIME_COLUMN_NAME); + // println!(" ===== vector column_names with time: {:#?}", column_names); + + column_names + } } /// A `Database` is the main trait implemented by the IOx subsystems diff --git a/query/src/provider.rs b/query/src/provider.rs index c6904af751..2ed8acdedd 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -89,6 +89,11 @@ impl From for DataFusionError { } } +enum ColumnType { + PrimaryKey, + DeletePredicate, +} + /// Something that can prune chunks based on their metadata pub trait ChunkPruner: Sync + Send + std::fmt::Debug { /// prune `chunks`, if possible, based on predicate. @@ -250,8 +255,7 @@ impl TableProvider for ChunkTableProvider { Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)), None => Arc::clone(&self.iox_schema), }; - println!(" ===== scan_schema (-> output_schema) in scan: {:#?}", scan_schema); - + // println!(" ===== scan_schema (-> output_schema) in scan: {:#?}", scan_schema); // This debug shows the self.arrow_schema() includes all columns in all chunks // which means the schema of all chunks are merged before invoking this scan @@ -380,7 +384,7 @@ impl Deduplicater { predicate: Predicate, sort_output: bool, ) -> Result> { - println!(" ===== output_schema in build_scan_plan: {:#?}", output_schema); + // println!(" ===== output_schema in build_scan_plan: {:#?}", output_schema); // Initialize an empty sort key let mut output_sort_key = SortKey::with_capacity(0); @@ -544,8 +548,7 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - - println!(" ===== output_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", output_schema); + // println!(" ===== output_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", output_schema); // Note that we may need to 
sort/deduplicate based on tag // columns which do not appear in the output @@ -560,7 +563,7 @@ impl Deduplicater { let pk_schema = Self::compute_pk_schema(&chunks); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); - println!(" ===== input_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", input_schema); + // println!(" ===== input_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", input_schema); // Compute the output sort key for these chunks let sort_key = if !output_sort_key.is_empty() { @@ -644,10 +647,10 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - println!(" ===== output_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", output_schema); + // println!(" ===== output_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", output_schema); let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); - println!(" ===== input_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", input_schema); + // println!(" ===== input_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", input_schema); // Compute the output sort key for this chunk let mut sort_key = if !output_sort_key.is_empty() { @@ -774,26 +777,29 @@ impl Deduplicater { predicate: Predicate, // This is the select predicate of the query output_sort_key: &SortKey<'_>, ) -> Result> { - println!(" ===== output_schema in build_sort_plan_for_read_filter {:#?}", output_schema); + // println!(" ===== output_schema in build_sort_plan_for_read_filter {:#?}", output_schema); - // There are delete predicate, needs to add all of its column into the schema of the data - let schema = output_schema; - if chunk.has_delete_predicates() { - // NGA todo: + // If there are delete predicates, their columns must be added into the schema of + // the reading data in IOxReadFilterNode + let mut schema = 
output_schema; + if chunk.has_delete_predicates() { + // NGA todo: // build schema of columns in delete expression + let pred_schema = Self::compute_delete_predicate_schema(&[Arc::clone(&chunk)]); // merge that column into the schema + schema = Self::compute_input_schema(&schema, &pred_schema); } // Create the bottom node IOxReadFilterNode for this chunk // NGA: correct for MUB but incorrect for RUB (not include everything) - println!(" ========== predicate: {:#?}", predicate); + // println!(" ========== predicate: {:#?}", predicate); let mut input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), schema, vec![Arc::clone(&chunk)], predicate, )); - println!(" ========== input plan in build_sort_plan_for_read_filter: {:#?}", input); + // println!(" ========== input plan in build_sort_plan_for_read_filter: {:#?}", input); // Add Filter operator, FilterExec, if the chunk has delete predicates let del_preds = chunk.delete_predicates(); @@ -898,15 +904,15 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - println!(" ===== output_schema in build_plan_for_non_duplicates_chunk: {:#?}", output_schema); - + // println!(" ===== output_schema in build_plan_for_non_duplicates_chunk: {:#?}", output_schema); + let mut input_schema = output_schema; // There is sort key, must add columns of the sort keys (which is a part of the primary key) if !output_sort_key.is_empty() { let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); input_schema = Self::compute_input_schema(&input_schema, &pk_schema); } - println!(" ===== input_schema in build_plan_for_non_duplicates_chunk: {:#?}", input_schema); + // println!(" ===== input_schema in build_plan_for_non_duplicates_chunk: {:#?}", input_schema); Self::build_sort_plan_for_read_filter( table_name, @@ -994,21 +1000,37 @@ impl Deduplicater { .all(|chunk| chunk.delete_predicates().is_empty()) } - /// Find the columns needed in the primary key across schemas + /// Find the columns 
needed in a given ColumnType across schemas /// /// Note by the time we get down here, we have already checked /// the chunks for compatible schema, so we use unwrap (perhaps /// famous last words, but true at time of writing) - fn compute_pk_schema(chunks: &[Arc]) -> Arc { - let mut pk_schema_merger = SchemaMerger::new(); + fn compute_schema_for_column_type(chunks: &[Arc], col_type: ColumnType) -> Arc { + let mut schema_merger = SchemaMerger::new(); for chunk in chunks { let chunk_schema = chunk.schema(); - let chunk_pk = chunk_schema.primary_key(); - let chunk_pk_schema = chunk_schema.select_by_names(&chunk_pk).unwrap(); - pk_schema_merger = pk_schema_merger.merge(&chunk_pk_schema).unwrap(); + let cols = match col_type { + ColumnType::PrimaryKey => chunk_schema.primary_key(), + ColumnType::DeletePredicate => chunk.delete_predicate_columns() + }; + // println!(" ===== cols: {:#?}", cols); + let chunk_cols_schema = chunk_schema.select_by_names(&cols).unwrap(); + schema_merger = schema_merger.merge(&chunk_cols_schema).unwrap(); } - let pk_schema = pk_schema_merger.build(); - Arc::new(pk_schema) + let cols_schema = schema_merger.build(); + // println!(" ===== cols_schema: {:#?}", cols_schema); + + Arc::new(cols_schema) + } + + /// Find the columns needed in chunks' delete predicates across schemas + fn compute_delete_predicate_schema(chunks: &[Arc]) -> Arc { + Self::compute_schema_for_column_type(chunks, ColumnType::DeletePredicate) + } + + /// Find the columns needed in chunks' primary keys across schemas + fn compute_pk_schema(chunks: &[Arc]) -> Arc { + Self::compute_schema_for_column_type(chunks, ColumnType::PrimaryKey) } /// Find columns required to read from each scan: the output columns + the diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 847f2132d6..5b01e57ed2 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -870,7 +870,6 @@ async fn sql_select_all_different_tags_chunks() { // Delete tests #[tokio::test] async fn 
sql_select_with_delete_from_one_expr_delete_all() { - // select * let expected = vec!["++", "++"]; run_sql_test_case( @@ -936,7 +935,7 @@ async fn sql_select_with_delete_from_one_expr() { ) .await; - // + // select all explicit columns let expected = vec![ "+--------------------------------+-----+", "| time | bar |", @@ -950,33 +949,40 @@ async fn sql_select_with_delete_from_one_expr() { &expected, ) .await; +} - // Count +#[tokio::test] +async fn sql_select_with_delete_from_one_expr_min_max() { + // Min & Max of bar only let expected = vec![ - "+-----------------+-----------------+----------------+", - "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) |", - "+-----------------+-----------------+----------------+", - "| 1 | 1 | 1 |", - "+-----------------+-----------------+----------------+", + "+--------------+--------------+", + "| MIN(cpu.bar) | MAX(cpu.bar) |", + "+--------------+--------------+", + "| 2 | 2 |", + "+--------------+--------------+", ]; run_sql_test_case( scenarios::delete::OneDeleteSimpleExprOneChunk {}, - "SELECT count(time), count(*), count(bar) from cpu", + "SELECT min(bar), max(bar) from cpu", &expected, ) .await; +} - // Min & Max +#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_one_expr_count_max_time() { + // Count and max on one column and no cover all 2 columns, time and bar, of the delete predicate let expected = vec![ - "+--------------+--------------+--------------------------------+--------------------------------+", - "| MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", - "+--------------+--------------+--------------------------------+--------------------------------+", - "| 2 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000020Z |", - "+--------------+--------------+--------------------------------+--------------------------------+", + "+-----------------+--------------------------------+", + "| COUNT(cpu.time) | MAX(cpu.time) |", + 
"+-----------------+--------------------------------+", + "| 1 | 1970-01-01T00:00:00.000000020Z |", + "+-----------------+--------------------------------+", ]; run_sql_test_case( scenarios::delete::OneDeleteSimpleExprOneChunk {}, - "SELECT min(bar), max(bar), min(time), max(time) from cpu", + "SELECT count(time), max(time) from cpu", &expected, ) .await; @@ -1046,12 +1052,7 @@ async fn sql_select_with_delete_from_multi_exprs() { // let expected = vec![ - "+-----+", - "| bar |", - "+-----+", - "| 1 |", - "| 2 |", - "+-----+", + "+-----+", "| bar |", "+-----+", "| 1 |", "| 2 |", "+-----+", ]; run_sql_test_case( @@ -1060,28 +1061,15 @@ async fn sql_select_with_delete_from_multi_exprs() { &expected, ) .await; - - // Count, min and max - // BUG when data in RUB but delete happens at open MUB - // let expected = vec![ - // "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", - // "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", - // "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", - // "| 2 | 2 | 2 | 1 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000040Z |", - // "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", - // ]; - // run_sql_test_case( - // scenarios::delete::OneDeleteMultiExprsOneChunk {}, - // "SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu", - // &expected, - // ) - // .await; } +// BUG Running scenario 'Deleted data from one RUB chunk, with 1 deletes from open MUB' +// SQL: '"SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu"' +// thread 
'sql::sql_select_with_delete_from_multi_exprs' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 5"))', query_tests/src/sql.rs:37:74 #[ignore] #[tokio::test] -async fn sql_select_with_delete_from_multi_exprs_count() { - +async fn sql_select_with_delete_from_multi_exprs_agg() { + // Count, min and max on many columns but not `foo` that is included in delete predicate let expected = vec![ "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", @@ -1089,6 +1077,25 @@ async fn sql_select_with_delete_from_multi_exprs_count() { "| 2 | 2 | 2 | 1 | 2 | 1970-01-01T00:00:00.000000020Z | 1970-01-01T00:00:00.000000040Z |", "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_count_col() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.foo) |", + "+----------------+", + "| 2 |", + "+----------------+", + ]; + + // OneDeleteMultiExprsOneChunk's delete predicates include columns foo, bar and time run_sql_test_case( scenarios::delete::OneDeleteMultiExprsOneChunk {}, "SELECT count(foo) from cpu", @@ -1097,6 +1104,81 @@ async fn sql_select_with_delete_from_multi_exprs_count() { .await; } +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_count_star() { + let expected = vec![ + "+-----------------+", + "| COUNT(UInt8(1)) |", + "+-----------------+", + "| 2 |", + 
"+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT count(*) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_min() { + let expected = vec![ + "+--------------+", + "| MIN(cpu.bar) |", + "+--------------+", + "| 1 |", + "+--------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT min(bar) from cpu", + &expected, + ) + .await; +} + +// BUG +// Running scenario 'Deleted data from one Open MUB chunk, with 1 deletes from open MUB' +// SQL: '"SELECT max(foo) from cpu"' +// thread 'sql::sql_select_with_delete_from_multi_exprs_max' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 +#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_max() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.foo) |", + "+----------------+", + "| 2 |", + "+----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT max(foo) from cpu", + &expected, + ) + .await; +} + +// BUG: scenario 'Deleted data from one RUB chunk, with 1 deletes from open MUB' +// return wrong result +#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_max_time() { + let expected = vec![ + "+--------------------------------+", + "| MAX(cpu.time) |", + "+--------------------------------+", + "| 1970-01-01T00:00:00.000000040Z |", + "+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT max(time) from cpu", + &expected, + ) + .await; +} #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_with_select_predicate() { @@ -1214,10 +1296,102 @@ async fn sql_select_with_three_deletes_from_three_chunks() { .await; } -// Bug: 
https://github.com/influxdata/influxdb_iox/issues/2745 +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_count_star() { + let expected = vec![ + "+-----------------+", + "| COUNT(UInt8(1)) |", + "+-----------------+", + "| 7 |", + "+-----------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(*) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_count_time() { + let expected = vec![ + "+-----------------+", + "| COUNT(cpu.time) |", + "+-----------------+", + "| 7 |", + "+-----------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(time) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_count_foo() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.foo) |", + "+----------------+", + "| 7 |", + "+----------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(foo) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_count_bar() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.bar) |", + "+----------------+", + "| 7 |", + "+----------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(bar) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_min_bar() { + let expected = vec![ + "+--------------+", + "| MIN(cpu.bar) |", + "+--------------+", + "| 1 |", + "+--------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT min(bar) from cpu", + &expected, + ) + .await; +} + +// BUG scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' +// SQL: '"SELECT max(foo) from cpu"' +// 
thread 'sql::sql_select_with_three_deletes_from_three_chunks_max_foo' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 #[ignore] #[tokio::test] -async fn sql_select_count_with_three_deletes_from_three_chunks() { +async fn sql_select_with_three_deletes_from_three_chunks_max_foo() { let expected = vec![ "+-----+-----+--------------------------------+", "| bar | foo | time |", @@ -1228,7 +1402,29 @@ async fn sql_select_count_with_three_deletes_from_three_chunks() { run_sql_test_case( scenarios::delete::ThreeDeleteThreeChunks {}, - "SELECT count(*) from cpu", + "SELECT max(foo) from cpu", + &expected, + ) + .await; +} + +// BUG: Running scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' +// SQL: '"SELECT min(time), max(time) from cpu"' +// thread 'sql::sql_select_with_three_deletes_from_three_chunks_min_max_time' panicked at 'Running plan: ArrowError(ExternalError(Internal("MIN/MAX is not expected to receive scalars of incompatible types (TimestampNanosecond(NULL), Float64(1))")))', query_tests/src/sql.rs:37:74 +#[ignore] +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_min_max_time() { + let expected = vec![ + "+-----+-----+--------------------------------+", + "| bar | foo | time |", + "+-----+-----+--------------------------------+", + "| 7 | me | 1970-01-01T00:00:00.000000080Z |", + "+-----+-----+--------------------------------+", + ]; + + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT min(time), max(time) from cpu", &expected, ) .await; @@ -1326,33 +1522,125 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() &expected, ) .await; +} +// let expected = vec![ +// "+-----+-----+--------------------------------+", +// "| bar | foo | time |", +// 
"+-----+-----+--------------------------------+", +// "| 1 | me | 1970-01-01T00:00:00.000000040Z |", +// "| 1 | me | 1970-01-01T00:00:00.000000042Z |", +// "| 1 | me | 1970-01-01T00:00:00.000000062Z |", +// "| 4 | me | 1970-01-01T00:00:00.000000050Z |", +// "| 5 | me | 1970-01-01T00:00:00.000000060Z |", +// "| 7 | me | 1970-01-01T00:00:00.000000080Z |", +// "+-----+-----+--------------------------------+", +// ]; + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_bar() { // ---- // min, max & count - run_sql_test_case( - scenarios::delete::ThreeDeleteThreeChunks {}, - "SELECT min(bar) from cpu;", - &expected, - ) - .await; + let expected = vec![ + "+--------------+", + "| MIN(cpu.bar) |", + "+--------------+", + "| 1 |", + "+--------------+", + ]; run_sql_test_case( scenarios::delete::ThreeDeleteThreeChunks {}, - "SELECT max(foo) from cpu;", - &expected, - ) - .await; - - run_sql_test_case( - scenarios::delete::ThreeDeleteThreeChunks {}, - "SELECT count(bar) from cpu;", - &expected, - ) - .await; - - run_sql_test_case( - scenarios::delete::ThreeDeleteThreeChunks {}, - "SELECT count(*) from cpu;", + "SELECT min(bar) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)", + &expected, + ) + .await; +} + +// BUG: Running scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' +// SQL: '"SELECT max(foo) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)"' +// thread 'sql::sql_select_with_three_deletes_from_three_chunks_with_select_predicate_max_foo' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 +#[ignore] +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_max_foo() { + let expected = vec![ + "+--------------+", + "| MAX(cpu.foo) |", + "+--------------+", + "| me |", + 
"+--------------+", + ]; + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT max(foo) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_time() { + let expected = vec![ + "+--------------------------------+", + "| MIN(cpu.time) |", + "+--------------------------------+", + "| 1970-01-01T00:00:00.000000040Z |", + "+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT min(time) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_bar() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.bar) |", + "+----------------+", + "| 6 |", + "+----------------+", + ]; + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(bar) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_time() { + let expected = vec![ + "+-----------------+", + "| COUNT(cpu.time) |", + "+-----------------+", + "| 6 |", + "+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(time) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_count_star() { + let expected = vec![ + "+-----------------+", + "| COUNT(UInt8(1)) |", + "+-----------------+", + "| 6 |", + "+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::ThreeDeleteThreeChunks {}, + "SELECT count(*) from cpu where foo = 'me' and (bar > 2 or bar = 1.0);", &expected, ) .await; From 
adbcd85c26a82057ad0b2ab609ae4a39b9e6eaba Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 8 Oct 2021 14:37:34 -0400 Subject: [PATCH 3/8] fix: fully fix 2745 --- nga.out | 6 - query/src/provider.rs | 57 ++---- query_tests/src/scenarios/delete.rs | 23 +++ query_tests/src/sql.rs | 294 +++++++++++++++++++++++++--- 4 files changed, 306 insertions(+), 74 deletions(-) delete mode 100644 nga.out diff --git a/nga.out b/nga.out deleted file mode 100644 index b22fd7eaea..0000000000 --- a/nga.out +++ /dev/null @@ -1,6 +0,0 @@ - -running 1 test -test sql::sql_select_with_delete_from_multi_exprs_count ... ignored - -test result: ok. 0 passed; 0 failed; 1 ignored; 0 measured; 139 filtered out; finished in 0.00s - diff --git a/query/src/provider.rs b/query/src/provider.rs index 2ed8acdedd..72bc207c2f 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -255,7 +255,6 @@ impl TableProvider for ChunkTableProvider { Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)), None => Arc::clone(&self.iox_schema), }; - // println!(" ===== scan_schema (-> output_schema) in scan: {:#?}", scan_schema); // This debug shows the self.arrow_schema() includes all columns in all chunks // which means the schema of all chunks are merged before invoking this scan @@ -384,8 +383,6 @@ impl Deduplicater { predicate: Predicate, sort_output: bool, ) -> Result> { - // println!(" ===== output_schema in build_scan_plan: {:#?}", output_schema); - // Initialize an empty sort key let mut output_sort_key = SortKey::with_capacity(0); if sort_output { @@ -548,8 +545,6 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - // println!(" ===== output_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", output_schema); - // Note that we may need to sort/deduplicate based on tag // columns which do not appear in the output @@ -563,7 +558,6 @@ impl Deduplicater { let pk_schema = Self::compute_pk_schema(&chunks); let input_schema = 
Self::compute_input_schema(&output_schema, &pk_schema); - // println!(" ===== input_schema in build_deduplicate_plan_for_overlapped_chunks: {:#?}", input_schema); // Compute the output sort key for these chunks let sort_key = if !output_sort_key.is_empty() { @@ -647,10 +641,8 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - // println!(" ===== output_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", output_schema); let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); let input_schema = Self::compute_input_schema(&output_schema, &pk_schema); - // println!(" ===== input_schema in build_deduplicate_plan_for_chunk_with_duplicates: {:#?}", input_schema); // Compute the output sort key for this chunk let mut sort_key = if !output_sort_key.is_empty() { @@ -777,29 +769,33 @@ impl Deduplicater { predicate: Predicate, // This is the select predicate of the query output_sort_key: &SortKey<'_>, ) -> Result> { - // println!(" ===== output_schema in build_sort_plan_for_read_filter {:#?}", output_schema); - - // If there are delete predicates, their columns must be added into the schema of - // the reading data in IOxReadFilterNode - let mut schema = output_schema; + // Add columns in sort key or delete predicates if not yet in the output schema + // This is needed because columns in select query may not include all columns of sort key and delete predicates + let mut input_schema = Arc::clone(&output_schema); + // + // Cols of sort keys (which is a part of the primary key) + if !output_sort_key.is_empty() { + // build schema of PK + let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); + // Merge it in the output_schema + input_schema = Self::compute_input_schema(&input_schema, &pk_schema); + } + // + // Cols of delete predicates if chunk.has_delete_predicates() { - // NGA todo: // build schema of columns in delete expression let pred_schema = 
Self::compute_delete_predicate_schema(&[Arc::clone(&chunk)]); // merge that column into the schema - schema = Self::compute_input_schema(&schema, &pred_schema); + input_schema = Self::compute_input_schema(&input_schema, &pred_schema); } // Create the bottom node IOxReadFilterNode for this chunk - // NGA: correct for MUB but incorrect for RUB (not include everything) - // println!(" ========== predicate: {:#?}", predicate); let mut input: Arc = Arc::new(IOxReadFilterNode::new( Arc::clone(&table_name), - schema, + input_schema, vec![Arc::clone(&chunk)], predicate, )); - // println!(" ========== input plan in build_sort_plan_for_read_filter: {:#?}", input); // Add Filter operator, FilterExec, if the chunk has delete predicates let del_preds = chunk.delete_predicates(); @@ -823,10 +819,11 @@ impl Deduplicater { // Add the sort operator, SortExec, if needed if !output_sort_key.is_empty() { - Self::build_sort_plan(chunk, input, output_sort_key) - } else { - Ok(input) + input = Self::build_sort_plan(chunk, input, output_sort_key)? 
} + + // select back to the requested output schema + Self::add_projection_node_if_needed(output_schema, input) } /// Add SortExec operator on top of the input plan of the given chunk @@ -904,19 +901,9 @@ impl Deduplicater { predicate: Predicate, output_sort_key: &SortKey<'_>, ) -> Result> { - // println!(" ===== output_schema in build_plan_for_non_duplicates_chunk: {:#?}", output_schema); - - let mut input_schema = output_schema; - // There is sort key, must add columns of the sort keys (which is a part of the primary key) - if !output_sort_key.is_empty() { - let pk_schema = Self::compute_pk_schema(&[Arc::clone(&chunk)]); - input_schema = Self::compute_input_schema(&input_schema, &pk_schema); - } - // println!(" ===== input_schema in build_plan_for_non_duplicates_chunk: {:#?}", input_schema); - Self::build_sort_plan_for_read_filter( table_name, - input_schema, + output_schema, chunk, predicate, output_sort_key, @@ -1011,14 +998,12 @@ impl Deduplicater { let chunk_schema = chunk.schema(); let cols = match col_type { ColumnType::PrimaryKey => chunk_schema.primary_key(), - ColumnType::DeletePredicate => chunk.delete_predicate_columns() + ColumnType::DeletePredicate => chunk.delete_predicate_columns(), }; - // println!(" ===== cols: {:#?}", cols); let chunk_cols_schema = chunk_schema.select_by_names(&cols).unwrap(); schema_merger = schema_merger.merge(&chunk_cols_schema).unwrap(); } let cols_schema = schema_merger.build(); - // println!(" ===== cols_schema: {:#?}", cols_schema); Arc::new(cols_schema) } diff --git a/query_tests/src/scenarios/delete.rs b/query_tests/src/scenarios/delete.rs index 2d38f6ecb9..874b37545f 100644 --- a/query_tests/src/scenarios/delete.rs +++ b/query_tests/src/scenarios/delete.rs @@ -71,6 +71,29 @@ impl DbSetup for OneDeleteSimpleExprOneChunk { } } +#[derive(Debug)] +/// Setup for many scenario move chunk from from MUB to RUB to OS +/// No delete in this case +pub struct NoDeleteOneChunk {} +#[async_trait] +impl DbSetup for NoDeleteOneChunk { 
+ async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; + let table_name = "cpu"; + // chunk data + let lp_lines = vec![ + "cpu,foo=me bar=1 10", + "cpu,foo=you bar=2 20", + "cpu,foo=me bar=1 30", + "cpu,foo=me bar=1 40", + ]; + + // this returns 15 scenarios + all_delete_scenarios_for_one_chunk(vec![], vec![], lp_lines, table_name, partition_key) + .await + } +} + #[derive(Debug)] /// Setup for multi-expression delete query test with one table and one chunk moved from MUB to RUB to OS pub struct OneDeleteMultiExprsOneChunk {} diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 5b01e57ed2..c4b70f8162 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -52,6 +52,21 @@ async fn sql_select_from_cpu() { run_sql_test_case(TwoMeasurements {}, "SELECT * from cpu", &expected).await; } +// BUG: https://github.com/influxdata/influxdb_iox/issues/2776 +#[ignore] +#[tokio::test] +async fn sql_select_from_cpu_min_utf8() { + let expected = vec![ + "+----------------+", + "| MIN(cpu.region |", + "+----------------+", + "| west |", + "| west |", + "+----------------+", + ]; + run_sql_test_case(TwoMeasurements {}, "SELECT min(region) from cpu", &expected).await; +} + #[tokio::test] async fn sql_select_from_cpu_2021() { let expected = vec![ @@ -918,6 +933,8 @@ async fn sql_select_with_delete_from_one_expr_delete_all() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_delete_from_one_expr() { let expected = vec![ @@ -951,6 +968,8 @@ async fn sql_select_with_delete_from_one_expr() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_delete_from_one_expr_min_max() { // Min & Max of bar only @@ -969,7 +988,42 @@ async fn sql_select_with_delete_from_one_expr_min_max() { .await; } -#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_one_expr_time() { + // one column, time, and no cover all 2 columns, time 
and bar, of the delete predicate + let expected = vec![ + "+--------------------------------+", + "| time |", + "+--------------------------------+", + "| 1970-01-01T00:00:00.000000020Z |", + "+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT time from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_one_expr_max_time() { + // max on one column and no cover all 2 columns, time and bar, of the delete predicate + let expected = vec![ + "+--------------------------------+", + "| MAX(cpu.time) |", + "+--------------------------------+", + "| 1970-01-01T00:00:00.000000020Z |", + "+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT max(time) from cpu", + &expected, + ) + .await; +} + #[tokio::test] async fn sql_select_with_delete_from_one_expr_count_max_time() { // Count and max on one column and no cover all 2 columns, time and bar, of the delete predicate @@ -988,6 +1042,26 @@ async fn sql_select_with_delete_from_one_expr_count_max_time() { .await; } +#[tokio::test] +async fn sql_select_with_delete_from_one_expr_count_time() { + // Count and max on one column and no cover all 2 columns, time and bar, of the delete predicate + let expected = vec![ + "+-----------------+", + "| COUNT(cpu.time) |", + "+-----------------+", + "| 1 |", + "+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteSimpleExprOneChunk {}, + "SELECT count(time) from cpu", + &expected, + ) + .await; +} + +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_delete_from_one_expr_with_select_predicate() { // Select predicate does not eliminate extra rows @@ -1032,6 +1106,8 @@ async fn sql_select_with_delete_from_one_expr_with_select_predicate() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn 
sql_select_with_delete_from_multi_exprs() { let expected = vec![ @@ -1050,7 +1126,7 @@ async fn sql_select_with_delete_from_multi_exprs() { ) .await; - // + // select one column, delete predicates on all 3 columns of the table let expected = vec![ "+-----+", "| bar |", "+-----+", "| 1 |", "| 2 |", "+-----+", ]; @@ -1063,10 +1139,67 @@ async fn sql_select_with_delete_from_multi_exprs() { .await; } -// BUG Running scenario 'Deleted data from one RUB chunk, with 1 deletes from open MUB' -// SQL: '"SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu"' -// thread 'sql::sql_select_with_delete_from_multi_exprs' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 5"))', query_tests/src/sql.rs:37:74 +// -------------------------------------------------------- + +// tests without delete +#[tokio::test] +async fn sql_select_without_delete_agg() { + // Count, min and max on many columns but not `foo` that is included in delete predicate + let expected = vec![ + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| COUNT(cpu.time) | COUNT(UInt8(1)) | COUNT(cpu.bar) | MIN(cpu.bar) | MAX(cpu.bar) | MIN(cpu.time) | MAX(cpu.time) |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + "| 4 | 4 | 4 | 1 | 2 | 1970-01-01T00:00:00.000000010Z | 1970-01-01T00:00:00.000000040Z |", + "+-----------------+-----------------+----------------+--------------+--------------+--------------------------------+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::NoDeleteOneChunk {}, + "SELECT count(time), count(*), count(bar), min(bar), max(bar), min(time), max(time) from cpu", + &expected, + ) + .await; +} + +// 
BUG: https://github.com/influxdata/influxdb_iox/issues/2776 #[ignore] +#[tokio::test] +async fn sql_select_without_delete_max_foo() { + let expected = vec![ + "+--------------+", + "| MAX(cpu.foo) |", + "+--------------+", + "| you |", + "+--------------+", + ]; + run_sql_test_case( + scenarios::delete::NoDeleteOneChunk {}, + "SELECT max(foo) from cpu", + &expected, + ) + .await; +} + +// BUG: as as above +#[ignore] +#[tokio::test] +async fn sql_select_without_delete_min_foo() { + let expected = vec![ + "+--------------+", + "| MIN(cpu.foo) |", + "+--------------+", + "| me |", + "+--------------+", + ]; + run_sql_test_case( + scenarios::delete::NoDeleteOneChunk {}, + "SELECT min(foo) from cpu", + &expected, + ) + .await; +} + +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_agg() { // Count, min and max on many columns but not `foo` that is included in delete predicate @@ -1086,7 +1219,25 @@ async fn sql_select_with_delete_from_multi_exprs_agg() { } #[tokio::test] -async fn sql_select_with_delete_from_multi_exprs_count_col() { +async fn sql_select_with_delete_from_multi_exprs_count_time() { + // Count, min and max on many columns but not `foo` that is included in delete predicate + let expected = vec![ + "+-----------------+", + "| COUNT(cpu.time) |", + "+-----------------+", + "| 2 |", + "+-----------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT count(time) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_count_foo() { let expected = vec![ "+----------------+", "| COUNT(cpu.foo) |", @@ -1104,6 +1255,25 @@ async fn sql_select_with_delete_from_multi_exprs_count_col() { .await; } +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_count_bar() { + let expected = vec![ + "+----------------+", + "| COUNT(cpu.bar) |", + "+----------------+", + "| 2 |", + 
"+----------------+", + ]; + + // OneDeleteMultiExprsOneChunk's delete predicates include columns foo, bar and time + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT count(bar) from cpu", + &expected, + ) + .await; +} + #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_count_star() { let expected = vec![ @@ -1122,7 +1292,7 @@ async fn sql_select_with_delete_from_multi_exprs_count_star() { } #[tokio::test] -async fn sql_select_with_delete_from_multi_exprs_min() { +async fn sql_select_with_delete_from_multi_exprs_min_bar() { let expected = vec![ "+--------------+", "| MIN(cpu.bar) |", @@ -1138,13 +1308,76 @@ async fn sql_select_with_delete_from_multi_exprs_min() { .await; } -// BUG -// Running scenario 'Deleted data from one Open MUB chunk, with 1 deletes from open MUB' -// SQL: '"SELECT max(foo) from cpu"' -// thread 'sql::sql_select_with_delete_from_multi_exprs_max' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 +// BUG: https://github.com/influxdata/influxdb_iox/issues/2776 #[ignore] #[tokio::test] -async fn sql_select_with_delete_from_multi_exprs_max() { +async fn sql_select_with_delete_from_multi_exprs_min_foo() { + let expected = vec![ + "+--------------+", + "| MIN(cpu.foo) |", + "+--------------+", + "| me |", + "+--------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT min(foo) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_foo() { + let expected = vec![ + "+-----+", "| foo |", "+-----+", "| me |", "| you |", "+-----+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT foo from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_min_time() { + let expected = vec![ 
+ "+--------------------------------+", + "| MIN(cpu.time) |", + "+--------------------------------+", + "| 1970-01-01T00:00:00.000000020Z |", + "+--------------------------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT min(time) from cpu", + &expected, + ) + .await; +} + +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_max_bar() { + let expected = vec![ + "+--------------+", + "| MAX(cpu.bar) |", + "+--------------+", + "| 2 |", + "+--------------+", + ]; + run_sql_test_case( + scenarios::delete::OneDeleteMultiExprsOneChunk {}, + "SELECT max(bar) from cpu", + &expected, + ) + .await; +} + +// BUG: https://github.com/influxdata/influxdb_iox/issues/2776 +#[ignore] +#[tokio::test] +async fn sql_select_with_delete_from_multi_exprs_max_foo() { let expected = vec![ "+----------------+", "| COUNT(cpu.foo) |", @@ -1160,9 +1393,6 @@ async fn sql_select_with_delete_from_multi_exprs_max() { .await; } -// BUG: scenario 'Deleted data from one RUB chunk, with 1 deletes from open MUB' -// return wrong result -#[ignore] #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_max_time() { let expected = vec![ @@ -1180,6 +1410,8 @@ async fn sql_select_with_delete_from_multi_exprs_max_time() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_delete_from_multi_exprs_with_select_predicate() { // not eliminate extra row @@ -1272,6 +1504,8 @@ async fn sql_select_with_two_deletes_from_multi_exprs_with_select_predicate() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks() { let expected = vec![ @@ -1296,6 +1530,8 @@ async fn sql_select_with_three_deletes_from_three_chunks() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_count_star() { let expected = vec![ 
@@ -1386,9 +1622,7 @@ async fn sql_select_with_three_deletes_from_three_chunks_min_bar() { .await; } -// BUG scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' -// SQL: '"SELECT max(foo) from cpu"' -// thread 'sql::sql_select_with_three_deletes_from_three_chunks_max_foo' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 +// BUG: https://github.com/influxdata/influxdb_iox/issues/2776 #[ignore] #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_max_foo() { @@ -1408,18 +1642,14 @@ async fn sql_select_with_three_deletes_from_three_chunks_max_foo() { .await; } -// BUG: Running scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' -// SQL: '"SELECT min(time), max(time) from cpu"' -// thread 'sql::sql_select_with_three_deletes_from_three_chunks_min_max_time' panicked at 'Running plan: ArrowError(ExternalError(Internal("MIN/MAX is not expected to receive scalars of incompatible types (TimestampNanosecond(NULL), Float64(1))")))', query_tests/src/sql.rs:37:74 -#[ignore] #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_min_max_time() { let expected = vec![ - "+-----+-----+--------------------------------+", - "| bar | foo | time |", - "+-----+-----+--------------------------------+", - "| 7 | me | 1970-01-01T00:00:00.000000080Z |", - "+-----+-----+--------------------------------+", + "+--------------------------------+--------------------------------+", + "| MIN(cpu.time) | MAX(cpu.time) |", + "+--------------------------------+--------------------------------+", + "| 1970-01-01T00:00:00.000000040Z | 1970-01-01T00:00:00.000000080Z |", + "+--------------------------------+--------------------------------+", ]; run_sql_test_case( @@ -1430,6 +1660,8 @@ async fn 
sql_select_with_three_deletes_from_three_chunks_min_max_time() { .await; } +// -------------------------------------------------------- + #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() { test_helpers::maybe_start_logging(); @@ -1524,6 +1756,8 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() .await; } +// -------------------------------------------------------- + // let expected = vec![ // "+-----+-----+--------------------------------+", // "| bar | foo | time |", @@ -1539,8 +1773,6 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_bar() { - // ---- - // min, max & count let expected = vec![ "+--------------+", "| MIN(cpu.bar) |", @@ -1557,9 +1789,7 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_m .await; } -// BUG: Running scenario 'Deleted data from 3 chunks, - OS - RUB - Open MUB, with 3 deletes after all chunks are created' -// SQL: '"SELECT max(foo) from cpu where foo = 'me' and (bar > 2 or bar = 1.0)"' -// thread 'sql::sql_select_with_three_deletes_from_three_chunks_with_select_predicate_max_foo' panicked at 'Running plan: ArrowError(InvalidArgumentError("column types must match schema types, expected Dictionary(Int32, Utf8) but found Utf8 at column index 0"))', query_tests/src/sql.rs:37:74 +// BUG: https://github.com/influxdata/influxdb_iox/issues/2776 #[ignore] #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_max_foo() { From f2cdb9531fe3f0113f4f6e7d1f340d5452239758 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 8 Oct 2021 14:52:15 -0400 Subject: [PATCH 4/8] chore: cleanup --- predicate/src/delete_predicate.rs | 11 ++--------- query/src/lib.rs | 5 ----- query/src/provider.rs | 7 ++++--- query_tests/src/sql.rs | 14 -------------- 4 files changed, 6 insertions(+), 
31 deletions(-) diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index d7bad8c546..a06fe0d3c0 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -103,19 +103,12 @@ impl DeletePredicate { /// Return all columns participating in the expressions of delete predicate minus the time column if any pub fn all_column_names_but_time(&self) -> hashbrown::HashSet<&str> { - // println!(" ===== del exprs: {:#?}", self.exprs); - // Get column names of the predicate expressions - let cols = self - .exprs + self.exprs .iter() .map(|e| e.column()) .filter(|col_name| col_name != &TIME_COLUMN_NAME) - .collect::>(); - - // println!(" ===== cols of del exprs: {:#?}", cols); - - cols + .collect::>() } } diff --git a/query/src/lib.rs b/query/src/lib.rs index a07f93751f..9475916cd8 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -65,25 +65,20 @@ pub trait QueryChunkMeta: Sized { let mut col_names: HashSet<&str> = hashbrown::HashSet::new(); for pred in self.delete_predicates() { let cols = pred.all_column_names_but_time(); - // println!(" ===== in loop - all_column_names_but_time: {:#?}", cols); for col in cols { col_names.insert(col); } } - // println!(" ===== col_names in pred exprs: {:#?}", col_names); // convert to vector let mut column_names: Vec<&str> = Vec::from_iter(col_names); - // println!(" ===== vector column_names: {:#?}", column_names); // Sort it column_names.sort_unstable(); - // println!(" ===== vector column_names after sorting: {:#?}", column_names); // Now add time column to the end of the vector // Since time range is a must in the delete predicate, time column must be in this list column_names.push(TIME_COLUMN_NAME); - // println!(" ===== vector column_names with time: {:#?}", column_names); column_names } diff --git a/query/src/provider.rs b/query/src/provider.rs index 72bc207c2f..268b08360a 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -769,8 +769,8 @@ impl 
Deduplicater { predicate: Predicate, // This is the select predicate of the query output_sort_key: &SortKey<'_>, ) -> Result> { - // Add columns in sort key or delete predicates if not yet in the output schema - // This is needed because columns in select query may not include all columns of sort key and delete predicates + // Add columns of sort key and delete predicates in the schema of to-be-scanned IOxReadFilterNode + // This is needed because columns in select query may not include them yet let mut input_schema = Arc::clone(&output_schema); // // Cols of sort keys (which is a part of the primary key) @@ -822,7 +822,8 @@ impl Deduplicater { input = Self::build_sort_plan(chunk, input, output_sort_key)? } - // select back to the requested output schema + // Add a projection operator to return only schema of the operator above this in the plan + // This is needed for matching column index of that operator Self::add_projection_node_if_needed(output_schema, input) } diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index c4b70f8162..a29ef6a57c 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -61,7 +61,6 @@ async fn sql_select_from_cpu_min_utf8() { "| MIN(cpu.region |", "+----------------+", "| west |", - "| west |", "+----------------+", ]; run_sql_test_case(TwoMeasurements {}, "SELECT min(region) from cpu", &expected).await; @@ -1758,19 +1757,6 @@ async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate() // -------------------------------------------------------- -// let expected = vec![ -// "+-----+-----+--------------------------------+", -// "| bar | foo | time |", -// "+-----+-----+--------------------------------+", -// "| 1 | me | 1970-01-01T00:00:00.000000040Z |", -// "| 1 | me | 1970-01-01T00:00:00.000000042Z |", -// "| 1 | me | 1970-01-01T00:00:00.000000062Z |", -// "| 4 | me | 1970-01-01T00:00:00.000000050Z |", -// "| 5 | me | 1970-01-01T00:00:00.000000060Z |", -// "| 7 | me | 
1970-01-01T00:00:00.000000080Z |", -// "+-----+-----+--------------------------------+", -// ]; - #[tokio::test] async fn sql_select_with_three_deletes_from_three_chunks_with_select_predicate_min_bar() { let expected = vec![ From 2556639bb5b6985a569d5c85b01f9e6939f4222d Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 8 Oct 2021 14:57:22 -0400 Subject: [PATCH 5/8] chore: more cleanup --- query/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/query/src/lib.rs b/query/src/lib.rs index 9475916cd8..9267730ad9 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -59,7 +59,7 @@ pub trait QueryChunkMeta: Sized { /// return column names participating in the all delete predicates /// in lexicographical order with one exception that time column is last /// This order is to be consistent with Schema::primary_key - #[warn(clippy::stable_sort_primitive)] + //#[warn(clippy::stable_sort_primitive)] fn delete_predicate_columns(&self) -> Vec<&str> { // get all column names but time let mut col_names: HashSet<&str> = hashbrown::HashSet::new(); From bea310db766ceee779fe390f21530fb8cbdec375 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 8 Oct 2021 14:58:06 -0400 Subject: [PATCH 6/8] chore: remove comments --- query/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/query/src/lib.rs b/query/src/lib.rs index 9267730ad9..c9ff547d88 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -59,7 +59,6 @@ pub trait QueryChunkMeta: Sized { /// return column names participating in the all delete predicates /// in lexicographical order with one exception that time column is last /// This order is to be consistent with Schema::primary_key - //#[warn(clippy::stable_sort_primitive)] fn delete_predicate_columns(&self) -> Vec<&str> { // get all column names but time let mut col_names: HashSet<&str> = hashbrown::HashSet::new(); From d0a17ca79d12763bcdd0fc1695bc499b2f8bc213 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 8 Oct 2021 16:47:37 -0400 
Subject: [PATCH 7/8] refactor: address Edd's review comments --- predicate/src/delete_predicate.rs | 16 ++++++++-------- query/src/exec/context.rs | 2 +- query/src/lib.rs | 17 ++++++----------- 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/predicate/src/delete_predicate.rs b/predicate/src/delete_predicate.rs index a06fe0d3c0..c28dafb058 100644 --- a/predicate/src/delete_predicate.rs +++ b/predicate/src/delete_predicate.rs @@ -1,4 +1,4 @@ -use std::convert::TryInto; +use std::{collections::BTreeSet, convert::TryInto}; use chrono::DateTime; use data_types::timestamp::TimestampRange; @@ -102,13 +102,13 @@ impl DeletePredicate { } /// Return all columns participating in the expressions of delete predicate minus the time column if any - pub fn all_column_names_but_time(&self) -> hashbrown::HashSet<&str> { - // Get column names of the predicate expressions - self.exprs - .iter() - .map(|e| e.column()) - .filter(|col_name| col_name != &TIME_COLUMN_NAME) - .collect::>() + pub fn all_column_names_but_time<'a>(&'a self, cols: &mut BTreeSet<&'a str>) { + for expr in &self.exprs { + let col = expr.column(); + if col != TIME_COLUMN_NAME { + cols.insert(col); + } + } } } diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index 55290192e8..ef356e6330 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -255,7 +255,7 @@ impl IOxExecutionContext { debug!(text=%sql, "planning SQL query"); let logical_plan = ctx.inner.create_logical_plan(sql)?; let s = logical_plan.display_graphviz().to_string(); - debug!(?s, "logical plan"); + debug!(plan=?s, "logical plan"); ctx.prepare_plan(&logical_plan).await } diff --git a/query/src/lib.rs b/query/src/lib.rs index c9ff547d88..db538aa18c 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -24,8 +24,8 @@ use predicate::{ predicate::{Predicate, PredicateMatch}, }; -use hashbrown::{HashMap, HashSet}; -use std::{fmt::Debug, iter::FromIterator, sync::Arc}; +use hashbrown::HashMap; +use 
std::{collections::BTreeSet, fmt::Debug, iter::FromIterator, sync::Arc}; pub mod exec; pub mod frontend; @@ -61,19 +61,14 @@ pub trait QueryChunkMeta: Sized { /// This order is to be consistent with Schema::primary_key fn delete_predicate_columns(&self) -> Vec<&str> { // get all column names but time - let mut col_names: HashSet<&str> = hashbrown::HashSet::new(); + let mut col_names = BTreeSet::new(); for pred in self.delete_predicates() { - let cols = pred.all_column_names_but_time(); - for col in cols { - col_names.insert(col); - } + //let cols = pred.all_column_names_but_time(); + pred.all_column_names_but_time(&mut col_names); } // convert to vector - let mut column_names: Vec<&str> = Vec::from_iter(col_names); - - // Sort it - column_names.sort_unstable(); + let mut column_names = Vec::from_iter(col_names); // Now add time column to the end of the vector // Since time range is a must in the delete predicate, time column must be in this list From d13e61c20174ae80fd008b488c36aba45eba3eae Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Mon, 11 Oct 2021 09:33:23 -0400 Subject: [PATCH 8/8] fix: Apply suggestions from code review Co-authored-by: Andrew Lamb --- query/src/exec/context.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs index ef356e6330..beb6aa274a 100644 --- a/query/src/exec/context.rs +++ b/query/src/exec/context.rs @@ -254,8 +254,7 @@ impl IOxExecutionContext { let ctx = self.child_ctx("prepare_sql"); debug!(text=%sql, "planning SQL query"); let logical_plan = ctx.inner.create_logical_plan(sql)?; - let s = logical_plan.display_graphviz().to_string(); - debug!(plan=?s, "logical plan"); + debug!(plan=%logical_plan.display_graphviz(), "logical plan"); ctx.prepare_plan(&logical_plan).await }