From 5488c257d1bbb9a9b2f6882444b9e88098e53fdc Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 27 Jan 2022 08:33:27 -0500
Subject: [PATCH] chore: Update datafusion, upgrade to arrow/parquet/arrow-flight 8.0.0 (#3517)

* chore: Update datafusion
* chore: update to arrow 8
* fix: update to use new DataFusion APIs
* fix: update case for sortedness
* fix: cargo hakari
---
 Cargo.lock                            | 65 ++++++++++++++++-----------
 arrow_util/Cargo.toml                 |  2 +-
 datafusion/Cargo.toml                 |  2 +-
 datafusion_util/src/lib.rs            | 23 ++++++++++
 db/Cargo.toml                         |  2 +-
 influxdb_iox/Cargo.toml               |  6 +--
 influxdb_iox_client/Cargo.toml        |  4 +-
 ingester/Cargo.toml                   |  2 +-
 ingester/src/query.rs                 | 11 ++++-
 mutable_batch/Cargo.toml              |  2 +-
 mutable_buffer/Cargo.toml             |  2 +-
 packers/Cargo.toml                    |  4 +-
 parquet_catalog/Cargo.toml            |  4 +-
 parquet_file/Cargo.toml               |  4 +-
 parquet_file/src/storage.rs           |  8 +---
 predicate/Cargo.toml                  |  2 +-
 query/Cargo.toml                      |  2 +-
 query/src/exec/schema_pivot.rs        |  1 +
 query/src/exec/seriesset/converter.rs |  9 ++--
 query/src/exec/split.rs               | 12 ++---
 query/src/provider.rs                 |  6 ++-
 query/src/provider/adapter.rs         | 31 +++++--------
 query/src/pruning.rs                  |  8 +++-
 query/src/statistics.rs               |  5 +++
 query/src/test.rs                     |  6 +--
 query_tests/Cargo.toml                |  2 +-
 read_buffer/Cargo.toml                |  2 +-
 schema/Cargo.toml                     |  2 +-
 workspace-hack/Cargo.toml             |  6 ++-
 29 files changed, 139 insertions(+), 96 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dd8e035395..cf03fead58 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -84,9 +84,9 @@ dependencies = [
 
 [[package]]
 name = "arrow"
-version = "7.0.0"
+version = "8.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "66ec0a5964feebf378e2fc6db9530e712657b8edf72aa17b1b277b0f52a48e2d"
+checksum = "ce240772a007c63658c1d335bb424fd1019b87895dee899b7bf70e85b2d24e5f"
 dependencies = [
  "bitflags",
  "chrono",
@@ -109,9 +109,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-flight"
-version = "7.0.0"
+version = "8.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "76c02bce74a675af058d1fb7d810a015769f2a01aad2aea505c4c913394dd4e8"
+checksum = "8ec7c637d2edd6969eeb63793584e0a3d17f99559b872ab73f47aea186aef50a"
 dependencies = [
  "arrow",
  "base64 0.13.0",
@@ -132,7 +132,7 @@ dependencies = [
  "arrow",
  "chrono",
  "comfy-table",
- "hashbrown",
+ "hashbrown 0.11.2",
  "num-traits",
  "rand",
  "snafu",
@@ -851,14 +851,14 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "6.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=ad392fd529f9f8631b90271f7dbf3a4f2feadb7b#ad392fd529f9f8631b90271f7dbf3a4f2feadb7b"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1#63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1"
 dependencies = [
  "ahash",
  "arrow",
  "async-trait",
  "chrono",
  "futures",
- "hashbrown",
+ "hashbrown 0.12.0",
  "lazy_static",
  "log",
  "num_cpus",
@@ -898,7 +898,7 @@ dependencies = [
  "datafusion_util",
  "dml",
  "futures",
- "hashbrown",
+ "hashbrown 0.11.2",
  "internal_types",
  "iox_object_store",
  "itertools",
@@ -1019,7 +1019,7 @@ version = "0.1.0"
 dependencies = [
  "arrow_util",
  "data_types",
- "hashbrown",
+ "hashbrown 0.11.2",
  "mutable_batch",
  "mutable_batch_lp",
  "ordered-float 2.10.0",
@@ -1468,13 +1468,22 @@ dependencies = [
  "ahash",
 ]
 
+[[package]]
+name = "hashbrown"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8c21d40587b92fa6a6c6e3c1bdbf87d75511db5672f9c93175574b3a00df1758"
+dependencies = [
+ "ahash",
+]
+
 [[package]]
 name = "hashlink"
 version = 
"0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" dependencies = [ - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -1652,7 +1661,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -1717,7 +1726,7 @@ dependencies = [ "flate2", "futures", "generated_types", - "hashbrown", + "hashbrown 0.11.2", "heappy", "hex", "http", @@ -2162,7 +2171,7 @@ version = "0.1.0" dependencies = [ "data_types", "futures", - "hashbrown", + "hashbrown 0.11.2", "internal_types", "observability_deps", "parking_lot", @@ -2400,7 +2409,7 @@ dependencies = [ "arrow_util", "chrono", "data_types", - "hashbrown", + "hashbrown 0.11.2", "itertools", "rand", "schema", @@ -2413,7 +2422,7 @@ name = "mutable_batch_lp" version = "0.1.0" dependencies = [ "arrow_util", - "hashbrown", + "hashbrown 0.11.2", "influxdb_line_protocol", "mutable_batch", "schema", @@ -2428,7 +2437,7 @@ dependencies = [ "arrow_util", "dml", "generated_types", - "hashbrown", + "hashbrown 0.11.2", "mutable_batch", "mutable_batch_lp", "schema", @@ -2876,9 +2885,9 @@ dependencies = [ [[package]] name = "parquet" -version = "7.0.0" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c718575b34e488fa78d4f0286356abb8466573cb17ae8faa96ffd871ca6e8c6" +checksum = "2d5a6492e0b849fd458bc9364aee4c8a9882b3cc21b2576767162725f69d2ad8" dependencies = [ "arrow", "base64 0.13.0", @@ -3386,7 +3395,7 @@ dependencies = [ "datafusion 0.1.0", "datafusion_util", "futures", - "hashbrown", + "hashbrown 0.11.2", "itertools", "libc", "observability_deps", @@ -3576,7 +3585,7 @@ dependencies = [ "data_types", "datafusion 0.1.0", "either", - "hashbrown", + "hashbrown 0.11.2", "itertools", "metric", "observability_deps", @@ -3719,7 +3728,7 @@ dependencies = [ "cache_loader_async", "data_types", "dml", - "hashbrown", + "hashbrown 0.11.2", "influxdb_iox_client", "metric", "mutable_batch", @@ -3748,7 +3757,7 @@ dependencies = [ "flate2", "futures", "generated_types", - "hashbrown", + "hashbrown 0.11.2", "hyper", "metric", "mutable_batch", @@ -3973,7 +3982,7 @@ name = "schema" version = "0.1.0" dependencies = [ "arrow", - "hashbrown", + "hashbrown 0.11.2", "indexmap", "itertools", "snafu", @@ -4116,7 +4125,7 @@ dependencies = [ "futures", "futures-util", "generated_types", - "hashbrown", + "hashbrown 0.11.2", "influxdb_iox_client", "influxdb_line_protocol", "internal_types", @@ -4951,7 +4960,7 @@ name = "trace_http" version = "0.1.0" dependencies = [ "futures", - "hashbrown", + "hashbrown 0.11.2", "http", "http-body", "itertools", @@ -5055,7 +5064,7 @@ name = "tracker" version = "0.1.0" dependencies = [ "futures", - "hashbrown", + "hashbrown 0.11.2", "lock_api", "metric", "observability_deps", @@ -5436,6 +5445,7 @@ name = "workspace-hack" version = "0.1.0" dependencies = [ "ahash", + "arrow", "base64 0.13.0", "bitflags", "byteorder", @@ -5450,7 +5460,7 @@ dependencies = [ "futures-task", "futures-util", "getrandom", - "hashbrown", + "hashbrown 0.11.2", "hyper", "indexmap", "log", @@ -5460,6 +5470,7 @@ dependencies = [ "num-integer", "num-traits", "once_cell", + "parquet", "rand", "regex", "regex-automata", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index 5441ccc48d..1ef91832c0 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -7,7 +7,7 @@ 
description = "Apache Arrow utilities" [dependencies] ahash = { version = "0.7.5", default-features = false } -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } # used by arrow anyway (needed for printing workaround) chrono = { version = "0.4", default-features = false } comfy-table = { version = "5.0", default-features = false } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 15c1b95624..1836fd15f9 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="ad392fd529f9f8631b90271f7dbf3a4f2feadb7b", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1", default-features = false, package = "datafusion" } workspace-hack = { path = "../workspace-hack"} diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs index 967eefffee..dda30ec02a 100644 --- a/datafusion_util/src/lib.rs +++ b/datafusion_util/src/lib.rs @@ -5,6 +5,8 @@ use std::sync::Arc; use std::task::{Context, Poll}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::physical_plan::common::SizedRecordBatchStream; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::{ arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch}, @@ -201,6 +203,27 @@ impl RecordBatchStream for AdapterStream { } } +/// Create a SendableRecordBatchStream a RecordBatch +pub fn stream_from_batch(batch: RecordBatch) -> SendableRecordBatchStream { + stream_from_batches(vec![Arc::new(batch)]) +} + +/// Create a SendableRecordBatchStream from Vec of RecordBatches with the same schema +pub fn stream_from_batches(batches: Vec>) -> SendableRecordBatchStream { + let dummy_metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&dummy_metrics, 0); + let stream = SizedRecordBatchStream::new(batches[0].schema(), batches, baseline_metrics); + Box::pin(stream) +} + +/// Create a SendableRecordBatchStream that sends back no RecordBatches with a specific schema +pub fn stream_from_schema(schema: SchemaRef) -> SendableRecordBatchStream { + let dummy_metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&dummy_metrics, 0); + let stream = SizedRecordBatchStream::new(schema, vec![], baseline_metrics); + Box::pin(stream) +} + /// Execute the [ExecutionPlan] with a default [RuntimeEnv] and /// collect the results in memory. 
/// diff --git a/db/Cargo.toml b/db/Cargo.toml index 2369de3a6f..d78b44f573 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -5,7 +5,7 @@ authors = ["pauldix "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } async-trait = "0.1" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 79b1f791f7..a90b7f1f7b 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -44,8 +44,8 @@ tracker = { path = "../tracker" } trogging = { path = "../trogging", default-features = false, features = ["clap"] } # Crates.io dependencies, in alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } -arrow-flight = "7.0" +arrow = { version = "8.0", features = ["prettyprint"] } +arrow-flight = "8.0" async-trait = "0.1" backtrace = "0.3" byteorder = "1.3.4" @@ -68,7 +68,7 @@ log = "0.4" num_cpus = "1.13.0" once_cell = { version = "1.4.0", features = ["parking_lot"] } parking_lot = "0.11.2" -parquet = "7.0" +parquet = "8.0" pin-project = "1.0" pprof = { version = "0.6", default-features = false, features = ["flamegraph", "protobuf"], optional = true } prost = "0.9" diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 1b95336c0f..70bb4b3dcd 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -16,8 +16,8 @@ client_util = { path = "../client_util" } generated_types = { path = "../generated_types" } # Crates.io dependencies, in alphabetical order -arrow = { version = "7.0", optional = true } -arrow-flight = { version = "7.0", optional = true } +arrow = { version = "8.0", optional = true } +arrow-flight = { version = "8.0", optional = true } bytes = "1.0" futures-util = { version = "0.3", optional = true } dml = { path = "../dml", optional = true } diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index 20dc6c3544..7f230984bc 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Nga Tran "] edition = "2021" [dependencies] -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } datafusion = { path = "../datafusion" } data_types = { path = "../data_types" } diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 43e10b68e0..a09f487b40 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -9,7 +9,11 @@ use data_types::{ delete_predicate::DeletePredicate, partition_metadata::TableSummary, }; -use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; +use datafusion::physical_plan::{ + common::SizedRecordBatchStream, + metrics::{BaselineMetrics, ExecutionPlanMetricsSet}, + SendableRecordBatchStream, +}; use iox_catalog::interface::Tombstone; use predicate::{ delete_predicate::parse_delete_predicate, @@ -185,7 +189,10 @@ impl QueryChunk for QueryableBatch { } // Return sream of data - let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), stream_batches); + let dummy_metrics = ExecutionPlanMetricsSet::new(); + let baseline_metrics = BaselineMetrics::new(&dummy_metrics, 0); + let stream = + SizedRecordBatchStream::new(self.schema().as_arrow(), stream_batches, baseline_metrics); Ok(Box::pin(stream)) } diff --git a/mutable_batch/Cargo.toml b/mutable_batch/Cargo.toml index 0fba07c2b4..b4ba19e437 100644 --- 
a/mutable_batch/Cargo.toml +++ b/mutable_batch/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" description = "A mutable arrow RecordBatch" [dependencies] -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } chrono = { version = "0.4", default-features = false } data_types = { path = "../data_types" } diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index 75a065366c..78e0dba16d 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Andrew Lamb "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } data_types = { path = "../data_types" } schema = { path = "../schema" } metric = { path = "../metric" } diff --git a/packers/Cargo.toml b/packers/Cargo.toml index 4fb339f1cf..f14121917e 100644 --- a/packers/Cargo.toml +++ b/packers/Cargo.toml @@ -5,12 +5,12 @@ authors = ["Andrew Lamb "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } influxdb_tsm = { path = "../influxdb_tsm" } schema = { path = "../schema" } snafu = "0.7" observability_deps = { path = "../observability_deps" } -parquet = "7.0" +parquet = "8.0" workspace-hack = { path = "../workspace-hack"} [dev-dependencies] # In alphabetical order diff --git a/parquet_catalog/Cargo.toml b/parquet_catalog/Cargo.toml index 05d0ed108c..1a568ae862 100644 --- a/parquet_catalog/Cargo.toml +++ b/parquet_catalog/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } base64 = "0.13" bytes = "1.0" data_types = { path = "../data_types" } @@ -16,7 +16,7 @@ iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } object_store = { path = "../object_store" } observability_deps = { path = "../observability_deps" } -parquet = "7.0" +parquet = "8.0" parquet_file = { path = "../parquet_file" } parquet-format = "4.0" parking_lot = "0.11.1" diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index aaf4c986ac..8acb2084d2 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Nga Tran "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } base64 = "0.13" bytes = "1.0" data_types = { path = "../data_types" } @@ -17,7 +17,7 @@ iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } object_store = { path = "../object_store" } observability_deps = { path = "../observability_deps" } -parquet = "7.0" +parquet = {version = "8.0", features = ["experimental"]} parquet-format = "4.0" parking_lot = "0.11.1" pbjson-types = "0.2" diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index fe92c8cf73..39a21d6cf8 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -346,8 +346,7 @@ mod tests { use arrow::array::{ArrayRef, StringArray}; use arrow_util::assert_batches_eq; use data_types::chunk_metadata::{ChunkId, ChunkOrder}; - use datafusion::physical_plan::common::SizedRecordBatchStream; - use datafusion_util::MemoryStream; + use datafusion_util::{stream_from_batch, MemoryStream}; use 
parquet::schema::types::ColumnPath; use time::Time; @@ -427,10 +426,7 @@ mod tests { // write the data in let schema = batch.schema(); - let input_stream = Box::pin(SizedRecordBatchStream::new( - batch.schema(), - vec![Arc::new(batch)], - )); + let input_stream = stream_from_batch(batch); let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint( Arc::clone(&table_name), Arc::clone(&partition_key), diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index 52c2cc093e..a3c431a974 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } chrono = { version = "0.4", default-features = false } data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/query/Cargo.toml b/query/Cargo.toml index 5803c134fc..0657f98920 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor" # 2. Allow for query logic testing without bringing in all the storage systems. [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } async-trait = "0.1" chrono = { version = "0.4", default-features = false } diff --git a/query/src/exec/schema_pivot.rs b/query/src/exec/schema_pivot.rs index a8c28b743e..6144963497 100644 --- a/query/src/exec/schema_pivot.rs +++ b/query/src/exec/schema_pivot.rs @@ -300,6 +300,7 @@ impl ExecutionPlan for SchemaPivotExec { Ok(Box::pin(SizedRecordBatchStream::new( self.schema(), batches, + baseline_metrics, ))) } diff --git a/query/src/exec/seriesset/converter.rs b/query/src/exec/seriesset/converter.rs index dd9604c40e..cdc4e0e432 100644 --- a/query/src/exec/seriesset/converter.rs +++ b/query/src/exec/seriesset/converter.rs @@ -490,7 +490,7 @@ mod tests { record_batch::RecordBatch, }; use arrow_util::assert_batches_eq; - use datafusion::physical_plan::common::SizedRecordBatchStream; + use datafusion_util::{stream_from_batch, stream_from_schema}; use test_helpers::{str_pair_vec_to_vec, str_vec_to_arc_vec}; use super::*; @@ -498,7 +498,7 @@ mod tests { #[tokio::test] async fn test_convert_empty() { let schema = Arc::new(Schema::new(vec![])); - let empty_iterator = Box::pin(SizedRecordBatchStream::new(schema, vec![])); + let empty_iterator = stream_from_schema(schema); let table_name = "foo"; let tag_columns = []; @@ -849,9 +849,6 @@ mod tests { } fn batch_to_iterator(batch: RecordBatch) -> SendableRecordBatchStream { - Box::pin(SizedRecordBatchStream::new( - batch.schema(), - vec![Arc::new(batch)], - )) + stream_from_batch(batch) } } diff --git a/query/src/exec/split.rs b/query/src/exec/split.rs index 87b86d283d..2a634d59c4 100644 --- a/query/src/exec/split.rs +++ b/query/src/exec/split.rs @@ -12,6 +12,7 @@ use arrow::{ array::{Array, ArrayRef, BooleanArray}, compute::{self, filter_record_batch}, datatypes::SchemaRef, + error::ArrowError, error::Result as ArrowResult, record_batch::RecordBatch, }; @@ -274,17 +275,16 @@ impl StreamSplitExec { Err(e) => { debug!(%e, "error joining task"); for tx in &txs { - let err = DataFusionError::Execution(format!("Join Error: {}", e)); - let err = Err(err.into_arrow_external_error()); - tx.send(err).await.ok(); + let err: ArrowError = + DataFusionError::Execution(format!("Join Error: {}", e)).into(); + 
tx.send(Err(err)).await.ok(); } } Ok(Err(e)) => { debug!(%e, "error in work function"); for tx in &txs { - let err = DataFusionError::Execution(e.to_string()); - let err = Err(err.into_arrow_external_error()); - tx.send(err).await.ok(); + let err: ArrowError = DataFusionError::Execution(e.to_string()).into(); + tx.send(Err(err)).await.ok(); } } // Input task completed successfully diff --git a/query/src/provider.rs b/query/src/provider.rs index 0485fcbbc6..ea6dfc308f 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -1055,7 +1055,7 @@ mod test { use std::num::NonZeroU64; use arrow::datatypes::DataType; - use arrow_util::assert_batches_eq; + use arrow_util::{assert_batches_eq, assert_batches_sorted_eq}; use datafusion_util::test_collect; use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; @@ -2280,7 +2280,9 @@ mod test { "| 10 | MT | 1970-01-01T00:00:00.000007Z |", "+-----------+------+--------------------------------+", ]; - assert_batches_eq!(&expected, &batch); + // Since output is partially sorted, allow order to vary and + // test to still pass + assert_batches_sorted_eq!(&expected, &batch); } #[tokio::test] diff --git a/query/src/provider/adapter.rs b/query/src/provider/adapter.rs index 6da76f62a5..b0a5a58f4d 100644 --- a/query/src/provider/adapter.rs +++ b/query/src/provider/adapter.rs @@ -242,10 +242,8 @@ mod tests { record_batch::RecordBatch, }; use arrow_util::assert_batches_eq; - use datafusion::physical_plan::{ - common::{collect, SizedRecordBatchStream}, - metrics::ExecutionPlanMetricsSet, - }; + use datafusion::physical_plan::{common::collect, metrics::ExecutionPlanMetricsSet}; + use datafusion_util::stream_from_batch; use test_helpers::assert_contains; #[tokio::test] @@ -253,10 +251,9 @@ mod tests { let batch = make_batch(); let output_schema = batch.schema(); - let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); + let input_stream = stream_from_batch(batch); let adapter_stream = - SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) - .unwrap(); + SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap(); let output = collect(Box::pin(adapter_stream)) .await @@ -283,10 +280,9 @@ mod tests { Field::new("c", DataType::Utf8, false), Field::new("a", DataType::Int32, false), ])); - let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); + let input_stream = stream_from_batch(batch); let adapter_stream = - SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) - .unwrap(); + SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap(); let output = collect(Box::pin(adapter_stream)) .await @@ -314,10 +310,9 @@ mod tests { Field::new("d", DataType::Float32, false), Field::new("a", DataType::Int32, false), ])); - let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); + let input_stream = stream_from_batch(batch); let adapter_stream = - SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) - .unwrap(); + SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap(); let output = collect(Box::pin(adapter_stream)) .await @@ -343,9 +338,8 @@ mod tests { Field::new("c", DataType::Utf8, false), Field::new("a", DataType::Int32, false), ])); - let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); - let res = - SchemaAdapterStream::try_new(Box::pin(input_stream), 
output_schema, baseline_metrics());
+        let input_stream = stream_from_batch(batch);
+        let res = SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics());
 
         assert_contains!(
             res.unwrap_err().to_string(),
@@ -363,9 +357,8 @@ mod tests {
             Field::new("b", DataType::Int32, false),
             Field::new("a", DataType::Int32, false),
         ]));
-        let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]);
-        let res =
-            SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics());
+        let input_stream = stream_from_batch(batch);
+        let res = SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics());
 
         assert_contains!(res.unwrap_err().to_string(), "input field 'c' had type 'Utf8' which is different than output field 'c' which had type 'Float32'");
     }
diff --git a/query/src/pruning.rs b/query/src/pruning.rs
index 49a982277c..d9ef2608a7 100644
--- a/query/src/pruning.rs
+++ b/query/src/pruning.rs
@@ -10,7 +10,7 @@ use observability_deps::tracing::{debug, trace};
 use predicate::predicate::Predicate;
 
 use crate::{
-    statistics::{max_to_scalar, min_to_scalar},
+    statistics::{max_to_scalar, min_to_scalar, null_count_as_scalar},
     QueryChunkMeta,
 };
@@ -142,6 +142,12 @@ impl<'a> PruningStatistics for ChunkMetaStats<'a> {
         // on a single chunk at a time
         1
     }
+
+    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
+        self.column_summary(&column.name)
+            .map(|c| null_count_as_scalar(&c.stats))
+            .map(|s| s.to_array_of_size(1))
+    }
 }
 
 #[cfg(test)]
diff --git a/query/src/statistics.rs b/query/src/statistics.rs
index 3c1298aa17..5f46b4f45b 100644
--- a/query/src/statistics.rs
+++ b/query/src/statistics.rs
@@ -49,6 +49,11 @@ pub(crate) fn max_to_scalar(
     }
 }
 
+/// Converts stats.null_count to an appropriate `ScalarValue`
+pub(crate) fn null_count_as_scalar(stats: &IOxStatistics) -> ScalarValue {
+    ScalarValue::UInt64(Some(stats.null_count()))
+}
+
 /// Creates a DataFusion `Statistics` object from an IOx `TableSummary`
 pub(crate) fn df_from_iox(schema: &Schema, summary: &TableSummary) -> DFStatistics {
     // reorder the column statistics so DF sees them in the same order
diff --git a/query/src/test.rs b/query/src/test.rs
index 9e36e57243..d2f572c073 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -22,7 +22,8 @@ use data_types::{
     delete_predicate::DeletePredicate,
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
 };
-use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
+use datafusion::physical_plan::SendableRecordBatchStream;
+use datafusion_util::stream_from_batches;
 use futures::StreamExt;
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
@@ -871,8 +872,7 @@ impl QueryChunk for TestChunk {
         self.predicates.lock().push(predicate.clone());
 
         let batches = self.table_data.clone();
-        let stream = SizedRecordBatchStream::new(batches[0].schema(), batches);
-        Ok(Box::pin(stream))
+        Ok(stream_from_batches(batches))
     }
 
     /// Returns true if data of this chunk is sorted
diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml
index 6c2d2ee71d..45a2b64e35 100644
--- a/query_tests/Cargo.toml
+++ b/query_tests/Cargo.toml
@@ -18,7 +18,7 @@ query = { path = "../query" }
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
-arrow = { version = "7.0", features = ["prettyprint"] }
+arrow = { version = "8.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 metric = { path = "../metric" }
 object_store = { path = "../object_store" }
diff --git 
a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index b68fccfda2..e53f92f469 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -11,7 +11,7 @@ edition = "2021" # 2. Keep change/compile/link time down during development when working on just this crate [dependencies] # In alphabetical order -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } croaring = "0.5" data_types = { path = "../data_types" } diff --git a/schema/Cargo.toml b/schema/Cargo.toml index d531a866d0..146337b264 100644 --- a/schema/Cargo.toml +++ b/schema/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" description = "IOx Schema definition" [dependencies] -arrow = { version = "7.0", features = ["prettyprint"] } +arrow = { version = "8.0", features = ["prettyprint"] } hashbrown = "0.11" indexmap = { version = "1.7", features = ["std"] } itertools = "0.10.1" diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 3e23a16daa..e8638cf01d 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -14,6 +14,7 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] ahash = { version = "0.7", features = ["std"] } +arrow = { version = "8", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] } base64 = { version = "0.13", features = ["std"] } bitflags = { version = "1" } byteorder = { version = "1", features = ["std"] } @@ -37,13 +38,14 @@ num-bigint = { version = "0.4", features = ["std"] } num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm", "std"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] } +parquet = { version = "8", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-automata = { version = "0.1", features = ["regex-syntax", "std"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } reqwest = { version = "0.11", features = ["__tls", "default-tls", "hyper-tls", "json", "native-tls-crate", "serde_json", "tokio-native-tls"] } serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] } -serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] } +serde_json = { version = "1", features = ["indexmap", "preserve_order", "raw_value", "std"] } sha2 = { version = "0.9", features = ["std"] } smallvec = { version = "1", default-features = false, features = ["union"] } tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] } @@ -82,7 +84,7 @@ rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha regex = { version = "1", features = 
["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] } -serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] } +serde_json = { version = "1", features = ["indexmap", "preserve_order", "raw_value", "std"] } sha2 = { version = "0.9", features = ["std"] } smallvec = { version = "1", default-features = false, features = ["union"] } syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }