From 57f08dbccde9d7ae8eb331cbf0173f2d16fa33bd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 18 Jan 2023 13:19:32 +0100 Subject: [PATCH] chore: Update datafusion to Jan 9, 2023 (1 / 2) (#6603) * refactor: Update DataFusion pin to early Jan 2023 * fix: Update tests now that planning is async * fix: Updates for API changes * chore: Run cargo hakari tasks * fix: Update comment * refactor: nicer config setup * fix: gapfill async Co-authored-by: CircleCI[bot] --- Cargo.lock | 69 +++++----- Cargo.toml | 8 +- datafusion_util/src/config.rs | 23 ++-- iox_query/src/exec/context.rs | 20 ++- iox_query/src/frontend/influxql.rs | 2 +- iox_query/src/logical_optimizer/mod.rs | 13 +- iox_query/src/plan/influxql.rs | 119 +++++++++--------- iox_query/src/provider/deduplicate.rs | 5 +- iox_query/src/provider/physical.rs | 1 + iox_query/src/test.rs | 3 +- parquet_file/src/storage.rs | 1 + parquet_to_line_protocol/src/lib.rs | 1 + predicate/src/delete_expr.rs | 4 +- predicate/src/rpc_predicate/rewrite.rs | 6 - querier/src/namespace/query_access.rs | 7 +- querier/src/system_tables/mod.rs | 3 +- query_functions/src/gapfill.rs | 2 +- query_functions/src/lib.rs | 13 +- query_functions/src/regex.rs | 2 +- query_functions/src/selectors.rs | 53 +++----- .../in/dedup_and_predicates_parquet.expected | 101 ++++++++------- ...p_and_predicates_parquet_ingester.expected | 107 ++++++++-------- .../cases/in/duplicates_ingester.expected | 37 +++--- .../cases/in/duplicates_parquet.expected | 33 ++--- query_tests/cases/in/pushdown.expected | 77 ++++++------ query_tests/cases/in/retention.expected | 96 +++++++------- query_tests/cases/in/several_chunks.expected | 41 +++--- workspace-hack/Cargo.toml | 2 +- 28 files changed, 411 insertions(+), 438 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ca074ed3f..3a1da02d92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1337,8 +1337,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "ahash 0.8.2", "arrow", @@ -1358,6 +1358,7 @@ dependencies = [ "futures", "glob", "hashbrown 0.13.2", + "indexmap", "itertools", "lazy_static", "log", @@ -1370,7 +1371,7 @@ dependencies = [ "pin-project-lite", "rand", "smallvec", - "sqlparser 0.28.0", + "sqlparser", "tempfile", "tokio", "tokio-stream", @@ -1382,32 +1383,33 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "arrow", "chrono", + "num_cpus", "object_store", "parquet", - "sqlparser 0.28.0", + "sqlparser", ] [[package]] name = "datafusion-expr" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "ahash 0.8.2", "arrow", "datafusion-common", "log", - "sqlparser 0.28.0", + "sqlparser", ] [[package]] name = "datafusion-optimizer" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "arrow", "async-trait", @@ -1417,12 +1419,13 @@ dependencies = [ "datafusion-physical-expr", "hashbrown 0.13.2", "log", + "regex-syntax", ] [[package]] name = "datafusion-physical-expr" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "ahash 0.8.2", "arrow", @@ -1436,6 +1439,7 @@ dependencies = [ "datafusion-row", "half 2.1.0", "hashbrown 0.13.2", + "indexmap", "itertools", "lazy_static", "md-5", @@ -1450,8 +1454,8 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "arrow", "chrono", @@ -1467,8 +1471,8 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "arrow", "datafusion-common", @@ -1478,14 +1482,14 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "15.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" +version = "16.0.0" +source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603" dependencies = [ "arrow-schema", "datafusion-common", "datafusion-expr", "log", - "sqlparser 0.28.0", + "sqlparser", ] [[package]] @@ -2508,7 +2512,7 @@ version = "0.1.0" dependencies = [ "generated_types", "snafu", - "sqlparser 0.30.0", + "sqlparser", "workspace-hack", ] @@ -4178,7 +4182,7 @@ dependencies = [ "query_functions", "schema", "snafu", - "sqlparser 0.30.0", + "sqlparser", "test_helpers", "workspace-hack", ] @@ -5319,15 +5323,6 @@ dependencies = [ "unicode_categories", ] -[[package]] -name = "sqlparser" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "249ae674b9f636b8ff64d8bfe218774cf05a26de40fd9f358669dccc4c0a9d7d" -dependencies = [ - "log", -] - [[package]] name = "sqlparser" version = "0.30.0" @@ -5335,6 +5330,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b" dependencies = [ "log", + "sqlparser_derive", +] + +[[package]] +name = "sqlparser_derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 79bd154f74..55692f0fbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,8 +115,12 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "29.0.0" } arrow-flight = { version = "29.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" } +#datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false } +#datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" } +# Temporary patch to https://github.com/alamb/arrow-datafusion/tree/alamb/patched_for_iox +# See https://github.com/alamb/arrow-datafusion/pull/7 for details +datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox", default-features = false } +datafusion-proto = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox" } hashbrown = { version = "0.13.2" } parquet = { version = "29.0.0" } diff --git a/datafusion_util/src/config.rs b/datafusion_util/src/config.rs index b3ac8bae01..d4eff0c7ab 100644 --- a/datafusion_util/src/config.rs +++ b/datafusion_util/src/config.rs @@ -1,9 +1,4 @@ -use datafusion::{ - config::{ - OPT_COALESCE_TARGET_BATCH_SIZE, OPT_PARQUET_PUSHDOWN_FILTERS, OPT_PARQUET_REORDER_FILTERS, - }, - prelude::SessionConfig, -}; +use datafusion::{config::ConfigOptions, prelude::SessionConfig}; // The default catalog name - this impacts what SQL queries use if not specified pub const DEFAULT_CATALOG: &str = "public"; @@ -13,19 +8,15 @@ pub const DEFAULT_SCHEMA: &str = "iox"; /// The maximum number of rows that DataFusion should create in each RecordBatch pub const BATCH_SIZE: usize = 8 * 1024; -const COALESCE_BATCH_SIZE: usize = BATCH_SIZE / 2; - /// Return a SessionConfig object configured for IOx pub fn iox_session_config() -> SessionConfig { - SessionConfig::new() + // Enable parquet predicate pushdown optimization + let mut options = ConfigOptions::new(); + options.execution.parquet.pushdown_filters = true; + options.execution.parquet.reorder_filters = true; + + SessionConfig::from(options) .with_batch_size(BATCH_SIZE) - .set_u64( - OPT_COALESCE_TARGET_BATCH_SIZE, - COALESCE_BATCH_SIZE.try_into().unwrap(), - ) - // Enable parquet predicate pushdown optimization - .set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true) - .set_bool(OPT_PARQUET_REORDER_FILTERS, true) .with_create_default_catalog_and_schema(true) .with_information_schema(true) .with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA) diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 1dc298b6f8..bc7b0e8656 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -20,7 +20,7 @@ use crate::{ split::StreamSplitExec, stringset::{IntoStringSet, StringSetRef}, }, - logical_optimizer::iox_optimizer, + logical_optimizer::register_iox_optimizers, plan::{ fieldlist::FieldListPlan, seriesset::{SeriesSetPlan, SeriesSetPlans}, @@ -222,13 +222,11 @@ impl IOxSessionConfig { let state = SessionState::with_config_rt(session_config, self.runtime) .with_query_planner(Arc::new(IOxQueryPlanner {})); - - let state = register_selector_aggregates(state); - let mut state = register_scalar_functions(state); - state.optimizer = iox_optimizer(); + let state = register_iox_optimizers(state); let inner = SessionContext::with_state(state); - + register_selector_aggregates(&inner); + register_scalar_functions(&inner); if let Some(default_catalog) = self.default_catalog { inner.register_catalog(DEFAULT_CATALOG, default_catalog); } @@ -311,9 +309,9 @@ impl IOxSessionContext { let ctx = self.child_ctx("prepare_sql"); debug!(text=%sql, "planning SQL query"); - // NOTE can not use ctx.inner.sql here as it also interprets DDL + // NOTE can not use ctx.inner.sql() here as it also interprets DDL #[allow(deprecated)] - let logical_plan = ctx.inner.create_logical_plan(sql)?; + let logical_plan = ctx.inner.state().create_logical_plan(sql).await?; debug!(plan=%logical_plan.display_graphviz(), "logical plan"); // Make nicer erorrs for unsupported SQL @@ -347,7 +345,7 @@ impl IOxSessionContext { pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result> { let mut ctx = self.child_ctx("create_physical_plan"); debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan"); - let physical_plan = ctx.inner.create_physical_plan(plan).await?; + let physical_plan = ctx.inner.state().create_physical_plan(plan).await?; ctx.recorder.event("physical plan"); debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run"); @@ -670,13 +668,13 @@ pub trait SessionContextIOxExt { impl SessionContextIOxExt for SessionState { fn child_span(&self, name: &'static str) -> Option { - self.config + self.config() .get_extension::>() .and_then(|span| span.as_ref().as_ref().map(|span| span.child(name))) } fn span_ctx(&self) -> Option { - self.config + self.config() .get_extension::>() .and_then(|span| span.as_ref().as_ref().map(|span| span.ctx.clone())) } diff --git a/iox_query/src/frontend/influxql.rs b/iox_query/src/frontend/influxql.rs index e1ef4c9b21..dc3598e1b1 100644 --- a/iox_query/src/frontend/influxql.rs +++ b/iox_query/src/frontend/influxql.rs @@ -40,7 +40,7 @@ impl InfluxQLQueryPlanner { } let planner = InfluxQLToLogicalPlan::new(&ctx, database); - let logical_plan = planner.statement_to_plan(statements.pop().unwrap())?; + let logical_plan = planner.statement_to_plan(statements.pop().unwrap()).await?; debug!(plan=%logical_plan.display_graphviz(), "logical plan"); // This would only work for SELECT statements at the moment, as the schema queries do diff --git a/iox_query/src/logical_optimizer/mod.rs b/iox_query/src/logical_optimizer/mod.rs index fba32fa298..76357c65a8 100644 --- a/iox_query/src/logical_optimizer/mod.rs +++ b/iox_query/src/logical_optimizer/mod.rs @@ -1,17 +1,14 @@ use std::sync::Arc; -use datafusion::optimizer::optimizer::Optimizer; +use datafusion::execution::context::SessionState; use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex; mod influx_regex_to_datafusion_regex; -/// Create IOx-specific logical [`Optimizer`]. +/// Register IOx-specific logical [`OptimizerRule`]s with the SessionContext /// -/// This is mostly the default optimizer that DataFusion provides but with some additional passes. -pub fn iox_optimizer() -> Optimizer { - let mut opt = Optimizer::new(); - opt.rules - .push(Arc::new(InfluxRegexToDataFusionRegex::new())); - opt +/// [`OptimizerRule`]: datafusion::optimizer::OptimizerRule +pub fn register_iox_optimizers(state: SessionState) -> SessionState { + state.add_optimizer_rule(Arc::new(InfluxRegexToDataFusionRegex::new())) } diff --git a/iox_query/src/plan/influxql.rs b/iox_query/src/plan/influxql.rs index 1ad512a299..d4cdaf5a68 100644 --- a/iox_query/src/plan/influxql.rs +++ b/iox_query/src/plan/influxql.rs @@ -8,14 +8,13 @@ mod var_ref; use crate::plan::influxql::rewriter::rewrite_statement; use crate::{DataFusionError, IOxSessionContext, QueryNamespace}; use datafusion::common::{DFSchema, Result, ScalarValue}; -use datafusion::execution::context::SessionState; +use datafusion::datasource::provider_as_source; use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::{ lit, BinaryExpr, BuiltinScalarFunction, Expr, LogicalPlan, LogicalPlanBuilder, Operator, }; use datafusion::prelude::Column; -use datafusion::sql::planner::ContextProvider; use datafusion::sql::TableReference; use influxdb_influxql_parser::expression::{ BinaryOperator, ConditionalExpression, ConditionalOperator, VarRefDataType, @@ -56,19 +55,14 @@ enum ExprScope { pub struct InfluxQLToLogicalPlan<'a> { ctx: &'a IOxSessionContext, database: Arc, - state: SessionState, } impl<'a> InfluxQLToLogicalPlan<'a> { pub fn new(ctx: &'a IOxSessionContext, database: Arc) -> Self { - Self { - ctx, - database, - state: ctx.inner().state(), - } + Self { ctx, database } } - pub fn statement_to_plan(&self, statement: Statement) -> Result { + pub async fn statement_to_plan(&self, statement: Statement) -> Result { match statement { Statement::CreateDatabase(_) => { Err(DataFusionError::NotImplemented("CREATE DATABASE".into())) @@ -80,7 +74,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Statement::Explain(_) => Err(DataFusionError::NotImplemented("EXPLAIN".into())), Statement::Select(select) => { let select = rewrite_statement(self.database.as_meta(), &select)?; - self.select_statement_to_plan(select) + self.select_statement_to_plan(select).await } Statement::ShowDatabases(_) => { Err(DataFusionError::NotImplemented("SHOW DATABASES".into())) @@ -104,9 +98,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } /// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement. - fn select_statement_to_plan(&self, select: SelectStatement) -> Result { + async fn select_statement_to_plan(&self, select: SelectStatement) -> Result { // Process FROM clause - let plans = self.plan_from_tables(select.from)?; + let plans = self.plan_from_tables(select.from).await?; // Only support a single measurement to begin with let plan = match plans.len() { @@ -398,12 +392,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { /// Generate a list of logical plans for each of the tables references in the `FROM` /// clause. - fn plan_from_tables(&self, from: FromMeasurementClause) -> Result> { - from.iter() - .map(|ms| match ms { + async fn plan_from_tables(&self, from: FromMeasurementClause) -> Result> { + let mut plans = vec![]; + for ms in from.iter() { + let plan = match ms { MeasurementSelection::Name(qn) => match qn.name { MeasurementName::Name(ref ident) => { - self.create_table_ref(normalize_identifier(ident)) + self.create_table_ref(normalize_identifier(ident)).await } // rewriter is expected to expand the regular expression MeasurementName::Regex(_) => Err(DataFusionError::Internal( @@ -413,17 +408,19 @@ impl<'a> InfluxQLToLogicalPlan<'a> { MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented( "subquery in FROM clause".into(), )), - }) - .collect() + }?; + plans.push(plan); + } + Ok(plans) } /// Create a [LogicalPlan] that refers to the specified `table_name` or /// an [LogicalPlan::EmptyRelation] if the table does not exist. - fn create_table_ref(&self, table_name: String) -> Result { + async fn create_table_ref(&self, table_name: String) -> Result { let table_ref: TableReference<'_> = table_name.as_str().into(); - if let Ok(provider) = self.state.get_table_provider(table_ref) { - LogicalPlanBuilder::scan(&table_name, provider, None)?.build() + if let Ok(provider) = self.ctx.inner().table_provider(table_ref).await { + LogicalPlanBuilder::scan(&table_name, provider_as_source(provider), None)?.build() } else { LogicalPlanBuilder::empty(false).build() } @@ -469,7 +466,7 @@ mod test { use influxdb_influxql_parser::parse_statements; use insta::assert_snapshot; - fn plan(sql: &str) -> String { + async fn plan(sql: &str) -> String { let mut statements = parse_statements(sql).unwrap(); // index of columns in the above chunk: [bar, foo, i64_field, i64_field_2, time] let executor = Arc::new(Executor::new_testing()); @@ -502,7 +499,7 @@ mod test { let ctx = test_db.new_query_context(None); let planner = InfluxQLToLogicalPlan::new(&ctx, test_db); - match planner.statement_to_plan(statements.pop().unwrap()) { + match planner.statement_to_plan(statements.pop().unwrap()).await { Ok(res) => res.display_indent_schema().to_string(), Err(err) => err.to_string(), } @@ -511,18 +508,18 @@ mod test { /// Verify the list of unsupported statements. /// /// It is expected certain statements will be unsupported, indefinitely. - #[test] - fn test_unsupported_statements() { - assert_snapshot!(plan("CREATE DATABASE foo")); - assert_snapshot!(plan("DELETE FROM foo")); - assert_snapshot!(plan("DROP MEASUREMENT foo")); - assert_snapshot!(plan("EXPLAIN SELECT bar FROM foo")); - assert_snapshot!(plan("SHOW DATABASES")); - assert_snapshot!(plan("SHOW MEASUREMENTS")); - assert_snapshot!(plan("SHOW RETENTION POLICIES")); - assert_snapshot!(plan("SHOW TAG KEYS")); - assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar")); - assert_snapshot!(plan("SHOW FIELD KEYS")); + #[tokio::test] + async fn test_unsupported_statements() { + assert_snapshot!(plan("CREATE DATABASE foo").await); + assert_snapshot!(plan("DELETE FROM foo").await); + assert_snapshot!(plan("DROP MEASUREMENT foo").await); + assert_snapshot!(plan("EXPLAIN SELECT bar FROM foo").await); + assert_snapshot!(plan("SHOW DATABASES").await); + assert_snapshot!(plan("SHOW MEASUREMENTS").await); + assert_snapshot!(plan("SHOW RETENTION POLICIES").await); + assert_snapshot!(plan("SHOW TAG KEYS").await); + assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar").await); + assert_snapshot!(plan("SHOW FIELD KEYS").await); } /// Tests to validate InfluxQL `SELECT` statements that project columns without specifying @@ -531,25 +528,25 @@ mod test { use super::*; /// Select data from a single measurement - #[test] - fn test_single_measurement() { - assert_snapshot!(plan("SELECT f64_field FROM data")); - assert_snapshot!(plan("SELECT time, f64_field FROM data")); - assert_snapshot!(plan("SELECT time as timestamp, f64_field FROM data")); - assert_snapshot!(plan("SELECT foo, f64_field FROM data")); - assert_snapshot!(plan("SELECT foo, f64_field, i64_field FROM data")); - assert_snapshot!(plan("SELECT /^f/ FROM data")); - assert_snapshot!(plan("SELECT * FROM data")); - assert_snapshot!(plan("SELECT TIME FROM data")); // TIME is a field + #[tokio::test] + async fn test_single_measurement() { + assert_snapshot!(plan("SELECT f64_field FROM data").await); + assert_snapshot!(plan("SELECT time, f64_field FROM data").await); + assert_snapshot!(plan("SELECT time as timestamp, f64_field FROM data").await); + assert_snapshot!(plan("SELECT foo, f64_field FROM data").await); + assert_snapshot!(plan("SELECT foo, f64_field, i64_field FROM data").await); + assert_snapshot!(plan("SELECT /^f/ FROM data").await); + assert_snapshot!(plan("SELECT * FROM data").await); + assert_snapshot!(plan("SELECT TIME FROM data").await); // TIME is a field } /// Arithmetic expressions in the projection list - #[test] - fn test_simple_arithmetic_in_projection() { - assert_snapshot!(plan("SELECT foo, f64_field + f64_field FROM data")); - assert_snapshot!(plan("SELECT foo, sin(f64_field) FROM data")); - assert_snapshot!(plan("SELECT foo, atan2(f64_field, 2) FROM data")); - assert_snapshot!(plan("SELECT foo, f64_field + 0.5 FROM data")); + #[tokio::test] + async fn test_simple_arithmetic_in_projection() { + assert_snapshot!(plan("SELECT foo, f64_field + f64_field FROM data").await); + assert_snapshot!(plan("SELECT foo, sin(f64_field) FROM data").await); + assert_snapshot!(plan("SELECT foo, atan2(f64_field, 2) FROM data").await); + assert_snapshot!(plan("SELECT foo, f64_field + 0.5 FROM data").await); } // The following is an outline of additional scenarios to develop @@ -659,10 +656,10 @@ mod test { /// Succeeds and returns null values for the expression /// **Actual:** /// Error during planning: 'Float64 + Utf8' can't be evaluated because there isn't a common type to coerce the types to - #[test] + #[tokio::test] #[ignore] - fn test_select_coercion_from_str() { - assert_snapshot!(plan("SELECT f64_field + str_field::float FROM data")); + async fn test_select_coercion_from_str() { + assert_snapshot!(plan("SELECT f64_field + str_field::float FROM data").await); } /// **Issue:** @@ -673,10 +670,10 @@ mod test { /// Succeeds and plan projection of f64_field is Float64 /// **Data:** /// m0,tag0=val00 f64=99.0,i64=100i,str="lo",str_f64="5.5" 1667181600000000000 - #[test] + #[tokio::test] #[ignore] - fn test_select_explicit_cast() { - assert_snapshot!(plan("SELECT f64_field::integer FROM data")); + async fn test_select_explicit_cast() { + assert_snapshot!(plan("SELECT f64_field::integer FROM data").await); } /// **Issue:** @@ -685,14 +682,14 @@ mod test { /// Succeeds and plans the query, returning null values for unknown columns /// **Actual:** /// Schema error: No field named 'TIME'. Valid fields are 'data'.'bar', 'data'.'bool_field', 'data'.'f64_field', 'data'.'foo', 'data'.'i64_field', 'data'.'mixedCase', 'data'.'str_field', 'data'.'time', 'data'.'with space'. - #[test] + #[tokio::test] #[ignore] - fn test_select_case_sensitivity() { + async fn test_select_case_sensitivity() { // should return no results - assert_snapshot!(plan("SELECT TIME, f64_Field FROM data")); + assert_snapshot!(plan("SELECT TIME, f64_Field FROM data").await); // should bind to time and f64_field, and i64_Field should return NULL values - assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data")); + assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data").await); } } } diff --git a/iox_query/src/provider/deduplicate.rs b/iox_query/src/provider/deduplicate.rs index f75bb7c7ba..d1fdda6f91 100644 --- a/iox_query/src/provider/deduplicate.rs +++ b/iox_query/src/provider/deduplicate.rs @@ -156,8 +156,9 @@ impl ExecutionPlan for DeduplicateExec { Some(&self.sort_keys) } - fn relies_on_input_order(&self) -> bool { - true + fn required_input_ordering(&self) -> Vec> { + // requires the input to be sorted on the primary key + vec![self.output_ordering()] } fn maintains_input_order(&self) -> bool { diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index e59af96b55..049e194589 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -198,6 +198,7 @@ pub fn chunks_to_physical_nodes( limit: None, table_partition_cols: vec![], output_ordering, + infinite_source: false, }; let meta_size_hint = None; let parquet_exec = ParquetExec::new(base_config, predicate.filter_expr(), meta_size_hint); diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 8c00df8edf..59229af1bf 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -232,6 +232,7 @@ struct TestDatabaseSchemaProvider { partitions: BTreeMap>>, } +#[async_trait] impl SchemaProvider for TestDatabaseSchemaProvider { fn as_any(&self) -> &dyn Any { self @@ -246,7 +247,7 @@ impl SchemaProvider for TestDatabaseSchemaProvider { .collect() } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { Some(Arc::new(TestDatabaseTableProvider { partitions: self .partitions diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 39f1fe050c..ceb27b207f 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -123,6 +123,7 @@ impl ParquetExecInput { table_partition_cols: vec![], // Parquet files ARE actually sorted but we don't care here since we just construct a `collect` plan. output_ordering: None, + infinite_source: false, }; let exec = ParquetExec::new(base_config, None, None); let exec_schema = exec.schema(); diff --git a/parquet_to_line_protocol/src/lib.rs b/parquet_to_line_protocol/src/lib.rs index 78afaa1aac..88385fbabb 100644 --- a/parquet_to_line_protocol/src/lib.rs +++ b/parquet_to_line_protocol/src/lib.rs @@ -222,6 +222,7 @@ impl ParquetFileReader { limit: None, table_partition_cols: vec![], output_ordering: None, + infinite_source: false, }; // set up enough datafusion context to do the real read session diff --git a/predicate/src/delete_expr.rs b/predicate/src/delete_expr.rs index 084ecd6186..4c9cdc3861 100644 --- a/predicate/src/delete_expr.rs +++ b/predicate/src/delete_expr.rs @@ -216,13 +216,13 @@ mod tests { #[test] fn test_unsupported_operator() { - let res = df_to_op(datafusion::logical_expr::Operator::Like); + let res = df_to_op(datafusion::logical_expr::Operator::Lt); assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); } #[test] fn test_unsupported_operator_in_expr() { - let expr = col("foo").like(lit("x")); + let expr = col("foo").lt(lit("x")); let res = df_to_expr(expr); assert_contains!(res.unwrap_err().to_string(), "unsupported operator:"); } diff --git a/predicate/src/rpc_predicate/rewrite.rs b/predicate/src/rpc_predicate/rewrite.rs index da4814f718..4488e86a5a 100644 --- a/predicate/src/rpc_predicate/rewrite.rs +++ b/predicate/src/rpc_predicate/rewrite.rs @@ -99,10 +99,6 @@ fn is_comparison(op: Operator) -> bool { Operator::Modulo => false, Operator::And => true, Operator::Or => true, - Operator::Like => true, - Operator::ILike => true, - Operator::NotLike => true, - Operator::NotILike => true, Operator::IsDistinctFrom => true, Operator::IsNotDistinctFrom => true, Operator::RegexMatch => true, @@ -381,8 +377,6 @@ mod tests { run_case(Operator::Modulo, false, lit(1), lit(2)); run_case(Operator::And, true, lit("foo"), lit("bar")); run_case(Operator::Or, true, lit("foo"), lit("bar")); - run_case(Operator::Like, true, lit("foo"), lit("bar")); - run_case(Operator::NotLike, true, lit("foo"), lit("bar")); run_case(Operator::IsDistinctFrom, true, lit("foo"), lit("bar")); run_case(Operator::IsNotDistinctFrom, true, lit("foo"), lit("bar")); run_case(Operator::RegexMatch, true, lit("foo"), lit("bar")); diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index da8b76aed1..fb839488b7 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -163,6 +163,7 @@ struct UserSchemaProvider { tables: Arc, Arc>>, } +#[async_trait] impl SchemaProvider for UserSchemaProvider { fn as_any(&self) -> &dyn Any { self as &dyn Any @@ -174,7 +175,7 @@ impl SchemaProvider for UserSchemaProvider { names } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { self.tables.get(name).map(|t| Arc::clone(t) as _) } @@ -517,10 +518,10 @@ mod tests { "| | CoalescePartitionsExec |", "| | ProjectionExec: expr=[host@0 as host, perc@1 as perc, time@2 as time] |", "| | UnionExec |", - "| | CoalesceBatchesExec: target_batch_size=4096 |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", "| | FilterExec: time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)) |", "| | ParquetExec: limit=None, partitions={1 group: [[1/2/1/4/.parquet]]}, projection=[host, perc, time] |", - "| | CoalesceBatchesExec: target_batch_size=4096 |", + "| | CoalesceBatchesExec: target_batch_size=8192 |", "| | FilterExec: time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)) |", "| | ParquetExec: limit=None, partitions={1 group: [[1/2/1/4/.parquet]]}, projection=[host, perc, time] |", "| | |", diff --git a/querier/src/system_tables/mod.rs b/querier/src/system_tables/mod.rs index 0429b55a1f..1ab101074a 100644 --- a/querier/src/system_tables/mod.rs +++ b/querier/src/system_tables/mod.rs @@ -43,6 +43,7 @@ impl SystemSchemaProvider { } } +#[async_trait] impl SchemaProvider for SystemSchemaProvider { fn as_any(&self) -> &dyn Any { self as &dyn Any @@ -55,7 +56,7 @@ impl SchemaProvider for SystemSchemaProvider { .collect() } - fn table(&self, name: &str) -> Option> { + async fn table(&self, name: &str) -> Option> { match name { QUERIES_TABLE => Some(Arc::clone(&self.queries)), _ => None, diff --git a/query_functions/src/gapfill.rs b/query_functions/src/gapfill.rs index 8158397e95..a6a5320e47 100644 --- a/query_functions/src/gapfill.rs +++ b/query_functions/src/gapfill.rs @@ -66,7 +66,7 @@ mod test { let times = Arc::new(TimestampNanosecondArray::from(vec![Some(1000)])); let rb = RecordBatch::try_from_iter(vec![("time", times as ArrayRef)])?; let ctx = context_with_table(rb); - let df = ctx.table("t")?.select(vec![date_bin_gapfill( + let df = ctx.table("t").await?.select(vec![date_bin_gapfill( lit_interval_milliseconds(360_000), col("time"), lit_timestamp_nano(0), diff --git a/query_functions/src/lib.rs b/query_functions/src/lib.rs index afb35e36c9..87ba94756c 100644 --- a/query_functions/src/lib.rs +++ b/query_functions/src/lib.rs @@ -12,8 +12,8 @@ )] use datafusion::{ - execution::{context::SessionState, FunctionRegistry}, - prelude::{lit, Expr}, + execution::FunctionRegistry, + prelude::{lit, Expr, SessionContext}, }; use group_by::WindowDuration; use window::EncodedWindowDuration; @@ -96,14 +96,12 @@ pub fn registry() -> &'static dyn FunctionRegistry { } /// registers scalar functions so they can be invoked via SQL -pub fn register_scalar_functions(mut state: SessionState) -> SessionState { +pub fn register_scalar_functions(ctx: &SessionContext) { let registry = registry(); for f in registry.udfs() { let udf = registry.udf(&f).unwrap(); - state.scalar_functions.insert(f, udf); + ctx.register_udf(udf.as_ref().clone()) } - - state } #[cfg(test)] @@ -131,6 +129,7 @@ mod test { let ctx = context_with_table(batch); let result = ctx .table("t") + .await .unwrap() .filter(regex_match_expr(col("data"), "Foo".into())) .unwrap() @@ -163,6 +162,7 @@ mod test { let ctx = context_with_table(batch); let result = ctx .table("t") + .await .unwrap() .filter(regex_not_match_expr(col("data"), "Foo".into())) .unwrap() @@ -191,6 +191,7 @@ mod test { let ctx = context_with_table(batch); let result = ctx .table("t") + .await .unwrap() .select(vec![ col("time"), diff --git a/query_functions/src/regex.rs b/query_functions/src/regex.rs index e6ea4fab36..ea5c6f0a93 100644 --- a/query_functions/src/regex.rs +++ b/query_functions/src/regex.rs @@ -343,7 +343,7 @@ mod test { .unwrap(); let ctx = context_with_table(rb); - let df = ctx.table("t").unwrap(); + let df = ctx.table("t").await.unwrap(); let df = df.filter(op).unwrap(); // execute the query diff --git a/query_functions/src/selectors.rs b/query_functions/src/selectors.rs index 5afd930c29..d901432742 100644 --- a/query_functions/src/selectors.rs +++ b/query_functions/src/selectors.rs @@ -22,9 +22,9 @@ use arrow::{ }; use datafusion::{ error::{DataFusionError, Result as DataFusionResult}, - execution::context::SessionState, logical_expr::{AccumulatorFunctionImplementation, Signature, TypeSignature, Volatility}, physical_plan::{udaf::AggregateUDF, Accumulator}, + prelude::SessionContext, scalar::ScalarValue, }; @@ -40,26 +40,11 @@ use internal::{ use schema::TIME_DATA_TYPE; /// registers selector functions so they can be invoked via SQL -pub fn register_selector_aggregates(mut state: SessionState) -> SessionState { - let first = struct_selector_first(); - let last = struct_selector_last(); - let min = struct_selector_min(); - let max = struct_selector_max(); - - //TODO make a nicer api for this in DataFusion - state - .aggregate_functions - .insert(first.name.to_string(), first); - - state - .aggregate_functions - .insert(last.name.to_string(), last); - - state.aggregate_functions.insert(min.name.to_string(), min); - - state.aggregate_functions.insert(max.name.to_string(), max); - - state +pub fn register_selector_aggregates(ctx: &SessionContext) { + ctx.register_udaf(struct_selector_first()); + ctx.register_udaf(struct_selector_last()); + ctx.register_udaf(struct_selector_min()); + ctx.register_udaf(struct_selector_max()); } /// Returns a DataFusion user defined aggregate function for computing @@ -76,11 +61,11 @@ pub fn register_selector_aggregates(mut state: SessionState) -> SessionState { /// /// If there are multiple rows with the minimum timestamp value, the /// value is arbitrary -pub fn struct_selector_first() -> Arc { - Arc::new(make_uda( +pub fn struct_selector_first() -> AggregateUDF { + make_uda( "selector_first", FactoryBuilder::new(SelectorType::First, SelectorOutput::Struct), - )) + ) } /// Returns a DataFusion user defined aggregate function for computing @@ -97,11 +82,11 @@ pub fn struct_selector_first() -> Arc { /// /// If there are multiple rows with the maximum timestamp value, the /// value is arbitrary -pub fn struct_selector_last() -> Arc { - Arc::new(make_uda( +pub fn struct_selector_last() -> AggregateUDF { + make_uda( "selector_last", FactoryBuilder::new(SelectorType::Last, SelectorOutput::Struct), - )) + ) } /// Returns a DataFusion user defined aggregate function for computing @@ -118,11 +103,11 @@ pub fn struct_selector_last() -> Arc { /// /// If there are multiple rows with the same minimum value, the value /// with the first (earliest/smallest) timestamp is chosen -pub fn struct_selector_min() -> Arc { - Arc::new(make_uda( +pub fn struct_selector_min() -> AggregateUDF { + make_uda( "selector_min", FactoryBuilder::new(SelectorType::Min, SelectorOutput::Struct), - )) + ) } /// Returns a DataFusion user defined aggregate function for computing @@ -139,11 +124,11 @@ pub fn struct_selector_min() -> Arc { /// /// If there are multiple rows with the same maximum value, the value /// with the first (earliest/smallest) timestamp is chosen -pub fn struct_selector_max() -> Arc { - Arc::new(make_uda( +pub fn struct_selector_max() -> AggregateUDF { + make_uda( "selector_max", FactoryBuilder::new(SelectorType::Max, SelectorOutput::Struct), - )) + ) } /// Returns a DataFusion user defined aggregate function for computing @@ -1346,7 +1331,7 @@ mod test { let ctx = SessionContext::new(); ctx.register_table("t", Arc::new(provider)).unwrap(); - let df = ctx.table("t").unwrap(); + let df = ctx.table("t").await.unwrap(); let df = df.aggregate(vec![], aggs).unwrap(); // execute the query diff --git a/query_tests/cases/in/dedup_and_predicates_parquet.expected b/query_tests/cases/in/dedup_and_predicates_parquet.expected index f4067049ad..d56213f83e 100644 --- a/query_tests/cases/in/dedup_and_predicates_parquet.expected +++ b/query_tests/cases/in/dedup_and_predicates_parquet.expected @@ -33,23 +33,22 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A'; -- Results After Normalizing UUIDs -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.tag = Dictionary(Int32, Utf8("A")) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: tag@2 = A | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.tag = Dictionary(Int32, Utf8("A")) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: tag@2 = A | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * FROM "table" WHERE foo=1 AND bar=2; +-----+-----+-----+----------------------+ | bar | foo | tag | time | @@ -58,23 +57,22 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE foo=1 AND bar=2; -- Results After Normalizing UUIDs -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: foo@1 = 1 AND bar@0 = 2 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: foo@1 = 1 AND bar@0 = 2 | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * FROM "table" WHERE time=to_timestamp('1970-01-01T00:00:00.000000000+00:00') ORDER BY tag; +-----+-----+-----+----------------------+ | bar | foo | tag | time | @@ -94,7 +92,7 @@ | physical_plan | SortExec: [tag@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 = 0 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | @@ -112,20 +110,19 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A' AND foo=1 AND time=to_timestamp('1970-01-01T00:00:00.000000000+00:00'); -- Results After Normalizing UUIDs -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected b/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected index c7a05cd987..c7fa1ecd41 100644 --- a/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected +++ b/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected @@ -34,24 +34,23 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A'; -- Results After Normalizing UUIDs -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.tag = Dictionary(Int32, Utf8("A")) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: tag@2 = A | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | SortExec: [tag@2 ASC,time@3 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 | -| | | -+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.tag = Dictionary(Int32, Utf8("A")) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: tag@2 = A | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | SortExec: [tag@2 ASC,time@3 ASC] | +| | RecordBatchesExec: batches_groups=1 batches=1 | +| | | ++---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * FROM "table" WHERE foo=1 AND bar=2; +-----+-----+-----+----------------------+ | bar | foo | tag | time | @@ -60,24 +59,23 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE foo=1 AND bar=2; -- Results After Normalizing UUIDs -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: foo@1 = 1 AND bar@0 = 2 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | SortExec: [tag@2 ASC,time@3 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 | -| | | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: foo@1 = 1 AND bar@0 = 2 | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | SortExec: [tag@2 ASC,time@3 ASC] | +| | RecordBatchesExec: batches_groups=1 batches=1 | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * FROM "table" WHERE time=to_timestamp('1970-01-01T00:00:00.000000000+00:00') ORDER BY tag; +-----+-----+-----+----------------------+ | bar | foo | tag | time | @@ -97,7 +95,7 @@ | physical_plan | SortExec: [tag@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 = 0 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | @@ -116,21 +114,20 @@ +-----+-----+-----+----------------------+ -- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A' AND foo=1 AND time=to_timestamp('1970-01-01T00:00:00.000000000+00:00'); -- Results After Normalizing UUIDs -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | -| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) | -| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] | -| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | -| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | -| | SortExec: [tag@2 ASC,time@3 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 | -| | | -+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: table.bar, table.foo, table.tag, table.time | +| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) | +| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] | +| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 | +| | DeduplicateExec: [tag@2 ASC,time@3 ASC] | +| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] | +| | SortExec: [tag@2 ASC,time@3 ASC] | +| | RecordBatchesExec: batches_groups=1 batches=1 | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/duplicates_ingester.expected b/query_tests/cases/in/duplicates_ingester.expected index 3c5fa34bbe..73cad6fe3c 100644 --- a/query_tests/cases/in/duplicates_ingester.expected +++ b/query_tests/cases/in/duplicates_ingester.expected @@ -25,24 +25,25 @@ +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o; -- Results After Normalizing UUIDs -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | -| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | -| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | UnionExec | -| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | -| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | -| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] | -| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] | -| | | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | +| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | +| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | UnionExec | +| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | +| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | +| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] | +| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] | +| | RecordBatchesExec: batches_groups=1 batches=1 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] | +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o; -- Results After Normalizing UUIDs +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/duplicates_parquet.expected b/query_tests/cases/in/duplicates_parquet.expected index 57f3aee984..14b4a28b8d 100644 --- a/query_tests/cases/in/duplicates_parquet.expected +++ b/query_tests/cases/in/duplicates_parquet.expected @@ -22,21 +22,22 @@ +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o; -- Results After Normalizing UUIDs -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | -| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | -| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | UnionExec | -| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | -| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | -| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] | -| | | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | +| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | +| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | UnionExec | +| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | +| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] | +| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] | +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o; -- Results After Normalizing UUIDs +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -84,7 +85,7 @@ ---------- | Plan with Metrics | CoalescePartitionsExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | ProjectionExec: expr=[area@0 as area, city@1 as city, max_temp@2 as max_temp, min_temp@3 as min_temp, state@4 as state, time@5 as time], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | -| | CoalesceBatchesExec: target_batch_size=4096, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | +| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | FilterExec: state@4 = MA, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | RepartitionExec: partitioning=RoundRobinBatch(4), metrics=[fetch_time=1.234ms, repart_time=1.234ms, send_time=1.234ms] | | | UnionExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected index 8a472e36f1..336aa522df 100644 --- a/query_tests/cases/in/pushdown.expected +++ b/query_tests/cases/in/pushdown.expected @@ -14,15 +14,16 @@ +-------+--------+--------------------------------+-----------+ -- SQL: EXPLAIN SELECT * from restaurant; -- Results After Normalizing UUIDs -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | TableScan: restaurant projection=[count, system, time, town] | -| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[count, system, time, town] | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | TableScan: restaurant projection=[count, system, time, town] | +| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[count, system, time, town] | +| | | ++---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * from restaurant where count > 200; -- Results After Sorting +-------+--------+--------------------------------+-----------+ @@ -44,7 +45,7 @@ | | Filter: restaurant.count > UInt64(200) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200), pruning_predicate=count_max@0 > 200, projection=[count, system, time, town] | @@ -59,7 +60,7 @@ | | Filter: CAST(restaurant.count AS Float64) > Float64(200) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: CAST(count@0 AS Float64) > 200 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=CAST(count AS Float64) > Float64(200), projection=[count, system, time, town] | @@ -74,7 +75,7 @@ | | Filter: restaurant.system > Float64(4) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 4 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4), pruning_predicate=system_max@0 > 4, projection=[count, system, time, town] | @@ -100,7 +101,7 @@ | | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2, projection=[count, system, time, town] | @@ -125,7 +126,7 @@ | | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2, projection=[count, system, time, town] | @@ -149,7 +150,7 @@ | | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) AND restaurant.count < UInt64(40000) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), restaurant.count < UInt64(40000)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence AND count@0 < 40000 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2 AND count_min@5 < 40000, projection=[count, system, time, town] | @@ -175,7 +176,7 @@ | | Filter: restaurant.count > UInt64(200) AND restaurant.count < UInt64(40000) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.count < UInt64(40000)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND count@0 < 40000 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND count_min@1 < 40000, projection=[count, system, time, town] | @@ -202,7 +203,7 @@ | | Filter: restaurant.system > Float64(4) AND restaurant.system < Float64(7) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4), restaurant.system < Float64(7)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 4 AND system@1 < 7 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4) AND system < Float64(7), pruning_predicate=system_max@0 > 4 AND system_min@1 < 7, projection=[count, system, time, town] | @@ -226,7 +227,7 @@ | | Filter: restaurant.system > Float64(5) AND restaurant.system < Float64(7) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.system < Float64(7)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND system@1 < 7 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND system < Float64(7), pruning_predicate=system_max@0 > 5 AND system_min@1 < 7, projection=[count, system, time, town] | @@ -249,7 +250,7 @@ | | Filter: restaurant.system > Float64(5) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > restaurant.system | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), Float64(7) > restaurant.system] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND town@3 != tewsbury AND 7 > system@1 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > system, pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7, projection=[count, system, time, town] | @@ -271,7 +272,7 @@ | | Filter: restaurant.system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != restaurant.town AND restaurant.system < Float64(7) AND (restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), Dictionary(Int32, Utf8("tewsbury")) != restaurant.town, restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND count@0 = 632 OR town@3 = reading | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != town AND system < Float64(7) AND (count = UInt64(632) OR town = Dictionary(Int32, Utf8("reading"))), pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7 AND count_min@4 <= 632 AND 632 <= count_max@5 OR town_min@1 <= reading AND reading <= town_max@2, projection=[count, system, time, town] | @@ -290,9 +291,9 @@ | | Filter: Float64(5) < restaurant.system AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system < Float64(7) AND (restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))) AND restaurant.time > TimestampNanosecond(130, None) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[Float64(5) < restaurant.system, restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading")), restaurant.time > TimestampNanosecond(130, None)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: 5 < system@1 AND town@3 != tewsbury AND system@1 < 7 AND count@0 = 632 OR town@3 = reading AND time@2 > 130 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: 5 < system@1 AND town@3 != tewsbury AND system@1 < 7 AND count@0 = 632 OR town@3 = reading AND time@2 > 130 | | | EmptyExec: produce_one_row=false | | | | +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -320,18 +321,18 @@ +-------+--------+--------------------------------+---------+ -- SQL: EXPLAIN SELECT * from restaurant where influx_regex_match(town, 'foo|bar|baz') and influx_regex_not_match(town, 'one|two'); -- Results After Normalizing UUIDs -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) | -| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] | -| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two | -| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) AS restaurant.town !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%baz%")) AND CAST(restaurant.town AS Utf8)restaurant.town NOT LIKE Utf8("%one%") AND CAST(restaurant.town AS Utf8)restaurant.town NOT LIKE Utf8("%two%") | +| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%baz%") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(restaurant.town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8) LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8) LIKE Utf8("%baz%"), CAST(restaurant.town AS Utf8) NOT LIKE Utf8("%one%"), CAST(restaurant.town AS Utf8) NOT LIKE Utf8("%two%")] | +| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %foo% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %bar% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %baz% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %one% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %two% | +| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%baz%")) AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AND (CAST(town AS Utf8) LIKE Utf8("%foo%") OR CAST(town AS Utf8) LIKE Utf8("%bar%") OR CAST(town AS Utf8) LIKE Utf8("%baz%")) AND CAST(town AS Utf8) NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) NOT LIKE Utf8("%two%"), projection=[count, system, time, town] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/retention.expected b/query_tests/cases/in/retention.expected index ec8856fb1f..4f1229c635 100644 --- a/query_tests/cases/in/retention.expected +++ b/query_tests/cases/in/retention.expected @@ -9,30 +9,27 @@ +------+------+----------------------+ -- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time; -- Results After Normalizing UUIDs -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST | -| | Projection: cpu.host, cpu.load, cpu.time | -| | TableScan: cpu projection=[host, load, time] | -| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] | -| | CoalescePartitionsExec | -| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [host@0 ASC,time@2 ASC] | -| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | -| | SortExec: [host@0 ASC,time@2 ASC] | -| | UnionExec | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | -| | | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST | +| | Projection: cpu.host, cpu.load, cpu.time | +| | TableScan: cpu projection=[host, load, time] | +| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] | +| | CoalescePartitionsExec | +| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | DeduplicateExec: [host@0 ASC,time@2 ASC] | +| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | +| | UnionExec | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | +| | | ++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time; +------+------+----------------------+ | host | load | time | @@ -42,30 +39,27 @@ +------+------+----------------------+ -- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time; -- Results After Normalizing UUIDs -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.time ASC NULLS LAST | -| | Projection: cpu.host, cpu.load, cpu.time | -| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) | -| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] | -| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] | -| | CoalescePartitionsExec | -| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: host@0 != b | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | DeduplicateExec: [host@0 ASC,time@2 ASC] | -| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | -| | SortExec: [host@0 ASC,time@2 ASC] | -| | UnionExec | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | -| | | -+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.time ASC NULLS LAST | +| | Projection: cpu.host, cpu.load, cpu.time | +| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) | +| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] | +| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] | +| | CoalescePartitionsExec | +| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: host@0 != b | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | DeduplicateExec: [host@0 ASC,time@2 ASC] | +| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | +| | UnionExec | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | +| | CoalesceBatchesExec: target_batch_size=8192 | +| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] | +| | | ++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/several_chunks.expected b/query_tests/cases/in/several_chunks.expected index 62848cefb2..e028700b8c 100644 --- a/query_tests/cases/in/several_chunks.expected +++ b/query_tests/cases/in/several_chunks.expected @@ -14,25 +14,26 @@ +---------+------------+-------+------+--------------------------------+ -- SQL: EXPLAIN SELECT * from h2o; -- Results After Normalizing UUIDs -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: h2o.city, h2o.other_temp, h2o.state, h2o.temp, h2o.time | -| | TableScan: h2o projection=[city, other_temp, state, temp, time] | -| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] | -| | UnionExec | -| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | -| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] | -| | UnionExec | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | -| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | -| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 | -| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[city, other_temp, state, temp, time] | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: h2o.city, h2o.other_temp, h2o.state, h2o.temp, h2o.time | +| | TableScan: h2o projection=[city, other_temp, state, temp, time] | +| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | UnionExec | +| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | +| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] | +| | UnionExec | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | +| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | +| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] | +| | RecordBatchesExec: batches_groups=1 batches=1 | +| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[city, other_temp, state, temp, time] | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: select temp, other_temp, time from h2o; -- Results After Sorting +------+------------+--------------------------------+ @@ -80,7 +81,7 @@ | | Filter: h2o.time >= TimestampNanosecond(250, None) | | | TableScan: h2o projection=[city, other_temp, state, temp, time], partial_filters=[h2o.time >= TimestampNanosecond(250, None)] | | physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] | -| | CoalesceBatchesExec: target_batch_size=4096 | +| | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@4 >= 250 | | | RepartitionExec: partitioning=RoundRobinBatch(4) | | | UnionExec | diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index c8096ec041..f3f8aa6442 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] } crossbeam-utils = { version = "0.8", features = ["std"] } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] } +datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/patched_for_iox", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] } digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] } either = { version = "1", features = ["use_std"] } fixedbitset = { version = "0.4", features = ["std"] }