chore: Update datafusion to Jan 9, 2023 (1 / 2) (#6603)

* refactor: Update DataFusion pin to early Jan 2023

* fix: Update tests now that planning is async

* fix: Updates for API changes

* chore: Run cargo hakari tasks

* fix: Update comment

* refactor: nicer config setup

* fix: gapfill async

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
pull/24376/head
Andrew Lamb 2023-01-18 13:19:32 +01:00 committed by GitHub
parent 4e0fd0645b
commit 57f08dbccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 411 additions and 438 deletions

69
Cargo.lock generated
View File

@ -1337,8 +1337,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"ahash 0.8.2",
"arrow",
@ -1358,6 +1358,7 @@ dependencies = [
"futures",
"glob",
"hashbrown 0.13.2",
"indexmap",
"itertools",
"lazy_static",
"log",
@ -1370,7 +1371,7 @@ dependencies = [
"pin-project-lite",
"rand",
"smallvec",
"sqlparser 0.28.0",
"sqlparser",
"tempfile",
"tokio",
"tokio-stream",
@ -1382,32 +1383,33 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"arrow",
"chrono",
"num_cpus",
"object_store",
"parquet",
"sqlparser 0.28.0",
"sqlparser",
]
[[package]]
name = "datafusion-expr"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"ahash 0.8.2",
"arrow",
"datafusion-common",
"log",
"sqlparser 0.28.0",
"sqlparser",
]
[[package]]
name = "datafusion-optimizer"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"arrow",
"async-trait",
@ -1417,12 +1419,13 @@ dependencies = [
"datafusion-physical-expr",
"hashbrown 0.13.2",
"log",
"regex-syntax",
]
[[package]]
name = "datafusion-physical-expr"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"ahash 0.8.2",
"arrow",
@ -1436,6 +1439,7 @@ dependencies = [
"datafusion-row",
"half 2.1.0",
"hashbrown 0.13.2",
"indexmap",
"itertools",
"lazy_static",
"md-5",
@ -1450,8 +1454,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"arrow",
"chrono",
@ -1467,8 +1471,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"arrow",
"datafusion-common",
@ -1478,14 +1482,14 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "15.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=07f49803a3d7a9e9b3c2c9a7714c1bb08db71385#07f49803a3d7a9e9b3c2c9a7714c1bb08db71385"
version = "16.0.0"
source = "git+https://github.com/alamb/arrow-datafusion.git?branch=alamb/patched_for_iox#baeb5a042a0169027df5345ace5f5ccc146f4603"
dependencies = [
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"log",
"sqlparser 0.28.0",
"sqlparser",
]
[[package]]
@ -2508,7 +2512,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"snafu",
"sqlparser 0.30.0",
"sqlparser",
"workspace-hack",
]
@ -4178,7 +4182,7 @@ dependencies = [
"query_functions",
"schema",
"snafu",
"sqlparser 0.30.0",
"sqlparser",
"test_helpers",
"workspace-hack",
]
@ -5319,15 +5323,6 @@ dependencies = [
"unicode_categories",
]
[[package]]
name = "sqlparser"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "249ae674b9f636b8ff64d8bfe218774cf05a26de40fd9f358669dccc4c0a9d7d"
dependencies = [
"log",
]
[[package]]
name = "sqlparser"
version = "0.30.0"
@ -5335,6 +5330,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db67dc6ef36edb658196c3fef0464a80b53dbbc194a904e81f9bd4190f9ecc5b"
dependencies = [
"log",
"sqlparser_derive",
]
[[package]]
name = "sqlparser_derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]

View File

@ -115,8 +115,12 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "29.0.0" }
arrow-flight = { version = "29.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" }
#datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", default-features = false }
#datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="07f49803a3d7a9e9b3c2c9a7714c1bb08db71385" }
# Temporary patch to https://github.com/alamb/arrow-datafusion/tree/alamb/patched_for_iox
# See https://github.com/alamb/arrow-datafusion/pull/7 for details
datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox", default-features = false }
datafusion-proto = { git = "https://github.com/alamb/arrow-datafusion.git", branch="alamb/patched_for_iox" }
hashbrown = { version = "0.13.2" }
parquet = { version = "29.0.0" }

View File

@ -1,9 +1,4 @@
use datafusion::{
config::{
OPT_COALESCE_TARGET_BATCH_SIZE, OPT_PARQUET_PUSHDOWN_FILTERS, OPT_PARQUET_REORDER_FILTERS,
},
prelude::SessionConfig,
};
use datafusion::{config::ConfigOptions, prelude::SessionConfig};
// The default catalog name - this impacts what SQL queries use if not specified
pub const DEFAULT_CATALOG: &str = "public";
@ -13,19 +8,15 @@ pub const DEFAULT_SCHEMA: &str = "iox";
/// The maximum number of rows that DataFusion should create in each RecordBatch
pub const BATCH_SIZE: usize = 8 * 1024;
const COALESCE_BATCH_SIZE: usize = BATCH_SIZE / 2;
/// Return a SessionConfig object configured for IOx
pub fn iox_session_config() -> SessionConfig {
SessionConfig::new()
// Enable parquet predicate pushdown optimization
let mut options = ConfigOptions::new();
options.execution.parquet.pushdown_filters = true;
options.execution.parquet.reorder_filters = true;
SessionConfig::from(options)
.with_batch_size(BATCH_SIZE)
.set_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
COALESCE_BATCH_SIZE.try_into().unwrap(),
)
// Enable parquet predicate pushdown optimization
.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true)
.set_bool(OPT_PARQUET_REORDER_FILTERS, true)
.with_create_default_catalog_and_schema(true)
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)

View File

@ -20,7 +20,7 @@ use crate::{
split::StreamSplitExec,
stringset::{IntoStringSet, StringSetRef},
},
logical_optimizer::iox_optimizer,
logical_optimizer::register_iox_optimizers,
plan::{
fieldlist::FieldListPlan,
seriesset::{SeriesSetPlan, SeriesSetPlans},
@ -222,13 +222,11 @@ impl IOxSessionConfig {
let state = SessionState::with_config_rt(session_config, self.runtime)
.with_query_planner(Arc::new(IOxQueryPlanner {}));
let state = register_selector_aggregates(state);
let mut state = register_scalar_functions(state);
state.optimizer = iox_optimizer();
let state = register_iox_optimizers(state);
let inner = SessionContext::with_state(state);
register_selector_aggregates(&inner);
register_scalar_functions(&inner);
if let Some(default_catalog) = self.default_catalog {
inner.register_catalog(DEFAULT_CATALOG, default_catalog);
}
@ -311,9 +309,9 @@ impl IOxSessionContext {
let ctx = self.child_ctx("prepare_sql");
debug!(text=%sql, "planning SQL query");
// NOTE can not use ctx.inner.sql here as it also interprets DDL
// NOTE can not use ctx.inner.sql() here as it also interprets DDL
#[allow(deprecated)]
let logical_plan = ctx.inner.create_logical_plan(sql)?;
let logical_plan = ctx.inner.state().create_logical_plan(sql).await?;
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
// Make nicer erorrs for unsupported SQL
@ -347,7 +345,7 @@ impl IOxSessionContext {
pub async fn create_physical_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut ctx = self.child_ctx("create_physical_plan");
debug!(text=%plan.display_indent_schema(), "create_physical_plan: initial plan");
let physical_plan = ctx.inner.create_physical_plan(plan).await?;
let physical_plan = ctx.inner.state().create_physical_plan(plan).await?;
ctx.recorder.event("physical plan");
debug!(text=%displayable(physical_plan.as_ref()).indent(), "create_physical_plan: plan to run");
@ -670,13 +668,13 @@ pub trait SessionContextIOxExt {
impl SessionContextIOxExt for SessionState {
fn child_span(&self, name: &'static str) -> Option<Span> {
self.config
self.config()
.get_extension::<Option<Span>>()
.and_then(|span| span.as_ref().as_ref().map(|span| span.child(name)))
}
fn span_ctx(&self) -> Option<SpanContext> {
self.config
self.config()
.get_extension::<Option<Span>>()
.and_then(|span| span.as_ref().as_ref().map(|span| span.ctx.clone()))
}

View File

@ -40,7 +40,7 @@ impl InfluxQLQueryPlanner {
}
let planner = InfluxQLToLogicalPlan::new(&ctx, database);
let logical_plan = planner.statement_to_plan(statements.pop().unwrap())?;
let logical_plan = planner.statement_to_plan(statements.pop().unwrap()).await?;
debug!(plan=%logical_plan.display_graphviz(), "logical plan");
// This would only work for SELECT statements at the moment, as the schema queries do

View File

@ -1,17 +1,14 @@
use std::sync::Arc;
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::execution::context::SessionState;
use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex;
mod influx_regex_to_datafusion_regex;
/// Create IOx-specific logical [`Optimizer`].
/// Register IOx-specific logical [`OptimizerRule`]s with the SessionContext
///
/// This is mostly the default optimizer that DataFusion provides but with some additional passes.
pub fn iox_optimizer() -> Optimizer {
let mut opt = Optimizer::new();
opt.rules
.push(Arc::new(InfluxRegexToDataFusionRegex::new()));
opt
/// [`OptimizerRule`]: datafusion::optimizer::OptimizerRule
pub fn register_iox_optimizers(state: SessionState) -> SessionState {
state.add_optimizer_rule(Arc::new(InfluxRegexToDataFusionRegex::new()))
}

View File

@ -8,14 +8,13 @@ mod var_ref;
use crate::plan::influxql::rewriter::rewrite_statement;
use crate::{DataFusionError, IOxSessionContext, QueryNamespace};
use datafusion::common::{DFSchema, Result, ScalarValue};
use datafusion::execution::context::SessionState;
use datafusion::datasource::provider_as_source;
use datafusion::logical_expr::expr_rewriter::normalize_col;
use datafusion::logical_expr::logical_plan::builder::project;
use datafusion::logical_expr::{
lit, BinaryExpr, BuiltinScalarFunction, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::prelude::Column;
use datafusion::sql::planner::ContextProvider;
use datafusion::sql::TableReference;
use influxdb_influxql_parser::expression::{
BinaryOperator, ConditionalExpression, ConditionalOperator, VarRefDataType,
@ -56,19 +55,14 @@ enum ExprScope {
pub struct InfluxQLToLogicalPlan<'a> {
ctx: &'a IOxSessionContext,
database: Arc<dyn QueryNamespace>,
state: SessionState,
}
impl<'a> InfluxQLToLogicalPlan<'a> {
pub fn new(ctx: &'a IOxSessionContext, database: Arc<dyn QueryNamespace>) -> Self {
Self {
ctx,
database,
state: ctx.inner().state(),
}
Self { ctx, database }
}
pub fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
pub async fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
match statement {
Statement::CreateDatabase(_) => {
Err(DataFusionError::NotImplemented("CREATE DATABASE".into()))
@ -80,7 +74,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Statement::Explain(_) => Err(DataFusionError::NotImplemented("EXPLAIN".into())),
Statement::Select(select) => {
let select = rewrite_statement(self.database.as_meta(), &select)?;
self.select_statement_to_plan(select)
self.select_statement_to_plan(select).await
}
Statement::ShowDatabases(_) => {
Err(DataFusionError::NotImplemented("SHOW DATABASES".into()))
@ -104,9 +98,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
/// Create a [`LogicalPlan`] from the specified InfluxQL `SELECT` statement.
fn select_statement_to_plan(&self, select: SelectStatement) -> Result<LogicalPlan> {
async fn select_statement_to_plan(&self, select: SelectStatement) -> Result<LogicalPlan> {
// Process FROM clause
let plans = self.plan_from_tables(select.from)?;
let plans = self.plan_from_tables(select.from).await?;
// Only support a single measurement to begin with
let plan = match plans.len() {
@ -398,12 +392,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
/// Generate a list of logical plans for each of the tables references in the `FROM`
/// clause.
fn plan_from_tables(&self, from: FromMeasurementClause) -> Result<Vec<LogicalPlan>> {
from.iter()
.map(|ms| match ms {
async fn plan_from_tables(&self, from: FromMeasurementClause) -> Result<Vec<LogicalPlan>> {
let mut plans = vec![];
for ms in from.iter() {
let plan = match ms {
MeasurementSelection::Name(qn) => match qn.name {
MeasurementName::Name(ref ident) => {
self.create_table_ref(normalize_identifier(ident))
self.create_table_ref(normalize_identifier(ident)).await
}
// rewriter is expected to expand the regular expression
MeasurementName::Regex(_) => Err(DataFusionError::Internal(
@ -413,17 +408,19 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
MeasurementSelection::Subquery(_) => Err(DataFusionError::NotImplemented(
"subquery in FROM clause".into(),
)),
})
.collect()
}?;
plans.push(plan);
}
Ok(plans)
}
/// Create a [LogicalPlan] that refers to the specified `table_name` or
/// an [LogicalPlan::EmptyRelation] if the table does not exist.
fn create_table_ref(&self, table_name: String) -> Result<LogicalPlan> {
async fn create_table_ref(&self, table_name: String) -> Result<LogicalPlan> {
let table_ref: TableReference<'_> = table_name.as_str().into();
if let Ok(provider) = self.state.get_table_provider(table_ref) {
LogicalPlanBuilder::scan(&table_name, provider, None)?.build()
if let Ok(provider) = self.ctx.inner().table_provider(table_ref).await {
LogicalPlanBuilder::scan(&table_name, provider_as_source(provider), None)?.build()
} else {
LogicalPlanBuilder::empty(false).build()
}
@ -469,7 +466,7 @@ mod test {
use influxdb_influxql_parser::parse_statements;
use insta::assert_snapshot;
fn plan(sql: &str) -> String {
async fn plan(sql: &str) -> String {
let mut statements = parse_statements(sql).unwrap();
// index of columns in the above chunk: [bar, foo, i64_field, i64_field_2, time]
let executor = Arc::new(Executor::new_testing());
@ -502,7 +499,7 @@ mod test {
let ctx = test_db.new_query_context(None);
let planner = InfluxQLToLogicalPlan::new(&ctx, test_db);
match planner.statement_to_plan(statements.pop().unwrap()) {
match planner.statement_to_plan(statements.pop().unwrap()).await {
Ok(res) => res.display_indent_schema().to_string(),
Err(err) => err.to_string(),
}
@ -511,18 +508,18 @@ mod test {
/// Verify the list of unsupported statements.
///
/// It is expected certain statements will be unsupported, indefinitely.
#[test]
fn test_unsupported_statements() {
assert_snapshot!(plan("CREATE DATABASE foo"));
assert_snapshot!(plan("DELETE FROM foo"));
assert_snapshot!(plan("DROP MEASUREMENT foo"));
assert_snapshot!(plan("EXPLAIN SELECT bar FROM foo"));
assert_snapshot!(plan("SHOW DATABASES"));
assert_snapshot!(plan("SHOW MEASUREMENTS"));
assert_snapshot!(plan("SHOW RETENTION POLICIES"));
assert_snapshot!(plan("SHOW TAG KEYS"));
assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar"));
assert_snapshot!(plan("SHOW FIELD KEYS"));
#[tokio::test]
async fn test_unsupported_statements() {
assert_snapshot!(plan("CREATE DATABASE foo").await);
assert_snapshot!(plan("DELETE FROM foo").await);
assert_snapshot!(plan("DROP MEASUREMENT foo").await);
assert_snapshot!(plan("EXPLAIN SELECT bar FROM foo").await);
assert_snapshot!(plan("SHOW DATABASES").await);
assert_snapshot!(plan("SHOW MEASUREMENTS").await);
assert_snapshot!(plan("SHOW RETENTION POLICIES").await);
assert_snapshot!(plan("SHOW TAG KEYS").await);
assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar").await);
assert_snapshot!(plan("SHOW FIELD KEYS").await);
}
/// Tests to validate InfluxQL `SELECT` statements that project columns without specifying
@ -531,25 +528,25 @@ mod test {
use super::*;
/// Select data from a single measurement
#[test]
fn test_single_measurement() {
assert_snapshot!(plan("SELECT f64_field FROM data"));
assert_snapshot!(plan("SELECT time, f64_field FROM data"));
assert_snapshot!(plan("SELECT time as timestamp, f64_field FROM data"));
assert_snapshot!(plan("SELECT foo, f64_field FROM data"));
assert_snapshot!(plan("SELECT foo, f64_field, i64_field FROM data"));
assert_snapshot!(plan("SELECT /^f/ FROM data"));
assert_snapshot!(plan("SELECT * FROM data"));
assert_snapshot!(plan("SELECT TIME FROM data")); // TIME is a field
#[tokio::test]
async fn test_single_measurement() {
assert_snapshot!(plan("SELECT f64_field FROM data").await);
assert_snapshot!(plan("SELECT time, f64_field FROM data").await);
assert_snapshot!(plan("SELECT time as timestamp, f64_field FROM data").await);
assert_snapshot!(plan("SELECT foo, f64_field FROM data").await);
assert_snapshot!(plan("SELECT foo, f64_field, i64_field FROM data").await);
assert_snapshot!(plan("SELECT /^f/ FROM data").await);
assert_snapshot!(plan("SELECT * FROM data").await);
assert_snapshot!(plan("SELECT TIME FROM data").await); // TIME is a field
}
/// Arithmetic expressions in the projection list
#[test]
fn test_simple_arithmetic_in_projection() {
assert_snapshot!(plan("SELECT foo, f64_field + f64_field FROM data"));
assert_snapshot!(plan("SELECT foo, sin(f64_field) FROM data"));
assert_snapshot!(plan("SELECT foo, atan2(f64_field, 2) FROM data"));
assert_snapshot!(plan("SELECT foo, f64_field + 0.5 FROM data"));
#[tokio::test]
async fn test_simple_arithmetic_in_projection() {
assert_snapshot!(plan("SELECT foo, f64_field + f64_field FROM data").await);
assert_snapshot!(plan("SELECT foo, sin(f64_field) FROM data").await);
assert_snapshot!(plan("SELECT foo, atan2(f64_field, 2) FROM data").await);
assert_snapshot!(plan("SELECT foo, f64_field + 0.5 FROM data").await);
}
// The following is an outline of additional scenarios to develop
@ -659,10 +656,10 @@ mod test {
/// Succeeds and returns null values for the expression
/// **Actual:**
/// Error during planning: 'Float64 + Utf8' can't be evaluated because there isn't a common type to coerce the types to
#[test]
#[tokio::test]
#[ignore]
fn test_select_coercion_from_str() {
assert_snapshot!(plan("SELECT f64_field + str_field::float FROM data"));
async fn test_select_coercion_from_str() {
assert_snapshot!(plan("SELECT f64_field + str_field::float FROM data").await);
}
/// **Issue:**
@ -673,10 +670,10 @@ mod test {
/// Succeeds and plan projection of f64_field is Float64
/// **Data:**
/// m0,tag0=val00 f64=99.0,i64=100i,str="lo",str_f64="5.5" 1667181600000000000
#[test]
#[tokio::test]
#[ignore]
fn test_select_explicit_cast() {
assert_snapshot!(plan("SELECT f64_field::integer FROM data"));
async fn test_select_explicit_cast() {
assert_snapshot!(plan("SELECT f64_field::integer FROM data").await);
}
/// **Issue:**
@ -685,14 +682,14 @@ mod test {
/// Succeeds and plans the query, returning null values for unknown columns
/// **Actual:**
/// Schema error: No field named 'TIME'. Valid fields are 'data'.'bar', 'data'.'bool_field', 'data'.'f64_field', 'data'.'foo', 'data'.'i64_field', 'data'.'mixedCase', 'data'.'str_field', 'data'.'time', 'data'.'with space'.
#[test]
#[tokio::test]
#[ignore]
fn test_select_case_sensitivity() {
async fn test_select_case_sensitivity() {
// should return no results
assert_snapshot!(plan("SELECT TIME, f64_Field FROM data"));
assert_snapshot!(plan("SELECT TIME, f64_Field FROM data").await);
// should bind to time and f64_field, and i64_Field should return NULL values
assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data"));
assert_snapshot!(plan("SELECT time, f64_field, i64_Field FROM data").await);
}
}
}

View File

@ -156,8 +156,9 @@ impl ExecutionPlan for DeduplicateExec {
Some(&self.sort_keys)
}
fn relies_on_input_order(&self) -> bool {
true
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
// requires the input to be sorted on the primary key
vec![self.output_ordering()]
}
fn maintains_input_order(&self) -> bool {

View File

@ -198,6 +198,7 @@ pub fn chunks_to_physical_nodes(
limit: None,
table_partition_cols: vec![],
output_ordering,
infinite_source: false,
};
let meta_size_hint = None;
let parquet_exec = ParquetExec::new(base_config, predicate.filter_expr(), meta_size_hint);

View File

@ -232,6 +232,7 @@ struct TestDatabaseSchemaProvider {
partitions: BTreeMap<String, BTreeMap<ChunkId, Arc<TestChunk>>>,
}
#[async_trait]
impl SchemaProvider for TestDatabaseSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
@ -246,7 +247,7 @@ impl SchemaProvider for TestDatabaseSchemaProvider {
.collect()
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
Some(Arc::new(TestDatabaseTableProvider {
partitions: self
.partitions

View File

@ -123,6 +123,7 @@ impl ParquetExecInput {
table_partition_cols: vec![],
// Parquet files ARE actually sorted but we don't care here since we just construct a `collect` plan.
output_ordering: None,
infinite_source: false,
};
let exec = ParquetExec::new(base_config, None, None);
let exec_schema = exec.schema();

View File

@ -222,6 +222,7 @@ impl ParquetFileReader {
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};
// set up enough datafusion context to do the real read session

View File

@ -216,13 +216,13 @@ mod tests {
#[test]
fn test_unsupported_operator() {
let res = df_to_op(datafusion::logical_expr::Operator::Like);
let res = df_to_op(datafusion::logical_expr::Operator::Lt);
assert_contains!(res.unwrap_err().to_string(), "unsupported operator:");
}
#[test]
fn test_unsupported_operator_in_expr() {
let expr = col("foo").like(lit("x"));
let expr = col("foo").lt(lit("x"));
let res = df_to_expr(expr);
assert_contains!(res.unwrap_err().to_string(), "unsupported operator:");
}

View File

@ -99,10 +99,6 @@ fn is_comparison(op: Operator) -> bool {
Operator::Modulo => false,
Operator::And => true,
Operator::Or => true,
Operator::Like => true,
Operator::ILike => true,
Operator::NotLike => true,
Operator::NotILike => true,
Operator::IsDistinctFrom => true,
Operator::IsNotDistinctFrom => true,
Operator::RegexMatch => true,
@ -381,8 +377,6 @@ mod tests {
run_case(Operator::Modulo, false, lit(1), lit(2));
run_case(Operator::And, true, lit("foo"), lit("bar"));
run_case(Operator::Or, true, lit("foo"), lit("bar"));
run_case(Operator::Like, true, lit("foo"), lit("bar"));
run_case(Operator::NotLike, true, lit("foo"), lit("bar"));
run_case(Operator::IsDistinctFrom, true, lit("foo"), lit("bar"));
run_case(Operator::IsNotDistinctFrom, true, lit("foo"), lit("bar"));
run_case(Operator::RegexMatch, true, lit("foo"), lit("bar"));

View File

@ -163,6 +163,7 @@ struct UserSchemaProvider {
tables: Arc<HashMap<Arc<str>, Arc<QuerierTable>>>,
}
#[async_trait]
impl SchemaProvider for UserSchemaProvider {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
@ -174,7 +175,7 @@ impl SchemaProvider for UserSchemaProvider {
names
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
self.tables.get(name).map(|t| Arc::clone(t) as _)
}
@ -517,10 +518,10 @@ mod tests {
"| | CoalescePartitionsExec |",
"| | ProjectionExec: expr=[host@0 as host, perc@1 as perc, time@2 as time] |",
"| | UnionExec |",
"| | CoalesceBatchesExec: target_batch_size=4096 |",
"| | CoalesceBatchesExec: target_batch_size=8192 |",
"| | FilterExec: time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)) |",
"| | ParquetExec: limit=None, partitions={1 group: [[1/2/1/4/<uuid>.parquet]]}, projection=[host, perc, time] |",
"| | CoalesceBatchesExec: target_batch_size=4096 |",
"| | CoalesceBatchesExec: target_batch_size=8192 |",
"| | FilterExec: time@2 < 1 OR time@2 > 13 OR NOT host@0 = CAST(d AS Dictionary(Int32, Utf8)) |",
"| | ParquetExec: limit=None, partitions={1 group: [[1/2/1/4/<uuid>.parquet]]}, projection=[host, perc, time] |",
"| | |",

View File

@ -43,6 +43,7 @@ impl SystemSchemaProvider {
}
}
#[async_trait]
impl SchemaProvider for SystemSchemaProvider {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
@ -55,7 +56,7 @@ impl SchemaProvider for SystemSchemaProvider {
.collect()
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
match name {
QUERIES_TABLE => Some(Arc::clone(&self.queries)),
_ => None,

View File

@ -66,7 +66,7 @@ mod test {
let times = Arc::new(TimestampNanosecondArray::from(vec![Some(1000)]));
let rb = RecordBatch::try_from_iter(vec![("time", times as ArrayRef)])?;
let ctx = context_with_table(rb);
let df = ctx.table("t")?.select(vec![date_bin_gapfill(
let df = ctx.table("t").await?.select(vec![date_bin_gapfill(
lit_interval_milliseconds(360_000),
col("time"),
lit_timestamp_nano(0),

View File

@ -12,8 +12,8 @@
)]
use datafusion::{
execution::{context::SessionState, FunctionRegistry},
prelude::{lit, Expr},
execution::FunctionRegistry,
prelude::{lit, Expr, SessionContext},
};
use group_by::WindowDuration;
use window::EncodedWindowDuration;
@ -96,14 +96,12 @@ pub fn registry() -> &'static dyn FunctionRegistry {
}
/// registers scalar functions so they can be invoked via SQL
pub fn register_scalar_functions(mut state: SessionState) -> SessionState {
pub fn register_scalar_functions(ctx: &SessionContext) {
let registry = registry();
for f in registry.udfs() {
let udf = registry.udf(&f).unwrap();
state.scalar_functions.insert(f, udf);
ctx.register_udf(udf.as_ref().clone())
}
state
}
#[cfg(test)]
@ -131,6 +129,7 @@ mod test {
let ctx = context_with_table(batch);
let result = ctx
.table("t")
.await
.unwrap()
.filter(regex_match_expr(col("data"), "Foo".into()))
.unwrap()
@ -163,6 +162,7 @@ mod test {
let ctx = context_with_table(batch);
let result = ctx
.table("t")
.await
.unwrap()
.filter(regex_not_match_expr(col("data"), "Foo".into()))
.unwrap()
@ -191,6 +191,7 @@ mod test {
let ctx = context_with_table(batch);
let result = ctx
.table("t")
.await
.unwrap()
.select(vec![
col("time"),

View File

@ -343,7 +343,7 @@ mod test {
.unwrap();
let ctx = context_with_table(rb);
let df = ctx.table("t").unwrap();
let df = ctx.table("t").await.unwrap();
let df = df.filter(op).unwrap();
// execute the query

View File

@ -22,9 +22,9 @@ use arrow::{
};
use datafusion::{
error::{DataFusionError, Result as DataFusionResult},
execution::context::SessionState,
logical_expr::{AccumulatorFunctionImplementation, Signature, TypeSignature, Volatility},
physical_plan::{udaf::AggregateUDF, Accumulator},
prelude::SessionContext,
scalar::ScalarValue,
};
@ -40,26 +40,11 @@ use internal::{
use schema::TIME_DATA_TYPE;
/// registers selector functions so they can be invoked via SQL
pub fn register_selector_aggregates(mut state: SessionState) -> SessionState {
let first = struct_selector_first();
let last = struct_selector_last();
let min = struct_selector_min();
let max = struct_selector_max();
//TODO make a nicer api for this in DataFusion
state
.aggregate_functions
.insert(first.name.to_string(), first);
state
.aggregate_functions
.insert(last.name.to_string(), last);
state.aggregate_functions.insert(min.name.to_string(), min);
state.aggregate_functions.insert(max.name.to_string(), max);
state
pub fn register_selector_aggregates(ctx: &SessionContext) {
ctx.register_udaf(struct_selector_first());
ctx.register_udaf(struct_selector_last());
ctx.register_udaf(struct_selector_min());
ctx.register_udaf(struct_selector_max());
}
/// Returns a DataFusion user defined aggregate function for computing
@ -76,11 +61,11 @@ pub fn register_selector_aggregates(mut state: SessionState) -> SessionState {
///
/// If there are multiple rows with the minimum timestamp value, the
/// value is arbitrary
pub fn struct_selector_first() -> Arc<AggregateUDF> {
Arc::new(make_uda(
pub fn struct_selector_first() -> AggregateUDF {
make_uda(
"selector_first",
FactoryBuilder::new(SelectorType::First, SelectorOutput::Struct),
))
)
}
/// Returns a DataFusion user defined aggregate function for computing
@ -97,11 +82,11 @@ pub fn struct_selector_first() -> Arc<AggregateUDF> {
///
/// If there are multiple rows with the maximum timestamp value, the
/// value is arbitrary
pub fn struct_selector_last() -> Arc<AggregateUDF> {
Arc::new(make_uda(
pub fn struct_selector_last() -> AggregateUDF {
make_uda(
"selector_last",
FactoryBuilder::new(SelectorType::Last, SelectorOutput::Struct),
))
)
}
/// Returns a DataFusion user defined aggregate function for computing
@ -118,11 +103,11 @@ pub fn struct_selector_last() -> Arc<AggregateUDF> {
///
/// If there are multiple rows with the same minimum value, the value
/// with the first (earliest/smallest) timestamp is chosen
pub fn struct_selector_min() -> Arc<AggregateUDF> {
Arc::new(make_uda(
pub fn struct_selector_min() -> AggregateUDF {
make_uda(
"selector_min",
FactoryBuilder::new(SelectorType::Min, SelectorOutput::Struct),
))
)
}
/// Returns a DataFusion user defined aggregate function for computing
@ -139,11 +124,11 @@ pub fn struct_selector_min() -> Arc<AggregateUDF> {
///
/// If there are multiple rows with the same maximum value, the value
/// with the first (earliest/smallest) timestamp is chosen
pub fn struct_selector_max() -> Arc<AggregateUDF> {
Arc::new(make_uda(
pub fn struct_selector_max() -> AggregateUDF {
make_uda(
"selector_max",
FactoryBuilder::new(SelectorType::Max, SelectorOutput::Struct),
))
)
}
/// Returns a DataFusion user defined aggregate function for computing
@ -1346,7 +1331,7 @@ mod test {
let ctx = SessionContext::new();
ctx.register_table("t", Arc::new(provider)).unwrap();
let df = ctx.table("t").unwrap();
let df = ctx.table("t").await.unwrap();
let df = df.aggregate(vec![], aggs).unwrap();
// execute the query

View File

@ -33,23 +33,22 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A';
-- Results After Normalizing UUIDs
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: tag@2 = A |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@2 = A |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM "table" WHERE foo=1 AND bar=2;
+-----+-----+-----+----------------------+
| bar | foo | tag | time |
@ -58,23 +57,22 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE foo=1 AND bar=2;
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: foo@1 = 1 AND bar@0 = 2 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: foo@1 = 1 AND bar@0 = 2 |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM "table" WHERE time=to_timestamp('1970-01-01T00:00:00.000000000+00:00') ORDER BY tag;
+-----+-----+-----+----------------------+
| bar | foo | tag | time |
@ -94,7 +92,7 @@
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@3 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
@ -112,20 +110,19 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A' AND foo=1 AND time=to_timestamp('1970-01-01T00:00:00.000000000+00:00');
-- Results After Normalizing UUIDs
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -34,24 +34,23 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A';
-- Results After Normalizing UUIDs
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: tag@2 = A |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A"))] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@2 = A |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM "table" WHERE foo=1 AND bar=2;
+-----+-----+-----+----------------------+
| bar | foo | tag | time |
@ -60,24 +59,23 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE foo=1 AND bar=2;
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: foo@1 = 1 AND bar@0 = 2 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.foo = Float64(1) AND table.bar = Float64(2) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.foo = Float64(1), table.bar = Float64(2)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: foo@1 = 1 AND bar@0 = 2 |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM "table" WHERE time=to_timestamp('1970-01-01T00:00:00.000000000+00:00') ORDER BY tag;
+-----+-----+-----+----------------------+
| bar | foo | tag | time |
@ -97,7 +95,7 @@
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@3 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
@ -116,21 +114,20 @@
+-----+-----+-----+----------------------+
-- SQL: EXPLAIN SELECT * FROM "table" WHERE tag='A' AND foo=1 AND time=to_timestamp('1970-01-01T00:00:00.000000000+00:00');
-- Results After Normalizing UUIDs
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.tag = Dictionary(Int32, Utf8("A")) AND table.foo = Float64(1) AND table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.tag = Dictionary(Int32, Utf8("A")), table.foo = Float64(1), table.time = TimestampNanosecond(0, None)] |
| physical_plan | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@2 = A AND foo@1 = 1 AND time@3 = 0 |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -25,24 +25,25 @@
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o;
-- Results After Normalizing UUIDs
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | UnionExec |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | UnionExec |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o;
-- Results After Normalizing UUIDs
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -22,21 +22,22 @@
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o;
-- Results After Normalizing UUIDs
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | UnionExec |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | UnionExec |
| | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={2 groups: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o;
-- Results After Normalizing UUIDs
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@ -84,7 +85,7 @@
----------
| Plan with Metrics | CoalescePartitionsExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |
| | ProjectionExec: expr=[area@0 as area, city@1 as city, max_temp@2 as max_temp, min_temp@3 as min_temp, state@4 as state, time@5 as time], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |
| | CoalesceBatchesExec: target_batch_size=4096, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |
| | FilterExec: state@4 = MA, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |
| | RepartitionExec: partitioning=RoundRobinBatch(4), metrics=[fetch_time=1.234ms, repart_time=1.234ms, send_time=1.234ms] |
| | UnionExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] |

View File

@ -14,15 +14,16 @@
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant;
-- Results After Normalizing UUIDs
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[count, system, time, town] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[count, system, time, town] |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where count > 200;
-- Results After Sorting
+-------+--------+--------------------------------+-----------+
@ -44,7 +45,7 @@
| | Filter: restaurant.count > UInt64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200), pruning_predicate=count_max@0 > 200, projection=[count, system, time, town] |
@ -59,7 +60,7 @@
| | Filter: CAST(restaurant.count AS Float64) > Float64(200) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Float64) > Float64(200)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: CAST(count@0 AS Float64) > 200 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=CAST(count AS Float64) > Float64(200), projection=[count, system, time, town] |
@ -74,7 +75,7 @@
| | Filter: restaurant.system > Float64(4) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 4 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4), pruning_predicate=system_max@0 > 4, projection=[count, system, time, town] |
@ -100,7 +101,7 @@
| | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2, projection=[count, system, time, town] |
@ -125,7 +126,7 @@
| | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2, projection=[count, system, time, town] |
@ -149,7 +150,7 @@
| | Filter: restaurant.count > UInt64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND (restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence"))) AND restaurant.count < UInt64(40000) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), restaurant.count < UInt64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence AND count@0 < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2 AND count_min@5 < 40000, projection=[count, system, time, town] |
@ -175,7 +176,7 @@
| | Filter: restaurant.count > UInt64(200) AND restaurant.count < UInt64(40000) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.count > UInt64(200), restaurant.count < UInt64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: count@0 > 200 AND count@0 < 40000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND count_min@1 < 40000, projection=[count, system, time, town] |
@ -202,7 +203,7 @@
| | Filter: restaurant.system > Float64(4) AND restaurant.system < Float64(7) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(4), restaurant.system < Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 4 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4) AND system < Float64(7), pruning_predicate=system_max@0 > 4 AND system_min@1 < 7, projection=[count, system, time, town] |
@ -226,7 +227,7 @@
| | Filter: restaurant.system > Float64(5) AND restaurant.system < Float64(7) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.system < Float64(7)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND system@1 < 7 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND system < Float64(7), pruning_predicate=system_max@0 > 5 AND system_min@1 < 7, projection=[count, system, time, town] |
@ -249,7 +250,7 @@
| | Filter: restaurant.system > Float64(5) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > restaurant.system |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), Float64(7) > restaurant.system] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND town@3 != tewsbury AND 7 > system@1 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > system, pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7, projection=[count, system, time, town] |
@ -271,7 +272,7 @@
| | Filter: restaurant.system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != restaurant.town AND restaurant.system < Float64(7) AND (restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[restaurant.system > Float64(5), Dictionary(Int32, Utf8("tewsbury")) != restaurant.town, restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND count@0 = 632 OR town@3 = reading |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != town AND system < Float64(7) AND (count = UInt64(632) OR town = Dictionary(Int32, Utf8("reading"))), pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7 AND count_min@4 <= 632 AND 632 <= count_max@5 OR town_min@1 <= reading AND reading <= town_max@2, projection=[count, system, time, town] |
@ -290,9 +291,9 @@
| | Filter: Float64(5) < restaurant.system AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system < Float64(7) AND (restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))) AND restaurant.time > TimestampNanosecond(130, None) |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[Float64(5) < restaurant.system, restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading")), restaurant.time > TimestampNanosecond(130, None)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: 5 < system@1 AND town@3 != tewsbury AND system@1 < 7 AND count@0 = 632 OR town@3 = reading AND time@2 > 130 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: 5 < system@1 AND town@3 != tewsbury AND system@1 < 7 AND count@0 = 632 OR town@3 = reading AND time@2 > 130 |
| | EmptyExec: produce_one_row=false |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
@ -320,18 +321,18 @@
+-------+--------+--------------------------------+---------+
-- SQL: EXPLAIN SELECT * from restaurant where influx_regex_match(town, 'foo|bar|baz') and influx_regex_not_match(town, 'one|two');
-- Results After Normalizing UUIDs
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) AS restaurant.town !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8)restaurant.town LIKE Utf8("%baz%")) AND CAST(restaurant.town AS Utf8)restaurant.town NOT LIKE Utf8("%one%") AND CAST(restaurant.town AS Utf8)restaurant.town NOT LIKE Utf8("%two%") |
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8) AS restaurant.town LIKE Utf8("%baz%") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(restaurant.town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) LIKE Utf8("%foo%") OR CAST(restaurant.town AS Utf8) LIKE Utf8("%bar%") OR CAST(restaurant.town AS Utf8) LIKE Utf8("%baz%"), CAST(restaurant.town AS Utf8) NOT LIKE Utf8("%one%"), CAST(restaurant.town AS Utf8) NOT LIKE Utf8("%two%")] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %foo% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %bar% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %baz% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %one% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %two% |
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%baz%")) AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AND (CAST(town AS Utf8) LIKE Utf8("%foo%") OR CAST(town AS Utf8) LIKE Utf8("%bar%") OR CAST(town AS Utf8) LIKE Utf8("%baz%")) AND CAST(town AS Utf8) NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) NOT LIKE Utf8("%two%"), projection=[count, system, time, town] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -9,30 +9,27 @@
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu order by host, load, time;
-- Results After Normalizing UUIDs
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | TableScan: cpu projection=[host, load, time] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | SortExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | TableScan: cpu projection=[host, load, time] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
+------+------+----------------------+
| host | load | time |
@ -42,30 +39,27 @@
+------+------+----------------------+
-- SQL: EXPLAIN SELECT * FROM cpu WHERE host != 'b' ORDER BY host,time;
-- Results After Normalizing UUIDs
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) |
| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: host@0 != b |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | SortExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) |
| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@0 != b |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
| | UnionExec |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@2 < -9223372036854775808 OR time@2 > -3600000000000 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=host != Dictionary(Int32, Utf8("b")), pruning_predicate=host_min@0 != b OR b != host_max@1, output_ordering=[host@0 ASC, time@2 ASC], projection=[host, load, time] |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@ -14,25 +14,26 @@
+---------+------------+-------+------+--------------------------------+
-- SQL: EXPLAIN SELECT * from h2o;
-- Results After Normalizing UUIDs
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.city, h2o.other_temp, h2o.state, h2o.temp, h2o.time |
| | TableScan: h2o projection=[city, other_temp, state, temp, time] |
| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] |
| | UnionExec |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[city, other_temp, state, temp, time] |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: h2o.city, h2o.other_temp, h2o.state, h2o.temp, h2o.time |
| | TableScan: h2o projection=[city, other_temp, state, temp, time] |
| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | UnionExec |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[city, other_temp, state, temp, time] |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: select temp, other_temp, time from h2o;
-- Results After Sorting
+------+------------+--------------------------------+
@ -80,7 +81,7 @@
| | Filter: h2o.time >= TimestampNanosecond(250, None) |
| | TableScan: h2o projection=[city, other_temp, state, temp, time], partial_filters=[h2o.time >= TimestampNanosecond(250, None)] |
| physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@4 >= 250 |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | UnionExec |

View File

@ -28,7 +28,7 @@ bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["std"] }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "07f49803a3d7a9e9b3c2c9a7714c1bb08db71385", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
datafusion = { git = "https://github.com/alamb/arrow-datafusion.git", branch = "alamb/patched_for_iox", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }