diff --git a/Cargo.lock b/Cargo.lock index 734c6af08a..fc76dbc3c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,18 +8,19 @@ checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" [[package]] name = "ahash" -version = "0.4.6" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6789e291be47ace86a60303502173d84af8327e3627ecf334356ee0f87a164c" +checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e" [[package]] name = "ahash" -version = "0.5.7" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad4243ec6feddc812c0f442d9765374d250aba94e10ecf8b632e1b1c118547e8" +checksum = "a75b7e6a93ecd6dbd2c225154d0fa7f86205574ecaa6c87429fb5f66ee677c44" dependencies = [ "getrandom 0.2.0", "lazy_static", + "version_check", ] [[package]] @@ -66,9 +67,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.34" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf8dcb5b4bbaa28653b647d8c77bd4ed40183b48882e130c1f1ffb73de069fd7" +checksum = "2c0df63cb2955042487fad3aefd2c6e3ae7389ac5dc1beb28921de0b69f779d4" [[package]] name = "arrayref" @@ -85,7 +86,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=db20c7a611adac7be5cdd9350792852345f5b6b4#db20c7a611adac7be5cdd9350792852345f5b6b4" +source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544" dependencies = [ "chrono", "csv", @@ -444,9 +445,9 @@ dependencies = [ [[package]] name = "const_fn" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab" +checksum = "cd51eab21ab4fd6a3bf889e2d0958c0a6e3a61ad04260325e919e652a2a62826" [[package]] name = "constant_time_eq" @@ -728,15 +729,16 @@ dependencies = [ [[package]] name = "datafusion" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=db20c7a611adac7be5cdd9350792852345f5b6b4#db20c7a611adac7be5cdd9350792852345f5b6b4" +source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544" dependencies = [ - "ahash 0.5.7", + "ahash 0.6.2", "arrow", "async-trait", "chrono", "clap", "crossbeam 0.8.0", "futures", + "hashbrown", "num_cpus", "parquet", "paste", @@ -1171,7 +1173,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" dependencies = [ - "ahash 0.4.6", + "ahash 0.4.7", ] [[package]] @@ -1536,9 +1538,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "libloading" @@ -1805,9 +1807,9 @@ dependencies = [ [[package]] name = "net2" -version = "0.2.36" +version = "0.2.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7cf75f38f16cb05ea017784dc6dbfd354f76c223dba37701734c4f5a9337d02" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" dependencies = [ "cfg-if 0.1.10", "libc", @@ -1978,12 +1980,12 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" [[package]] name = "openssl" -version = "0.10.30" +version = "0.10.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d575eff3665419f9b83678ff2815858ad9d11567e082f5ac1814baba4e2bcb4" +checksum = "8d008f51b1acffa0d3450a68606e6a51c123012edaacb0f4e1426bd978869187" dependencies = [ "bitflags", - "cfg-if 0.1.10", + "cfg-if 1.0.0", "foreign-types", "lazy_static", "libc", @@ -1998,9 +2000,9 @@ checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" [[package]] name = "openssl-sys" -version = "0.9.58" +version = "0.9.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de" +checksum = "de52d8eabd217311538a39bba130d7dea1f1e118010fee7a033d966845e7d5fe" dependencies = [ "autocfg", "cc", @@ -2039,9 +2041,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3741934be594d77de1c8461ebcbbe866f585ea616a9753aa78f2bdc69f0e4579" +checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7" dependencies = [ "num-traits", ] @@ -2073,7 +2075,7 @@ dependencies = [ [[package]] name = "parquet" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow.git?rev=db20c7a611adac7be5cdd9350792852345f5b6b4#db20c7a611adac7be5cdd9350792852345f5b6b4" +source = "git+https://github.com/apache/arrow.git?rev=edff65dfe73d3380a211b678f3760a60a66e5544#edff65dfe73d3380a211b678f3760a60a66e5544" dependencies = [ "arrow", "base64 0.13.0", @@ -2100,9 +2102,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7151b083b0664ed58ed669fcdd92f01c3d2fdbf10af4931a301474950b52bfa9" +checksum = "c5d65c4d95931acda4498f675e332fcbdc9a06705cd07086c510e9b6009cd1c1" [[package]] name = "peeking_take_while" @@ -2808,9 +2810,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +checksum = "06c64263859d87aa2eb554587e2d23183398d617427327cf2b3d0ed8c69e4800" dependencies = [ "serde_derive", ] @@ -2827,9 +2829,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.117" +version = "1.0.118" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +checksum = "c84d3526699cd55261af4b941e4e725444df67aa4f9e6a3564f18030d12672df" dependencies = [ "proc-macro2", "quote", @@ -2957,9 +2959,9 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acad6f34eb9e8a259d3283d1e8c1d34d7415943d4895f65cc73813c7396fc85" +checksum = "ae524f056d7d770e174287294f562e95044c68e88dec909a00d2094805db9d75" [[package]] name = "snafu" @@ -3106,9 +3108,9 @@ checksum = "2d67a5a62ba6e01cb2192ff309324cb4875d0c451d55fe2319433abe7a05a8ee" [[package]] name = "syn" -version = "1.0.53" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8833e20724c24de12bbaba5ad230ea61c3eafb05b881c7c9d3cfe8638b187e68" +checksum = "9a2af957a63d6bd42255c359c93d9bfdb97076bd3b820897ce55ffbfbf107f44" dependencies = [ "proc-macro2", "quote", @@ -3275,9 +3277,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "0.2.23" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" +checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48" dependencies = [ "bytes", "fnv", @@ -3750,9 +3752,9 @@ dependencies = [ [[package]] name = "vcpkg" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb" [[package]] name = "vec_map" @@ -4030,9 +4032,9 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "zeroize" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f33972566adbd2d3588b0491eb94b98b43695c4ef897903470ede4f3f5a28a" +checksum = "81a974bcdd357f0dca4d41677db03436324d45a4c9ed2d0b873a5a360ce41c36" [[package]] name = "zstd" diff --git a/arrow_deps/Cargo.toml b/arrow_deps/Cargo.toml index 14bbaba3af..cc69bf953e 100644 --- a/arrow_deps/Cargo.toml +++ b/arrow_deps/Cargo.toml @@ -11,10 +11,10 @@ description = "Apache Arrow / Parquet / DataFusion dependencies for InfluxDB IOx [dependencies] # We are using development version of arrow/parquet/datafusion and the dependencies are at the same rev -# The version can be found here: https://github.com/apache/arrow/commit/db20c7a611adac7be5cdd9350792852345f5b6b4 +# The version can be found here: https://github.com/apache/arrow/commit/edff65dfe73d3380a211b678f3760a60a66e5544 # -arrow = { git = "https://github.com/apache/arrow.git", rev = "db20c7a611adac7be5cdd9350792852345f5b6b4" , features = ["simd"] } -datafusion = { git = "https://github.com/apache/arrow.git", rev = "db20c7a611adac7be5cdd9350792852345f5b6b4" } +arrow = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544" , features = ["simd"] } +datafusion = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544" } # Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time # and we're not currently using it anyway -parquet = { git = "https://github.com/apache/arrow.git", rev = "db20c7a611adac7be5cdd9350792852345f5b6b4", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } +parquet = { git = "https://github.com/apache/arrow.git", rev = "edff65dfe73d3380a211b678f3760a60a66e5544", default-features = false, features = ["snap", "brotli", "flate2", "lz4", "zstd"] } diff --git a/query/src/exec.rs b/query/src/exec.rs index a05af63eaf..3b384b3247 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -439,12 +439,15 @@ async fn run_logical_plans( #[cfg(test)] mod tests { - use arrow_deps::arrow::{ - array::Int64Array, - array::StringArray, - array::StringBuilder, - datatypes::DataType, - datatypes::{Field, Schema, SchemaRef}, + use arrow_deps::{ + arrow::{ + array::Int64Array, + array::StringArray, + array::StringBuilder, + datatypes::DataType, + datatypes::{Field, Schema, SchemaRef}, + }, + datafusion::logical_plan::LogicalPlanBuilder, }; use stringset::StringSet; @@ -666,13 +669,14 @@ mod tests { // creates a DataFusion plan that reads the RecordBatches into memory fn make_plan(schema: SchemaRef, data: Vec) -> LogicalPlan { - let projected_schema = schema.clone(); - - LogicalPlan::InMemoryScan { - data: vec![data], // model one partition + let projection = None; + LogicalPlanBuilder::scan_memory( + vec![data], // model one partition, schema, - projection: None, - projected_schema, - } + projection, + ) + .unwrap() + .build() + .unwrap() } } diff --git a/query/src/exec/planning.rs b/query/src/exec/planning.rs index f547644c7b..dd9424a526 100644 --- a/query/src/exec/planning.rs +++ b/query/src/exec/planning.rs @@ -72,7 +72,7 @@ impl ExtensionPlanner for IOxExtensionPlanner { assert_eq!(inputs.len(), 1, "Inconsistent number of inputs"); Ok(Arc::new(SchemaPivotExec::new( inputs[0].clone(), - schema_pivot.schema().clone(), + schema_pivot.schema().as_ref().clone().into(), ))) } None => Err(Error::Internal(format!( diff --git a/query/src/exec/schema_pivot.rs b/query/src/exec/schema_pivot.rs index e46fcb9788..876786ef04 100644 --- a/query/src/exec/schema_pivot.rs +++ b/query/src/exec/schema_pivot.rs @@ -28,15 +28,18 @@ use std::{ use async_trait::async_trait; use arrow_deps::{ - arrow::array::StringBuilder, - arrow::datatypes::{DataType, Field, Schema, SchemaRef}, - arrow::record_batch::RecordBatch, - datafusion::logical_plan::{self, Expr, LogicalPlan, UserDefinedLogicalNode}, - datafusion::physical_plan::common::SizedRecordBatchStream, - datafusion::physical_plan::SendableRecordBatchStream, + arrow::{ + array::StringBuilder, + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, + }, datafusion::{ error::DataFusionError, - physical_plan::{Distribution, ExecutionPlan, Partitioning}, + logical_plan::{self, DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode}, + physical_plan::{ + common::SizedRecordBatchStream, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, + }, }, }; @@ -47,7 +50,7 @@ pub use arrow_deps::datafusion::error::{DataFusionError as Error, Result}; /// Implementes the SchemaPivot operation described in make_schema_pivot, pub struct SchemaPivotNode { input: LogicalPlan, - schema: SchemaRef, + schema: DFSchemaRef, // these expressions represent what columns are "used" by this // node (in this case all of them) -- columns that are not used // are optimzied away by datafusion. @@ -92,7 +95,7 @@ impl UserDefinedLogicalNode for SchemaPivotNode { } /// Schema for Pivot is a single string - fn schema(&self) -> &SchemaRef { + fn schema(&self) -> &DFSchemaRef { &self.schema } @@ -123,14 +126,15 @@ impl UserDefinedLogicalNode for SchemaPivotNode { // ------ The implementation of SchemaPivot code follows ----- /// Create the schema describing the output -pub fn make_schema_pivot_output_schema() -> SchemaRef { +pub fn make_schema_pivot_output_schema() -> DFSchemaRef { let nullable = false; - let schema = Schema::new(vec![Field::new( + Schema::new(vec![Field::new( "non_null_column", DataType::Utf8, nullable, - )]); - SchemaRef::new(schema) + )]) + .to_dfschema_ref() + .unwrap() } /// Physical operator that implements the SchemaPivot operation aginst @@ -442,7 +446,7 @@ mod tests { /// Create a schema piv fn make_schema_pivot(input_schema: SchemaRef, data: Vec) -> SchemaPivotExec { let input = make_memory_exec(input_schema, data); - let output_schema = make_schema_pivot_output_schema(); + let output_schema = Arc::new(make_schema_pivot_output_schema().as_ref().clone().into()); SchemaPivotExec::new(input, output_schema) } diff --git a/query/src/util.rs b/query/src/util.rs index 6edad18f3e..075f4b7408 100644 --- a/query/src/util.rs +++ b/query/src/util.rs @@ -50,6 +50,14 @@ pub fn visit_expression(expr: &Expr, visitor: &mut V) { } } Expr::Not(expr) => visit_expression(expr, visitor), + Expr::Negative(expr) => visit_expression(expr, visitor), + Expr::Between { + expr, low, high, .. + } => { + visit_expression(expr, visitor); + visit_expression(low, visitor); + visit_expression(high, visitor); + } Expr::IsNull(expr) => visit_expression(expr, visitor), Expr::IsNotNull(expr) => visit_expression(expr, visitor), Expr::ScalarFunction { args, .. } => { diff --git a/write_buffer/src/database.rs b/write_buffer/src/database.rs index 09e4cb8286..fff5a51486 100644 --- a/write_buffer/src/database.rs +++ b/write_buffer/src/database.rs @@ -584,7 +584,7 @@ impl SQLDatabase for Db { for table in tables { let provider = - MemTable::new(table.schema, vec![table.data]).context(QueryError { query })?; + MemTable::try_new(table.schema, vec![table.data]).context(QueryError { query })?; ctx.register_table(&table.name, Box::new(provider)); } diff --git a/write_buffer/src/table.rs b/write_buffer/src/table.rs index a298352441..574ab30e51 100644 --- a/write_buffer/src/table.rs +++ b/write_buffer/src/table.rs @@ -168,6 +168,11 @@ pub enum Error { #[snafu(display("Duplicate group column '{}'", column_name))] DuplicateGroupColumn { column_name: String }, + + #[snafu(display("Internal error converting schema to DFSchema: {}", source))] + InternalConvertingSchema { + source: datafusion::error::DataFusionError, + }, } pub type Result = std::result::Result; @@ -338,14 +343,9 @@ impl Table { let schema = data.schema(); let projection = None; - let projected_schema = schema.clone(); - let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan { - data: vec![vec![data]], - schema, - projection, - projected_schema, - }); + let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) + .context(BuildingPlan)?; let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?; @@ -403,16 +403,11 @@ impl Table { let schema = data.schema(); let projection = None; - let projected_schema = schema.clone(); let select_exprs = vec![col(column_name)]; // And build the plan! - let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan { - data: vec![vec![data]], - schema, - projection, - projected_schema, - }); + let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) + .context(BuildingPlan)?; let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?; @@ -470,15 +465,10 @@ impl Table { let schema = data.schema(); let projection = None; - let projected_schema = schema.clone(); // And build the plan from the bottom up - let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan { - data: vec![vec![data]], - schema, - projection, - projected_schema, - }); + let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) + .context(BuildingPlan)?; // Filtering let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?; @@ -599,15 +589,10 @@ impl Table { let schema = data.schema(); let projection = None; - let projected_schema = schema.clone(); // And build the plan from the bottom up - let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan { - data: vec![vec![data]], - schema, - projection, - projected_schema, - }); + let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) + .context(BuildingPlan)?; // Filtering let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?; @@ -679,15 +664,10 @@ impl Table { let schema = data.schema(); let projection = None; - let projected_schema = schema.clone(); // And build the plan from the bottom up - let plan_builder = LogicalPlanBuilder::from(&LogicalPlan::InMemoryScan { - data: vec![vec![data]], - schema, - projection, - projected_schema, - }); + let plan_builder = LogicalPlanBuilder::scan_memory(vec![vec![data]], schema, projection) + .context(BuildingPlan)?; // Filtering let plan_builder = Self::add_datafusion_predicate(plan_builder, partition_predicate)?;