chore: Update datafusion, upgrade to arrow/parquet/arrow-flight 8.0.0 (#3517)

* chore: Update datafusion

* chore: update to arrow 8

* fix: update to use new DataFusion APIs

* fix: update case for sortedness

* fix: cargo hakari
pull/24376/head
Andrew Lamb 2022-01-27 08:33:27 -05:00 committed by GitHub
parent 7261571abf
commit 5488c257d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 139 additions and 96 deletions

65
Cargo.lock generated
View File

@ -84,9 +84,9 @@ dependencies = [
[[package]] [[package]]
name = "arrow" name = "arrow"
version = "7.0.0" version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66ec0a5964feebf378e2fc6db9530e712657b8edf72aa17b1b277b0f52a48e2d" checksum = "ce240772a007c63658c1d335bb424fd1019b87895dee899b7bf70e85b2d24e5f"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"chrono", "chrono",
@ -109,9 +109,9 @@ dependencies = [
[[package]] [[package]]
name = "arrow-flight" name = "arrow-flight"
version = "7.0.0" version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76c02bce74a675af058d1fb7d810a015769f2a01aad2aea505c4c913394dd4e8" checksum = "8ec7c637d2edd6969eeb63793584e0a3d17f99559b872ab73f47aea186aef50a"
dependencies = [ dependencies = [
"arrow", "arrow",
"base64 0.13.0", "base64 0.13.0",
@ -132,7 +132,7 @@ dependencies = [
"arrow", "arrow",
"chrono", "chrono",
"comfy-table", "comfy-table",
"hashbrown", "hashbrown 0.11.2",
"num-traits", "num-traits",
"rand", "rand",
"snafu", "snafu",
@ -851,14 +851,14 @@ dependencies = [
[[package]] [[package]]
name = "datafusion" name = "datafusion"
version = "6.0.0" version = "6.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ad392fd529f9f8631b90271f7dbf3a4f2feadb7b#ad392fd529f9f8631b90271f7dbf3a4f2feadb7b" source = "git+https://github.com/apache/arrow-datafusion.git?rev=63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1#63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1"
dependencies = [ dependencies = [
"ahash", "ahash",
"arrow", "arrow",
"async-trait", "async-trait",
"chrono", "chrono",
"futures", "futures",
"hashbrown", "hashbrown 0.12.0",
"lazy_static", "lazy_static",
"log", "log",
"num_cpus", "num_cpus",
@ -898,7 +898,7 @@ dependencies = [
"datafusion_util", "datafusion_util",
"dml", "dml",
"futures", "futures",
"hashbrown", "hashbrown 0.11.2",
"internal_types", "internal_types",
"iox_object_store", "iox_object_store",
"itertools", "itertools",
@ -1019,7 +1019,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"arrow_util", "arrow_util",
"data_types", "data_types",
"hashbrown", "hashbrown 0.11.2",
"mutable_batch", "mutable_batch",
"mutable_batch_lp", "mutable_batch_lp",
"ordered-float 2.10.0", "ordered-float 2.10.0",
@ -1468,13 +1468,22 @@ dependencies = [
"ahash", "ahash",
] ]
[[package]]
name = "hashbrown"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c21d40587b92fa6a6c6e3c1bdbf87d75511db5672f9c93175574b3a00df1758"
dependencies = [
"ahash",
]
[[package]] [[package]]
name = "hashlink" name = "hashlink"
version = "0.7.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf"
dependencies = [ dependencies = [
"hashbrown", "hashbrown 0.11.2",
] ]
[[package]] [[package]]
@ -1652,7 +1661,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown 0.11.2",
] ]
[[package]] [[package]]
@ -1717,7 +1726,7 @@ dependencies = [
"flate2", "flate2",
"futures", "futures",
"generated_types", "generated_types",
"hashbrown", "hashbrown 0.11.2",
"heappy", "heappy",
"hex", "hex",
"http", "http",
@ -2162,7 +2171,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"data_types", "data_types",
"futures", "futures",
"hashbrown", "hashbrown 0.11.2",
"internal_types", "internal_types",
"observability_deps", "observability_deps",
"parking_lot", "parking_lot",
@ -2400,7 +2409,7 @@ dependencies = [
"arrow_util", "arrow_util",
"chrono", "chrono",
"data_types", "data_types",
"hashbrown", "hashbrown 0.11.2",
"itertools", "itertools",
"rand", "rand",
"schema", "schema",
@ -2413,7 +2422,7 @@ name = "mutable_batch_lp"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"arrow_util", "arrow_util",
"hashbrown", "hashbrown 0.11.2",
"influxdb_line_protocol", "influxdb_line_protocol",
"mutable_batch", "mutable_batch",
"schema", "schema",
@ -2428,7 +2437,7 @@ dependencies = [
"arrow_util", "arrow_util",
"dml", "dml",
"generated_types", "generated_types",
"hashbrown", "hashbrown 0.11.2",
"mutable_batch", "mutable_batch",
"mutable_batch_lp", "mutable_batch_lp",
"schema", "schema",
@ -2876,9 +2885,9 @@ dependencies = [
[[package]] [[package]]
name = "parquet" name = "parquet"
version = "7.0.0" version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c718575b34e488fa78d4f0286356abb8466573cb17ae8faa96ffd871ca6e8c6" checksum = "2d5a6492e0b849fd458bc9364aee4c8a9882b3cc21b2576767162725f69d2ad8"
dependencies = [ dependencies = [
"arrow", "arrow",
"base64 0.13.0", "base64 0.13.0",
@ -3386,7 +3395,7 @@ dependencies = [
"datafusion 0.1.0", "datafusion 0.1.0",
"datafusion_util", "datafusion_util",
"futures", "futures",
"hashbrown", "hashbrown 0.11.2",
"itertools", "itertools",
"libc", "libc",
"observability_deps", "observability_deps",
@ -3576,7 +3585,7 @@ dependencies = [
"data_types", "data_types",
"datafusion 0.1.0", "datafusion 0.1.0",
"either", "either",
"hashbrown", "hashbrown 0.11.2",
"itertools", "itertools",
"metric", "metric",
"observability_deps", "observability_deps",
@ -3719,7 +3728,7 @@ dependencies = [
"cache_loader_async", "cache_loader_async",
"data_types", "data_types",
"dml", "dml",
"hashbrown", "hashbrown 0.11.2",
"influxdb_iox_client", "influxdb_iox_client",
"metric", "metric",
"mutable_batch", "mutable_batch",
@ -3748,7 +3757,7 @@ dependencies = [
"flate2", "flate2",
"futures", "futures",
"generated_types", "generated_types",
"hashbrown", "hashbrown 0.11.2",
"hyper", "hyper",
"metric", "metric",
"mutable_batch", "mutable_batch",
@ -3973,7 +3982,7 @@ name = "schema"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"arrow", "arrow",
"hashbrown", "hashbrown 0.11.2",
"indexmap", "indexmap",
"itertools", "itertools",
"snafu", "snafu",
@ -4116,7 +4125,7 @@ dependencies = [
"futures", "futures",
"futures-util", "futures-util",
"generated_types", "generated_types",
"hashbrown", "hashbrown 0.11.2",
"influxdb_iox_client", "influxdb_iox_client",
"influxdb_line_protocol", "influxdb_line_protocol",
"internal_types", "internal_types",
@ -4951,7 +4960,7 @@ name = "trace_http"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "futures",
"hashbrown", "hashbrown 0.11.2",
"http", "http",
"http-body", "http-body",
"itertools", "itertools",
@ -5055,7 +5064,7 @@ name = "tracker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "futures",
"hashbrown", "hashbrown 0.11.2",
"lock_api", "lock_api",
"metric", "metric",
"observability_deps", "observability_deps",
@ -5436,6 +5445,7 @@ name = "workspace-hack"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ahash", "ahash",
"arrow",
"base64 0.13.0", "base64 0.13.0",
"bitflags", "bitflags",
"byteorder", "byteorder",
@ -5450,7 +5460,7 @@ dependencies = [
"futures-task", "futures-task",
"futures-util", "futures-util",
"getrandom", "getrandom",
"hashbrown", "hashbrown 0.11.2",
"hyper", "hyper",
"indexmap", "indexmap",
"log", "log",
@ -5460,6 +5470,7 @@ dependencies = [
"num-integer", "num-integer",
"num-traits", "num-traits",
"once_cell", "once_cell",
"parquet",
"rand", "rand",
"regex", "regex",
"regex-automata", "regex-automata",

View File

@ -7,7 +7,7 @@ description = "Apache Arrow utilities"
[dependencies] [dependencies]
ahash = { version = "0.7.5", default-features = false } ahash = { version = "0.7.5", default-features = false }
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
# used by arrow anyway (needed for printing workaround) # used by arrow anyway (needed for printing workaround)
chrono = { version = "0.4", default-features = false } chrono = { version = "0.4", default-features = false }
comfy-table = { version = "5.0", default-features = false } comfy-table = { version = "5.0", default-features = false }

View File

@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug # Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypto functions or avro) # Turn off optional datafusion features (e.g. don't get support for crypto functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="ad392fd529f9f8631b90271f7dbf3a4f2feadb7b", default-features = false, package = "datafusion" } upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1", default-features = false, package = "datafusion" }
workspace-hack = { path = "../workspace-hack"} workspace-hack = { path = "../workspace-hack"}

View File

@ -5,6 +5,8 @@ use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::{ use datafusion::{
arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch}, arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch},
@ -201,6 +203,27 @@ impl RecordBatchStream for AdapterStream {
} }
} }
/// Create a SendableRecordBatchStream from a single RecordBatch.
///
/// Wraps `batch` in a one-element `Vec<Arc<RecordBatch>>` and delegates to
/// `stream_from_batches`.
pub fn stream_from_batch(batch: RecordBatch) -> SendableRecordBatchStream {
    let single = vec![Arc::new(batch)];
    stream_from_batches(single)
}
/// Create a SendableRecordBatchStream from a Vec of RecordBatches with the same schema.
///
/// The schema of the returned stream is taken from the first batch. The
/// `BaselineMetrics` handed to the stream are built from an
/// `ExecutionPlanMetricsSet` created locally in this function; that set is
/// not returned to the caller.
///
/// # Panics
///
/// Panics if `batches` is empty, because there is no batch to read the schema
/// from. Use `stream_from_schema` to build a stream that yields no batches.
pub fn stream_from_batches(batches: Vec<Arc<RecordBatch>>) -> SendableRecordBatchStream {
    // Fail with an actionable message instead of a bare index-out-of-bounds panic.
    let schema = batches
        .first()
        .expect("stream_from_batches requires at least one RecordBatch; use stream_from_schema for an empty stream")
        .schema();

    let dummy_metrics = ExecutionPlanMetricsSet::new();
    let baseline_metrics = BaselineMetrics::new(&dummy_metrics, 0);
    let stream = SizedRecordBatchStream::new(schema, batches, baseline_metrics);
    Box::pin(stream)
}
/// Create a SendableRecordBatchStream with the given schema that is built
/// from an empty list of RecordBatches.
///
/// The `BaselineMetrics` passed to the stream come from an
/// `ExecutionPlanMetricsSet` created locally in this function.
pub fn stream_from_schema(schema: SchemaRef) -> SendableRecordBatchStream {
    let metrics_set = ExecutionPlanMetricsSet::new();
    Box::pin(SizedRecordBatchStream::new(
        schema,
        Vec::new(),
        BaselineMetrics::new(&metrics_set, 0),
    ))
}
/// Execute the [ExecutionPlan] with a default [RuntimeEnv] and /// Execute the [ExecutionPlan] with a default [RuntimeEnv] and
/// collect the results in memory. /// collect the results in memory.
/// ///

View File

@ -5,7 +5,7 @@ authors = ["pauldix <paul@pauldix.net>"]
edition = "2021" edition = "2021"
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
async-trait = "0.1" async-trait = "0.1"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" } datafusion = { path = "../datafusion" }

View File

@ -44,8 +44,8 @@ tracker = { path = "../tracker" }
trogging = { path = "../trogging", default-features = false, features = ["clap"] } trogging = { path = "../trogging", default-features = false, features = ["clap"] }
# Crates.io dependencies, in alphabetical order # Crates.io dependencies, in alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow-flight = "7.0" arrow-flight = "8.0"
async-trait = "0.1" async-trait = "0.1"
backtrace = "0.3" backtrace = "0.3"
byteorder = "1.3.4" byteorder = "1.3.4"
@ -68,7 +68,7 @@ log = "0.4"
num_cpus = "1.13.0" num_cpus = "1.13.0"
once_cell = { version = "1.4.0", features = ["parking_lot"] } once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2" parking_lot = "0.11.2"
parquet = "7.0" parquet = "8.0"
pin-project = "1.0" pin-project = "1.0"
pprof = { version = "0.6", default-features = false, features = ["flamegraph", "protobuf"], optional = true } pprof = { version = "0.6", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.9" prost = "0.9"

View File

@ -16,8 +16,8 @@ client_util = { path = "../client_util" }
generated_types = { path = "../generated_types" } generated_types = { path = "../generated_types" }
# Crates.io dependencies, in alphabetical order # Crates.io dependencies, in alphabetical order
arrow = { version = "7.0", optional = true } arrow = { version = "8.0", optional = true }
arrow-flight = { version = "7.0", optional = true } arrow-flight = { version = "8.0", optional = true }
bytes = "1.0" bytes = "1.0"
futures-util = { version = "0.3", optional = true } futures-util = { version = "0.3", optional = true }
dml = { path = "../dml", optional = true } dml = { path = "../dml", optional = true }

View File

@ -5,7 +5,7 @@ authors = ["Nga Tran <nga-tran@live.com>"]
edition = "2021" edition = "2021"
[dependencies] [dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
datafusion = { path = "../datafusion" } datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" } data_types = { path = "../data_types" }

View File

@ -9,7 +9,11 @@ use data_types::{
delete_predicate::DeletePredicate, delete_predicate::DeletePredicate,
partition_metadata::TableSummary, partition_metadata::TableSummary,
}; };
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; use datafusion::physical_plan::{
common::SizedRecordBatchStream,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
SendableRecordBatchStream,
};
use iox_catalog::interface::Tombstone; use iox_catalog::interface::Tombstone;
use predicate::{ use predicate::{
delete_predicate::parse_delete_predicate, delete_predicate::parse_delete_predicate,
@ -185,7 +189,10 @@ impl QueryChunk for QueryableBatch {
} }
// Return stream of data // Return stream of data
let stream = SizedRecordBatchStream::new(self.schema().as_arrow(), stream_batches); let dummy_metrics = ExecutionPlanMetricsSet::new();
let baseline_metrics = BaselineMetrics::new(&dummy_metrics, 0);
let stream =
SizedRecordBatchStream::new(self.schema().as_arrow(), stream_batches, baseline_metrics);
Ok(Box::pin(stream)) Ok(Box::pin(stream))
} }

View File

@ -5,7 +5,7 @@ edition = "2021"
description = "A mutable arrow RecordBatch" description = "A mutable arrow RecordBatch"
[dependencies] [dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
chrono = { version = "0.4", default-features = false } chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" } data_types = { path = "../data_types" }

View File

@ -5,7 +5,7 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2021" edition = "2021"
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
schema = { path = "../schema" } schema = { path = "../schema" }
metric = { path = "../metric" } metric = { path = "../metric" }

View File

@ -5,12 +5,12 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2021" edition = "2021"
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
influxdb_tsm = { path = "../influxdb_tsm" } influxdb_tsm = { path = "../influxdb_tsm" }
schema = { path = "../schema" } schema = { path = "../schema" }
snafu = "0.7" snafu = "0.7"
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
parquet = "7.0" parquet = "8.0"
workspace-hack = { path = "../workspace-hack"} workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] # In alphabetical order [dev-dependencies] # In alphabetical order

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
base64 = "0.13" base64 = "0.13"
bytes = "1.0" bytes = "1.0"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
@ -16,7 +16,7 @@ iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" } metric = { path = "../metric" }
object_store = { path = "../object_store" } object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
parquet = "7.0" parquet = "8.0"
parquet_file = { path = "../parquet_file" } parquet_file = { path = "../parquet_file" }
parquet-format = "4.0" parquet-format = "4.0"
parking_lot = "0.11.1" parking_lot = "0.11.1"

View File

@ -5,7 +5,7 @@ authors = ["Nga Tran <nga-tran@live.com>"]
edition = "2021" edition = "2021"
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
base64 = "0.13" base64 = "0.13"
bytes = "1.0" bytes = "1.0"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
@ -17,7 +17,7 @@ iox_object_store = { path = "../iox_object_store" }
metric = { path = "../metric" } metric = { path = "../metric" }
object_store = { path = "../object_store" } object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }
parquet = "7.0" parquet = {version = "8.0", features = ["experimental"]}
parquet-format = "4.0" parquet-format = "4.0"
parking_lot = "0.11.1" parking_lot = "0.11.1"
pbjson-types = "0.2" pbjson-types = "0.2"

View File

@ -346,8 +346,7 @@ mod tests {
use arrow::array::{ArrayRef, StringArray}; use arrow::array::{ArrayRef, StringArray};
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use data_types::chunk_metadata::{ChunkId, ChunkOrder}; use data_types::chunk_metadata::{ChunkId, ChunkOrder};
use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion_util::{stream_from_batch, MemoryStream};
use datafusion_util::MemoryStream;
use parquet::schema::types::ColumnPath; use parquet::schema::types::ColumnPath;
use time::Time; use time::Time;
@ -427,10 +426,7 @@ mod tests {
// write the data in // write the data in
let schema = batch.schema(); let schema = batch.schema();
let input_stream = Box::pin(SizedRecordBatchStream::new( let input_stream = stream_from_batch(batch);
batch.schema(),
vec![Arc::new(batch)],
));
let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint( let (partition_checkpoint, database_checkpoint) = create_partition_and_database_checkpoint(
Arc::clone(&table_name), Arc::clone(&table_name),
Arc::clone(&partition_key), Arc::clone(&partition_key),

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
chrono = { version = "0.4", default-features = false } chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" } data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" } datafusion = { path = "../datafusion" }

View File

@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor"
# 2. Allow for query logic testing without bringing in all the storage systems. # 2. Allow for query logic testing without bringing in all the storage systems.
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
async-trait = "0.1" async-trait = "0.1"
chrono = { version = "0.4", default-features = false } chrono = { version = "0.4", default-features = false }

View File

@ -300,6 +300,7 @@ impl ExecutionPlan for SchemaPivotExec {
Ok(Box::pin(SizedRecordBatchStream::new( Ok(Box::pin(SizedRecordBatchStream::new(
self.schema(), self.schema(),
batches, batches,
baseline_metrics,
))) )))
} }

View File

@ -490,7 +490,7 @@ mod tests {
record_batch::RecordBatch, record_batch::RecordBatch,
}; };
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion_util::{stream_from_batch, stream_from_schema};
use test_helpers::{str_pair_vec_to_vec, str_vec_to_arc_vec}; use test_helpers::{str_pair_vec_to_vec, str_vec_to_arc_vec};
use super::*; use super::*;
@ -498,7 +498,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_convert_empty() { async fn test_convert_empty() {
let schema = Arc::new(Schema::new(vec![])); let schema = Arc::new(Schema::new(vec![]));
let empty_iterator = Box::pin(SizedRecordBatchStream::new(schema, vec![])); let empty_iterator = stream_from_schema(schema);
let table_name = "foo"; let table_name = "foo";
let tag_columns = []; let tag_columns = [];
@ -849,9 +849,6 @@ mod tests {
} }
fn batch_to_iterator(batch: RecordBatch) -> SendableRecordBatchStream { fn batch_to_iterator(batch: RecordBatch) -> SendableRecordBatchStream {
Box::pin(SizedRecordBatchStream::new( stream_from_batch(batch)
batch.schema(),
vec![Arc::new(batch)],
))
} }
} }

View File

@ -12,6 +12,7 @@ use arrow::{
array::{Array, ArrayRef, BooleanArray}, array::{Array, ArrayRef, BooleanArray},
compute::{self, filter_record_batch}, compute::{self, filter_record_batch},
datatypes::SchemaRef, datatypes::SchemaRef,
error::ArrowError,
error::Result as ArrowResult, error::Result as ArrowResult,
record_batch::RecordBatch, record_batch::RecordBatch,
}; };
@ -274,17 +275,16 @@ impl StreamSplitExec {
Err(e) => { Err(e) => {
debug!(%e, "error joining task"); debug!(%e, "error joining task");
for tx in &txs { for tx in &txs {
let err = DataFusionError::Execution(format!("Join Error: {}", e)); let err: ArrowError =
let err = Err(err.into_arrow_external_error()); DataFusionError::Execution(format!("Join Error: {}", e)).into();
tx.send(err).await.ok(); tx.send(Err(err)).await.ok();
} }
} }
Ok(Err(e)) => { Ok(Err(e)) => {
debug!(%e, "error in work function"); debug!(%e, "error in work function");
for tx in &txs { for tx in &txs {
let err = DataFusionError::Execution(e.to_string()); let err: ArrowError = DataFusionError::Execution(e.to_string()).into();
let err = Err(err.into_arrow_external_error()); tx.send(Err(err)).await.ok();
tx.send(err).await.ok();
} }
} }
// Input task completed successfully // Input task completed successfully

View File

@ -1055,7 +1055,7 @@ mod test {
use std::num::NonZeroU64; use std::num::NonZeroU64;
use arrow::datatypes::DataType; use arrow::datatypes::DataType;
use arrow_util::assert_batches_eq; use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_util::test_collect; use datafusion_util::test_collect;
use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME}; use schema::{builder::SchemaBuilder, TIME_COLUMN_NAME};
@ -2280,7 +2280,9 @@ mod test {
"| 10 | MT | 1970-01-01T00:00:00.000007Z |", "| 10 | MT | 1970-01-01T00:00:00.000007Z |",
"+-----------+------+--------------------------------+", "+-----------+------+--------------------------------+",
]; ];
assert_batches_eq!(&expected, &batch); // Since output is partially sorted, allow order to vary and
// test to still pass
assert_batches_sorted_eq!(&expected, &batch);
} }
#[tokio::test] #[tokio::test]

View File

@ -242,10 +242,8 @@ mod tests {
record_batch::RecordBatch, record_batch::RecordBatch,
}; };
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use datafusion::physical_plan::{ use datafusion::physical_plan::{common::collect, metrics::ExecutionPlanMetricsSet};
common::{collect, SizedRecordBatchStream}, use datafusion_util::stream_from_batch;
metrics::ExecutionPlanMetricsSet,
};
use test_helpers::assert_contains; use test_helpers::assert_contains;
#[tokio::test] #[tokio::test]
@ -253,10 +251,9 @@ mod tests {
let batch = make_batch(); let batch = make_batch();
let output_schema = batch.schema(); let output_schema = batch.schema();
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); let input_stream = stream_from_batch(batch);
let adapter_stream = let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap();
.unwrap();
let output = collect(Box::pin(adapter_stream)) let output = collect(Box::pin(adapter_stream))
.await .await
@ -283,10 +280,9 @@ mod tests {
Field::new("c", DataType::Utf8, false), Field::new("c", DataType::Utf8, false),
Field::new("a", DataType::Int32, false), Field::new("a", DataType::Int32, false),
])); ]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); let input_stream = stream_from_batch(batch);
let adapter_stream = let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap();
.unwrap();
let output = collect(Box::pin(adapter_stream)) let output = collect(Box::pin(adapter_stream))
.await .await
@ -314,10 +310,9 @@ mod tests {
Field::new("d", DataType::Float32, false), Field::new("d", DataType::Float32, false),
Field::new("a", DataType::Int32, false), Field::new("a", DataType::Int32, false),
])); ]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); let input_stream = stream_from_batch(batch);
let adapter_stream = let adapter_stream =
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics()) SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics()).unwrap();
.unwrap();
let output = collect(Box::pin(adapter_stream)) let output = collect(Box::pin(adapter_stream))
.await .await
@ -343,9 +338,8 @@ mod tests {
Field::new("c", DataType::Utf8, false), Field::new("c", DataType::Utf8, false),
Field::new("a", DataType::Int32, false), Field::new("a", DataType::Int32, false),
])); ]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); let input_stream = stream_from_batch(batch);
let res = let res = SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics());
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics());
assert_contains!( assert_contains!(
res.unwrap_err().to_string(), res.unwrap_err().to_string(),
@ -363,9 +357,8 @@ mod tests {
Field::new("b", DataType::Int32, false), Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Int32, false), Field::new("a", DataType::Int32, false),
])); ]));
let input_stream = SizedRecordBatchStream::new(batch.schema(), vec![Arc::new(batch)]); let input_stream = stream_from_batch(batch);
let res = let res = SchemaAdapterStream::try_new(input_stream, output_schema, baseline_metrics());
SchemaAdapterStream::try_new(Box::pin(input_stream), output_schema, baseline_metrics());
assert_contains!(res.unwrap_err().to_string(), "input field 'c' had type 'Utf8' which is different than output field 'c' which had type 'Float32'"); assert_contains!(res.unwrap_err().to_string(), "input field 'c' had type 'Utf8' which is different than output field 'c' which had type 'Float32'");
} }

View File

@ -10,7 +10,7 @@ use observability_deps::tracing::{debug, trace};
use predicate::predicate::Predicate; use predicate::predicate::Predicate;
use crate::{ use crate::{
statistics::{max_to_scalar, min_to_scalar}, statistics::{max_to_scalar, min_to_scalar, null_count_as_scalar},
QueryChunkMeta, QueryChunkMeta,
}; };
@ -142,6 +142,12 @@ impl<'a> PruningStatistics for ChunkMetaStats<'a> {
// on a single chunk at a time // on a single chunk at a time
1 1
} }
/// Null count for `column`, as a one-element array.
///
/// Returns `None` when `column_summary` has no entry for `column.name`.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
    let summary = self.column_summary(&column.name)?;
    let null_count = null_count_as_scalar(&summary.stats);
    Some(null_count.to_array_of_size(1))
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -49,6 +49,11 @@ pub(crate) fn max_to_scalar(
} }
} }
/// Wraps the value of `stats.null_count()` in a `ScalarValue::UInt64`
pub(crate) fn null_count_as_scalar(stats: &IOxStatistics) -> ScalarValue {
    let nulls = stats.null_count();
    ScalarValue::UInt64(Some(nulls))
}
/// Creates a DataFusion `Statistics` object from an IOx `TableSummary` /// Creates a DataFusion `Statistics` object from an IOx `TableSummary`
pub(crate) fn df_from_iox(schema: &Schema, summary: &TableSummary) -> DFStatistics { pub(crate) fn df_from_iox(schema: &Schema, summary: &TableSummary) -> DFStatistics {
// reorder the column statistics so DF sees them in the same order // reorder the column statistics so DF sees them in the same order

View File

@ -22,7 +22,8 @@ use data_types::{
delete_predicate::DeletePredicate, delete_predicate::DeletePredicate,
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
}; };
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::stream_from_batches;
use futures::StreamExt; use futures::StreamExt;
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use parking_lot::Mutex; use parking_lot::Mutex;
@ -871,8 +872,7 @@ impl QueryChunk for TestChunk {
self.predicates.lock().push(predicate.clone()); self.predicates.lock().push(predicate.clone());
let batches = self.table_data.clone(); let batches = self.table_data.clone();
let stream = SizedRecordBatchStream::new(batches[0].schema(), batches); Ok(stream_from_batches(batches))
Ok(Box::pin(stream))
} }
/// Returns true if data of this chunk is sorted /// Returns true if data of this chunk is sorted

View File

@ -18,7 +18,7 @@ query = { path = "../query" }
workspace-hack = { path = "../workspace-hack"} workspace-hack = { path = "../workspace-hack"}
[dev-dependencies] [dev-dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
metric = { path = "../metric" } metric = { path = "../metric" }
object_store = { path = "../object_store" } object_store = { path = "../object_store" }

View File

@ -11,7 +11,7 @@ edition = "2021"
# 2. Keep change/compile/link time down during development when working on just this crate # 2. Keep change/compile/link time down during development when working on just this crate
[dependencies] # In alphabetical order [dependencies] # In alphabetical order
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
arrow_util = { path = "../arrow_util" } arrow_util = { path = "../arrow_util" }
croaring = "0.5" croaring = "0.5"
data_types = { path = "../data_types" } data_types = { path = "../data_types" }

View File

@ -6,7 +6,7 @@ edition = "2021"
description = "IOx Schema definition" description = "IOx Schema definition"
[dependencies] [dependencies]
arrow = { version = "7.0", features = ["prettyprint"] } arrow = { version = "8.0", features = ["prettyprint"] }
hashbrown = "0.11" hashbrown = "0.11"
indexmap = { version = "1.7", features = ["std"] } indexmap = { version = "1.7", features = ["std"] }
itertools = "0.10.1" itertools = "0.10.1"

View File

@ -14,6 +14,7 @@ publish = false
### BEGIN HAKARI SECTION ### BEGIN HAKARI SECTION
[dependencies] [dependencies]
ahash = { version = "0.7", features = ["std"] } ahash = { version = "0.7", features = ["std"] }
arrow = { version = "8", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] }
base64 = { version = "0.13", features = ["std"] } base64 = { version = "0.13", features = ["std"] }
bitflags = { version = "1" } bitflags = { version = "1" }
byteorder = { version = "1", features = ["std"] } byteorder = { version = "1", features = ["std"] }
@ -37,13 +38,14 @@ num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] } num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] } num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
parquet = { version = "8", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-automata = { version = "0.1", features = ["regex-syntax", "std"] } regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", features = ["__tls", "default-tls", "hyper-tls", "json", "native-tls-crate", "serde_json", "tokio-native-tls"] } reqwest = { version = "0.11", features = ["__tls", "default-tls", "hyper-tls", "json", "native-tls-crate", "serde_json", "tokio-native-tls"] }
serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] } serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] } serde_json = { version = "1", features = ["indexmap", "preserve_order", "raw_value", "std"] }
sha2 = { version = "0.9", features = ["std"] } sha2 = { version = "0.9", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] } smallvec = { version = "1", default-features = false, features = ["union"] }
tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] } tokio = { version = "1", features = ["bytes", "fs", "full", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "signal-hook-registry", "sync", "time", "tokio-macros", "winapi"] }
@ -82,7 +84,7 @@ rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] } serde = { version = "1", features = ["derive", "rc", "serde_derive", "std"] }
serde_json = { version = "1", features = ["arbitrary_precision", "indexmap", "preserve_order", "raw_value", "std"] } serde_json = { version = "1", features = ["indexmap", "preserve_order", "raw_value", "std"] }
sha2 = { version = "0.9", features = ["std"] } sha2 = { version = "0.9", features = ["std"] }
smallvec = { version = "1", default-features = false, features = ["union"] } smallvec = { version = "1", default-features = false, features = ["union"] }
syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] } syn = { version = "1", features = ["clone-impls", "derive", "extra-traits", "full", "parsing", "printing", "proc-macro", "quote", "visit", "visit-mut"] }