chore: Update datafusion + `arrow`/`parquet`/`arrow-flight` to `15.0.0` (#4743)
* chore: Update datafusion + `arrow`/`parquet`/`arrow-flight` to `15.0.0`
* chore: Update APIs
* chore: Run cargo hakari tasks
* feat: normalize parquet file metadata
* chore: update size tests
* chore: add docs on metadata stripping
* chore: TEMP UPDATE TO DF BRANCH
* chore: Update for new API
* fix: Update to latest DF
* fix: cargo hakari

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
parent 9a21292db8
commit 3592aa52d8 (pull/24376/head)
Changed paths:

arrow_util
compactor
datafusion
influxdb_iox
influxdb_iox_client
ingester
iox_query
iox_tests
ioxd_querier
mutable_batch
packers
parquet_file
predicate
querier
  src
    system_tables
    table
query_functions
query_tests
read_buffer
  src
schema
service_grpc_flight
service_grpc_influxrpc
test_helpers_end_to_end
workspace-hack

@@ -96,9 +96,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"

 [[package]]
 name = "arrow"
-version = "14.0.0"
+version = "15.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0612b6a634de6c3f5e63fdaa6932f7bc598f92de0462ac6e69b0aebd77e093aa"
+checksum = "6510d919fa4c27880f54430510d09327d7c86699c3692664bc0bb7c314f71385"
 dependencies = [
  "bitflags",
  "chrono",

@@ -121,9 +121,9 @@ dependencies = [

 [[package]]
 name = "arrow-flight"
-version = "14.0.0"
+version = "15.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce7b7cfa8eb0dcb0691f18b6a1d9c81cfe3c42726c254be5128d15ebe7580a1d"
+checksum = "3683337e9a7a300ae42355c96a2e5d421cf9dbe16f09ce8a6475157fe69dc92d"
 dependencies = [
  "arrow",
  "base64 0.13.0",

@@ -1172,7 +1172,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "ahash",
  "arrow",

@@ -1181,11 +1181,14 @@ dependencies = [
  "datafusion-common",
  "datafusion-data-access",
  "datafusion-expr",
+ "datafusion-optimizer",
  "datafusion-physical-expr",
  "datafusion-row",
  "datafusion-sql",
  "futures",
  "glob",
  "hashbrown 0.12.1",
  "itertools",
  "lazy_static",
  "log",
  "num_cpus",

@@ -1200,13 +1203,14 @@ dependencies = [
  "tempfile",
  "tokio",
  "tokio-stream",
  "url",
  "uuid 1.0.0",
 ]

 [[package]]
 name = "datafusion-common"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "arrow",
  "ordered-float 3.0.0",

@@ -1217,12 +1221,11 @@ dependencies = [
 [[package]]
 name = "datafusion-data-access"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "async-trait",
  "chrono",
  "futures",
  "glob",
  "parking_lot 0.12.1",
  "tempfile",
  "tokio",

@@ -1231,7 +1234,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "ahash",
  "arrow",

@@ -1239,10 +1242,24 @@ dependencies = [
  "sqlparser",
 ]

+[[package]]
+name = "datafusion-optimizer"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "chrono",
+ "datafusion-common",
+ "datafusion-expr",
+ "hashbrown 0.12.1",
+ "log",
+]
+
 [[package]]
 name = "datafusion-physical-expr"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "ahash",
  "arrow",

@@ -1266,7 +1283,7 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "arrow",
  "datafusion 8.0.0",

@@ -1279,7 +1296,7 @@ dependencies = [
 [[package]]
 name = "datafusion-row"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "arrow",
  "datafusion-common",

@@ -1290,7 +1307,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=7b7edf9c43383c1d3310286b69d2d037db72c967#7b7edf9c43383c1d3310286b69d2d037db72c967"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=8ddd99c8432fdac2c236040973f984a4146f18b7#8ddd99c8432fdac2c236040973f984a4146f18b7"
 dependencies = [
  "ahash",
  "arrow",

@@ -3451,9 +3468,9 @@ dependencies = [

 [[package]]
 name = "parquet"
-version = "14.0.0"
+version = "15.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba1185ee1da5091e40b86519265a44d2704e3916ff867059c915141cab14d413"
+checksum = "94d31dde60b151ef88ec2c847e3a8f66d42d7dbdaeefd05d13d79db676b0b56f"
 dependencies = [
  "arrow",
  "base64 0.13.0",

@@ -7,7 +7,7 @@ description = "Apache Arrow utilities"

 [dependencies]
 ahash = { version = "0.7.5", default-features = false }
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 # used by arrow anyway (needed for printing workaround)
 chrono = { version = "0.4", default-features = false }
 comfy-table = { version = "6.0", default-features = false }

@@ -18,5 +18,5 @@ snafu = "0.7"
 workspace-hack = { path = "../workspace-hack"}

 [dev-dependencies]
-arrow-flight = "14.0.0"
+arrow-flight = "15.0.0"
 rand = "0.8.3"

@@ -146,21 +146,20 @@ impl StringDictionary<i32> {
         // is entirely non null
         let dictionary_nulls = None;
         let keys = keys.into_iter();

-        let mut array_builder = ArrayDataBuilder::new(DataType::Dictionary(
+        let array_data = ArrayDataBuilder::new(DataType::Dictionary(
             Box::new(DataType::Int32),
             Box::new(DataType::Utf8),
         ))
         .len(keys.len())
         .add_buffer(keys.collect())
-        .add_child_data(self.storage.to_arrow(dictionary_nulls).data().clone());
-
-        if let Some(nulls) = nulls {
-            array_builder = array_builder.null_bit_buffer(nulls);
-        }
-
+        .add_child_data(self.storage.to_arrow(dictionary_nulls).data().clone())
+        .null_bit_buffer(nulls)
         // TODO consider skipping the validation checks by using
         // `build_unchecked()`
-        let array_data = array_builder.build().expect("Valid array data");
+        .build()
+        .expect("Valid array data");

         DictionaryArray::<Int32Type>::from(array_data)
     }
 }

@@ -390,7 +390,7 @@ mod tests {
         ))
         .len(keys.len())
         .add_buffer(keys.data().buffers()[0].clone())
-        .null_bit_buffer(keys.data().null_buffer().unwrap().clone())
+        .null_bit_buffer(keys.data().null_buffer().cloned())
         .add_child_data(values.data().clone())
         .build()
         .unwrap();

@@ -162,16 +162,11 @@ impl PackedStringArray<i32> {
         let offsets = Buffer::from_slice_ref(&self.offsets);
         let values = Buffer::from(self.storage.as_bytes());

-        let mut array_builder = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
+        let data = ArrayDataBuilder::new(arrow::datatypes::DataType::Utf8)
             .len(len)
             .add_buffer(offsets)
-            .add_buffer(values);
-
-        if let Some(nulls) = nulls {
-            array_builder = array_builder.null_bit_buffer(nulls);
-        }
-
-        let data = array_builder
+            .add_buffer(values)
+            .null_bit_buffer(nulls)
             .build()
             // TODO consider skipping the validation checks by using
             // `new_unchecked`

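The dictionary and packed-string builder hunks above both follow from the same arrow 15 API change: `ArrayDataBuilder::null_bit_buffer` now takes an `Option<Buffer>` rather than a `Buffer`, so the old `if let Some(nulls)` shuffle around a mutable builder collapses into one chained call. A minimal sketch of the new builder shape, assuming `arrow = "15.0.0"` (the array contents are illustrative):

    use std::sync::Arc;

    use arrow::array::{Array, ArrayData, DictionaryArray, StringArray};
    use arrow::buffer::Buffer;
    use arrow::datatypes::{DataType, Int32Type};

    fn main() {
        // Dictionary values ["a", "b"] with keys [0, <null>, 1]; bit i of the
        // validity buffer marks row i as non-null, so 0b101 nulls out row 1.
        let values = StringArray::from(vec!["a", "b"]);
        let keys = Buffer::from_slice_ref(&[0_i32, 0, 1]);
        let validity = Some(Buffer::from_slice_ref(&[0b0000_0101_u8]));

        let data = ArrayData::builder(DataType::Dictionary(
            Box::new(DataType::Int32),
            Box::new(DataType::Utf8),
        ))
        .len(3)
        .add_buffer(keys)
        .add_child_data(values.data().clone())
        // arrow 15: Option<Buffer>, where None means "entirely non-null"
        .null_bit_buffer(validity)
        .build()
        .expect("valid array data");

        let array = DictionaryArray::<Int32Type>::from(data);
        assert!(array.is_null(1));
    }

Passing `None` reproduces the old no-nulls path, which is why the `if let` branches above could be deleted outright.
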
@ -5,7 +5,7 @@ authors = ["Luke Bond <luke.n.bond@gmail.com>"]
|
|||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "14.0.0", features = ["prettyprint"] }
|
||||
arrow = { version = "15.0.0", features = ["prettyprint"] }
|
||||
async-trait = "0.1.56"
|
||||
backoff = { path = "../backoff" }
|
||||
bytes = "1.0"
|
||||
|
|
|
@@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version"

 # Rename to workaround doctest bug
 # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
-upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="7b7edf9c43383c1d3310286b69d2d037db72c967", default-features = false, package = "datafusion" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="7b7edf9c43383c1d3310286b69d2d037db72c967" }
+upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="8ddd99c8432fdac2c236040973f984a4146f18b7", default-features = false, package = "datafusion" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="8ddd99c8432fdac2c236040973f984a4146f18b7" }
 workspace-hack = { path = "../workspace-hack"}

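As context for this pin bump: the shim crate re-exports one git revision of DataFusion for the whole workspace, and the dependency is renamed to `upstream` to dodge the doctest name-collision bug noted in the comment. A sketch of what such a re-export crate's lib.rs amounts to (illustrative; not taken from this commit):

    //! Re-exports datafusion at a specific version.
    //!
    //! Downstream crates depend on this shim and keep writing
    //! `use datafusion::prelude::*;` while the pinned revision is
    //! controlled in exactly one place.

    pub use upstream::*;

Bumping the two `rev` fields here is what moves every crate in the workspace onto DataFusion rev 8ddd99c at once.
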
@@ -35,7 +35,7 @@ trogging = { path = "../trogging", default-features = false, features = ["clap"] }

 # Crates.io dependencies, in alphabetical order
 ansi_term = "0.12"
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 backtrace = "0.3"
 bytes = "1.0"
 clap = { version = "3", features = ["derive", "env"] }

@@ -20,8 +20,8 @@ mutable_batch_lp = { path = "../mutable_batch_lp", optional = true }
 mutable_batch_pb = { path = "../mutable_batch_pb", optional = true }

 # Crates.io dependencies, in alphabetical order
-arrow = { version = "14.0.0", optional = true }
-arrow-flight = { version = "14.0.0", optional = true }
+arrow = { version = "15.0.0", optional = true }
+arrow-flight = { version = "15.0.0", optional = true }
 bytes = "1.0"
 futures-util = { version = "0.3", optional = true }
 prost = "0.10"

@@ -5,8 +5,8 @@ authors = ["Nga Tran <nga-tran@live.com>"]
 edition = "2021"

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
-arrow-flight = "14.0.0"
+arrow = { version = "15.0.0", features = ["prettyprint"] }
+arrow-flight = "15.0.0"
 arrow_util = { path = "../arrow_util" }
 async-trait = "0.1.56"
 backoff = { path = "../backoff" }

@@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor"
 # 2. Allow for query logic testing without bringing in all the storage systems.

 [dependencies] # In alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 async-trait = "0.1"
 chrono = { version = "0.4", default-features = false }

@@ -8,6 +8,7 @@ use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
 use datafusion::{
     datasource::TableProvider,
     error::{DataFusionError, Result as DataFusionResult},
+    execution::context::SessionState,
     logical_expr::{TableProviderFilterPushDown, TableType},
     logical_plan::Expr,
     physical_plan::{

@@ -240,6 +241,7 @@ impl TableProvider for ChunkTableProvider {

     async fn scan(
         &self,
+        _ctx: &SessionState,
         projection: &Option<Vec<usize>>,
         filters: &[Expr],
         _limit: Option<usize>,

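This provider, the querier system tables, and `QuerierTable` further down all absorb the same DataFusion change: `TableProvider::scan` now receives the `SessionState` as a leading argument. A minimal compiling sketch against the pinned DataFusion 8 rev, assuming `datafusion` and `async-trait` as dependencies; `EmptyTable` is an illustrative provider that scans no rows:

    use std::{any::Any, sync::Arc};

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::{TableProvider, TableType};
    use datafusion::error::Result as DataFusionResult;
    use datafusion::execution::context::SessionState;
    use datafusion::logical_plan::Expr;
    use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};

    struct EmptyTable {
        schema: SchemaRef,
    }

    #[async_trait]
    impl TableProvider for EmptyTable {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            Arc::clone(&self.schema)
        }

        fn table_type(&self) -> TableType {
            TableType::Base
        }

        // The session state is the new first argument after &self; this
        // sketch ignores it, just as the hunks above do with `_ctx`.
        async fn scan(
            &self,
            _ctx: &SessionState,
            _projection: &Option<Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec::new(false, Arc::clone(&self.schema))))
        }
    }

Implementations that delegate, like `QuerierTable` below, simply forward the state: `provider.scan(ctx, projection, filters, limit)`.
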
@@ -6,7 +6,7 @@ edition = "2021"
 description = "IOx test utils and tests"

 [dependencies]
-arrow = "14.0.0"
+arrow = "15.0.0"
 bytes = "1.0"
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }

@@ -21,7 +21,7 @@ iox_time = { path = "../iox_time" }
 trace = { path = "../trace" }

 # Crates.io dependencies, in alphabetical order
-arrow-flight = "14.0.0"
+arrow-flight = "15.0.0"
 async-trait = "0.1"
 hyper = "0.14"
 tokio = { version = "1.18", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }

@@ -5,7 +5,7 @@ edition = "2021"
 description = "A mutable arrow RecordBatch"

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }

@@ -257,7 +257,7 @@ impl Column {
                 let data = ArrayDataBuilder::new(DataType::Float64)
                     .len(data.len())
                     .add_buffer(data.iter().cloned().collect())
-                    .null_bit_buffer(nulls)
+                    .null_bit_buffer(Some(nulls))
                     .build()
                     .context(CreatingArrowArraySnafu)?;
                 Arc::new(Float64Array::from(data))

@@ -267,7 +267,7 @@ impl Column {
                 let data = ArrayDataBuilder::new(TIME_DATA_TYPE())
                     .len(data.len())
                     .add_buffer(data.iter().cloned().collect())
-                    .null_bit_buffer(nulls)
+                    .null_bit_buffer(Some(nulls))
                     .build()
                     .context(CreatingArrowArraySnafu)?;
                 Arc::new(TimestampNanosecondArray::from(data))

@@ -277,7 +277,7 @@ impl Column {
                 let data = ArrayDataBuilder::new(DataType::Int64)
                     .len(data.len())
                     .add_buffer(data.iter().cloned().collect())
-                    .null_bit_buffer(nulls)
+                    .null_bit_buffer(Some(nulls))
                     .build()
                     .context(CreatingArrowArraySnafu)?;
                 Arc::new(Int64Array::from(data))

@@ -288,7 +288,7 @@ impl Column {
                 let data = ArrayDataBuilder::new(DataType::UInt64)
                     .len(data.len())
                     .add_buffer(data.iter().cloned().collect())
-                    .null_bit_buffer(nulls)
+                    .null_bit_buffer(Some(nulls))
                     .build()
                     .context(CreatingArrowArraySnafu)?;
                 Arc::new(UInt64Array::from(data))

@@ -298,7 +298,7 @@ impl Column {
                 let data = ArrayDataBuilder::new(DataType::Boolean)
                     .len(data.len())
                     .add_buffer(data.to_arrow())
-                    .null_bit_buffer(nulls)
+                    .null_bit_buffer(Some(nulls))
                     .build()
                     .context(CreatingArrowArraySnafu)?;
                 Arc::new(BooleanArray::from(data))

@@ -5,11 +5,11 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
 edition = "2021"

 [dependencies] # In alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 influxdb_tsm = { path = "../influxdb_tsm" }
 schema = { path = "../schema" }
 snafu = "0.7"
-parquet = "14.0.0"
+parquet = "15.0.0"
 workspace-hack = { path = "../workspace-hack"}

 [dev-dependencies] # In alphabetical order

@@ -5,7 +5,7 @@ authors = ["Nga Tran <nga-tran@live.com>"]
 edition = "2021"

 [dependencies] # In alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 base64 = "0.13"
 bytes = "1.0"
 data_types = { path = "../data_types" }

@@ -18,7 +18,7 @@ metric = { path = "../metric" }
 object_store = "0.0.1"
 observability_deps = { path = "../observability_deps" }
 parking_lot = "0.12"
-parquet = {version = "14.0.0", features = ["experimental"]}
+parquet = {version = "15.0.0", features = ["experimental"]}
 parquet-format = "4.0"
 pbjson-types = "0.3"
 predicate = { path = "../predicate" }

@@ -713,7 +713,12 @@ impl DecodedIoxParquetMetaData {
         )
         .context(ArrowFromParquetFailureSnafu {})?;

-        let arrow_schema_ref = Arc::new(arrow_schema);
+        // The parquet reader will propagate any metadata keys present in the parquet
+        // metadata onto the arrow schema. This will include the encoded IOxMetadata
+        //
+        // We strip this out to avoid false negatives when comparing schemas for equality,
+        // as this metadata will vary from file to file
+        let arrow_schema_ref = Arc::new(arrow_schema.with_metadata(Default::default()));

         let schema: Schema = arrow_schema_ref
             .try_into()

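This hunk is the `feat: normalize parquet file metadata` piece of the commit: the parquet reader copies file-level key/value metadata, including the encoded IOx metadata, onto the decoded arrow schema, so two files with identical columns would spuriously compare unequal. A small sketch of the stripping behaviour, assuming `arrow = "15.0.0"` (the key, value, and field are illustrative):

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let mut md = HashMap::new();
        md.insert("iox_metadata".to_string(), "varies per file".to_string());

        let field = Field::new("time", DataType::Int64, false);
        let with_md = Schema::new_with_metadata(vec![field.clone()], md);
        let plain = Schema::new(vec![field]);

        // Key/value metadata participates in schema equality...
        assert_ne!(with_md, plain);

        // ...so stripping it first makes structurally equal schemas compare equal.
        let stripped = Arc::new(with_md.with_metadata(Default::default()));
        assert_eq!(stripped.as_ref(), &plain);
    }
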
@@ -1,6 +1,6 @@
 //! Streaming [`RecordBatch`] / Parquet file encoder routines.

-use std::{ops::DerefMut, sync::Arc};
+use std::{io::Write, sync::Arc};

 use arrow::{error::ArrowError, record_batch::RecordBatch};
 use futures::{pin_mut, Stream, StreamExt};

@@ -9,11 +9,7 @@ use parquet::{
     arrow::ArrowWriter,
     basic::Compression,
     errors::ParquetError,
-    file::{
-        metadata::KeyValue,
-        properties::WriterProperties,
-        writer::{InMemoryWriteableCursor, ParquetWriter},
-    },
+    file::{metadata::KeyValue, properties::WriterProperties},
 };
 use thiserror::Error;

@@ -65,20 +61,15 @@ pub enum CodecError {
 /// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1
 /// [`FileMetaData`]: parquet_format::FileMetaData
 /// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData
-pub async fn to_parquet<S, W, T>(
+pub async fn to_parquet<S, W>(
     batches: S,
     meta: &IoxMetadata,
     sink: W,
 ) -> Result<parquet_format::FileMetaData, CodecError>
 where
     S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
-    W: DerefMut<Target = T> + Send,
-    T: ParquetWriter + Send + 'static,
+    W: Write + Send,
 {
-    // Let the caller pass a mutable ref to something that impls ParquetWriter
-    // while still providing the necessary owned impl to the ArrowWriter.
-    let sink = sink.deref().try_clone().map_err(CodecError::CloneSink)?;
-
     let stream = batches.peekable();
     pin_mut!(stream);

@@ -124,7 +115,7 @@ pub async fn to_parquet_bytes<S>(
 where
     S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
 {
-    let mut w = InMemoryWriteableCursor::default();
+    let mut bytes = vec![];

     let partition_id = meta.partition_id;
     debug!(

@@ -134,14 +125,14 @@ where
     );

     // Serialize the record batches into the in-memory buffer
-    let meta = to_parquet(batches, meta, &mut w).await?;
+    let meta = to_parquet(batches, meta, &mut bytes).await?;
     if meta.row_groups.is_empty() {
         // panic here to avoid later consequence of reading it for statistics
         panic!("partition_id={}. Created Parquet metadata has no column metadata. HINT a common reason of this is writing empty data to parquet file: {:#?}", partition_id, meta);
     }

     trace!(?partition_id, ?meta, "Parquet Metadata");

-    let mut bytes = w
-        .into_inner()
-        .expect("mem writer has outstanding reference");
-
     bytes.shrink_to_fit();

     Ok((bytes, meta))

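These serializer hunks track parquet 15's writer rework: the `ParquetWriter` trait and `InMemoryWriteableCursor` are gone, and `ArrowWriter` accepts any `std::io::Write` sink, which is why `to_parquet_bytes` can now hand `to_parquet` a plain `Vec<u8>`. A standalone sketch of that pattern, assuming `arrow = "15.0.0"` and `parquet = "15.0.0"` (the schema and data are illustrative):

    use std::sync::Arc;

    use arrow::array::Int64Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )?;

        // Any `impl Write` is a valid sink now; a Vec<u8> replaces the
        // removed InMemoryWriteableCursor for in-memory encoding.
        let mut bytes: Vec<u8> = vec![];
        let mut writer = ArrowWriter::try_new(&mut bytes, Arc::clone(&schema), None)?;
        writer.write(&batch)?;
        let file_meta = writer.close()?;

        // close() hands back the thrift FileMetaData that to_parquet returns.
        assert!(!file_meta.row_groups.is_empty());
        println!("wrote {} bytes", bytes.len());
        Ok(())
    }

It also explains the deleted `CloneSink` dance: with a `&mut Vec<u8>` sink there is no longer a cursor that must be cloned for the writer.
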
@@ -12,7 +12,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use bytes::Bytes;
-use datafusion::physical_plan::SendableRecordBatchStream;
+use datafusion::{parquet::arrow::ProjectionMask, physical_plan::SendableRecordBatchStream};
 use datafusion_util::{AdapterStream, AutoAbortJoinHandle};
 use futures::{Stream, TryStreamExt};
 use object_store::{DynObjectStore, GetResult};

@@ -277,7 +277,12 @@ async fn download_and_scan_parquet(
     let cursor = SliceableCursor::new(data);
     let file_reader = SerializedFileReader::new(cursor)?;
     let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-    let record_batch_reader = arrow_reader.get_record_reader_by_columns(projection, batch_size)?;
+
+    let mask = ProjectionMask::roots(
+        arrow_reader.get_metadata().file_metadata().schema_descr(),
+        projection,
+    );
+    let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, batch_size)?;

     for batch in record_batch_reader {
         if tx.send(batch).await.is_err() {

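The read side changed in step: parquet 15's `get_record_reader_by_columns` takes a `ProjectionMask` built from the file's schema descriptor instead of a bare collection of column indices. A sketch mirroring the hunk above, assuming `parquet = "15.0.0"` (the function name and batch size are illustrative):

    use std::sync::Arc;

    use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
    use parquet::file::serialized_reader::{SerializedFileReader, SliceableCursor};

    fn scan_columns(data: Vec<u8>, projection: Vec<usize>) -> Result<(), Box<dyn std::error::Error>> {
        let cursor = SliceableCursor::new(data);
        let file_reader = SerializedFileReader::new(cursor)?;
        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));

        // Indices are wrapped in a mask tied to the parquet schema; `roots`
        // selects top-level columns (a `leaves` constructor also exists).
        let mask = ProjectionMask::roots(
            arrow_reader.get_metadata().file_metadata().schema_descr(),
            projection,
        );

        for batch in arrow_reader.get_record_reader_by_columns(mask, 1024)? {
            println!("read a batch with {} rows", batch?.num_rows());
        }
        Ok(())
    }
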
@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }

@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"

 [dependencies]
-arrow = "14.0.0"
+arrow = "15.0.0"
 async-trait = "0.1.56"
 backoff = { path = "../backoff" }
 cache_system = { path = "../cache_system" }

@@ -6,7 +6,7 @@ use datafusion::{
     catalog::schema::SchemaProvider,
     datasource::TableProvider,
     error::Result as DataFusionResult,
-    execution::context::TaskContext,
+    execution::context::{SessionState, TaskContext},
     logical_expr::TableType,
     physical_plan::{
         expressions::PhysicalSortExpr, ExecutionPlan, Partitioning, RecordBatchStream,

@@ -99,6 +99,7 @@ where

     async fn scan(
         &self,
+        _ctx: &SessionState,
         projection: &Option<Vec<usize>>,
         // It would be cool to push projection and limit down
         _filters: &[datafusion::logical_plan::Expr],

@@ -5,6 +5,7 @@ use async_trait::async_trait;
 use datafusion::{
     datasource::{TableProvider, TableType},
     error::DataFusionError,
+    execution::context::SessionState,
     logical_expr::TableProviderFilterPushDown,
     logical_plan::Expr,
     physical_plan::ExecutionPlan,

@@ -35,6 +36,7 @@ impl TableProvider for QuerierTable {

     async fn scan(
         &self,
+        ctx: &SessionState,
         projection: &Option<Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,

@@ -63,7 +65,7 @@ impl TableProvider for QuerierTable {
             Err(e) => panic!("unexpected error: {:?}", e),
         };

-        provider.scan(projection, filters, limit).await
+        provider.scan(ctx, projection, filters, limit).await
     }

     fn supports_filter_pushdown(

@@ -6,7 +6,7 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 chrono = { version = "0.4", default-features = false }
 datafusion = { path = "../datafusion" }
 itertools = "0.10.2"

@@ -6,7 +6,7 @@ edition = "2021"
 description = "Tests of the query engine against different database configurations"

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 async-trait = "0.1"
 backoff = { path = "../backoff" }

@@ -33,7 +33,7 @@ workspace-hack = { path = "../workspace-hack"}
 parquet_file = { version = "0.1.0", path = "../parquet_file" }

 [dev-dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 snafu = "0.7"
 tempfile = "3.1.0"

@@ -11,7 +11,7 @@ edition = "2021"
 # 2. Keep change/compile/link time down during development when working on just this crate

 [dependencies] # In alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 croaring = "0.6"
 data_types = { path = "../data_types" }

@@ -730,9 +730,9 @@ mod test {
             kind: MetricKind::U64Gauge,
             observations: vec![
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "BT_U32-FIXED"), ("log_data_type", "i64")]), Observation::U64Gauge(192)),
-                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FBT_U8-FIXEDN"), ("log_data_type", "f64")]), Observation::U64Gauge(906)),
+                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FBT_U8-FIXEDN"), ("log_data_type", "f64")]), Observation::U64Gauge(1002)),
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXED"), ("log_data_type", "f64")]), Observation::U64Gauge(186)),
-                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXEDN"), ("log_data_type", "bool")]), Observation::U64Gauge(672)),
+                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXEDN"), ("log_data_type", "bool")]), Observation::U64Gauge(768)),
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "RLE"), ("log_data_type", "string")]), Observation::U64Gauge(688)),
             ]
         },

@@ -759,9 +759,9 @@ mod test {
             kind: MetricKind::U64Gauge,
             observations: vec![
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "BT_U32-FIXED"), ("log_data_type", "i64")]), Observation::U64Gauge(192)),
-                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FBT_U8-FIXEDN"), ("log_data_type", "f64")]), Observation::U64Gauge(906)),
+                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FBT_U8-FIXEDN"), ("log_data_type", "f64")]), Observation::U64Gauge(1002)),
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXED"), ("log_data_type", "f64")]), Observation::U64Gauge(186)),
-                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXEDN"), ("log_data_type", "bool")]), Observation::U64Gauge(672)),
+                (Attributes::from(&[("db_name", "mydb"), ("encoding", "FIXEDN"), ("log_data_type", "bool")]), Observation::U64Gauge(768)),
                 (Attributes::from(&[("db_name", "mydb"), ("encoding", "RLE"), ("log_data_type", "string")]), Observation::U64Gauge(352)),
             ]
         },

@@ -361,7 +361,7 @@ mod test {
     #[test]
     fn size() {
         let v = Bool::from(vec![None, None, Some(true), Some(false)].as_slice());
-        assert_eq!(v.size(), 400);
+        assert_eq!(v.size(), 448);
     }

     #[test]

@@ -544,8 +544,8 @@ mod test {
     #[test]
     fn size() {
         let (v, _) = new_mock_encoding(vec![None, None, Some(100), Some(2222)]);
-        assert_eq!(v.size(false), 408);
-        assert_eq!(v.size(true), 408); // no difference in reported size
+        assert_eq!(v.size(false), 456);
+        assert_eq!(v.size(true), 456); // no difference in reported size
     }

     #[test]

@@ -971,13 +971,13 @@ mod test {

         // Input data containing NULL will be stored in an Arrow array encoding
         let cases = vec![
-            (vec![None, Some(0_i64)], 460_usize), // u8 Arrow array
-            (vec![None, Some(-120_i64)], 460), // i8
-            (vec![None, Some(399_i64)], 461), // u16
-            (vec![None, Some(-399_i64)], 461), // i16
-            (vec![None, Some(u32::MAX as i64)], 461), // u32
-            (vec![None, Some(i32::MIN as i64)], 461), // i32
-            (vec![None, Some(u32::MAX as i64 + 1)], 454), // u64
+            (vec![None, Some(0_i64)], 508_usize), // u8 Arrow array
+            (vec![None, Some(-120_i64)], 508), // i8
+            (vec![None, Some(399_i64)], 509), // u16
+            (vec![None, Some(-399_i64)], 509), // i16
+            (vec![None, Some(u32::MAX as i64)], 509), // u32
+            (vec![None, Some(i32::MIN as i64)], 509), // i32
+            (vec![None, Some(u32::MAX as i64 + 1)], 502), // u64
         ];

         for (case, name) in cases.iter().cloned() {

@@ -1163,10 +1163,10 @@ mod test {

         // Input data containing NULL will be stored in an Arrow array encoding
         let cases = vec![
-            (vec![None, Some(0_u64)], 460_usize),
-            (vec![None, Some(399_u64)], 461),
-            (vec![None, Some(u32::MAX as u64)], 461),
-            (vec![None, Some(u64::MAX)], 454),
+            (vec![None, Some(0_u64)], 508_usize),
+            (vec![None, Some(399_u64)], 509),
+            (vec![None, Some(u32::MAX as u64)], 509),
+            (vec![None, Some(u64::MAX)], 502),
        ];

         for (case, size) in cases.iter().cloned() {

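Every expectation updated in these read buffer size tests grows by exactly 48 bytes per Arrow-backed column (400 to 448, 408 to 456, 460 to 508, 461 to 509, 454 to 502), consistent with arrow 15's `ArrayData` growing by a fixed amount per array rather than anything proportional to the data. A quick check of that constant delta over the values in the hunks:

    fn main() {
        // (old, new) reported sizes taken from the updated tests above
        let cases = [(400, 448), (408, 456), (460, 508), (461, 509), (454, 502)];
        for (old, new) in cases {
            // each NULL-containing column, stored as one Arrow array, grew by 48 bytes
            assert_eq!(new - old, 48);
        }
    }
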
@@ -1670,18 +1670,16 @@ impl From<Values<'_>> for arrow::array::ArrayRef {
                     values.into_iter().collect::<arrow::array::StringArray>()
                 };

-                let mut builder = ArrayDataBuilder::new(DataType::Dictionary(
+                let data = ArrayDataBuilder::new(DataType::Dictionary(
                     Box::new(DataType::Int32),
                     Box::new(DataType::Utf8),
                 ))
                 .len(keys.len())
                 .add_buffer(Buffer::from_iter(keys))
-                .add_child_data(values_arr.data().clone());
-
-                if let Some(bm) = null_bitmap {
-                    builder = builder.null_bit_buffer(bm.to_arrow());
-                }
-                let data = builder.build().unwrap();
+                .add_child_data(values_arr.data().clone())
+                .null_bit_buffer(null_bitmap.map(|bm| bm.to_arrow()))
+                .build()
+                .unwrap();
                 Arc::new(DictionaryArray::<Int32Type>::from(data))
             }
             Values::I64(values) => Arc::new(arrow::array::Int64Array::from(values)),

@@ -6,7 +6,7 @@ edition = "2021"
 description = "IOx Schema definition"

 [dependencies]
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 hashbrown = "0.12"
 indexmap = { version = "1.8", features = ["std"] }
 itertools = "0.10.1"

@@ -16,8 +16,8 @@ iox_query = { path = "../iox_query" }
 service_common = { path = "../service_common" }

 # Crates.io dependencies, in alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
-arrow-flight = "14.0.0"
+arrow = { version = "15.0.0", features = ["prettyprint"] }
+arrow-flight = "15.0.0"
 bytes = "1.0"
 futures = "0.3"
 pin-project = "1.0"

@@ -16,7 +16,7 @@ schema = { path = "../schema" }
 service_common = { path = "../service_common" }

 # Crates.io dependencies, in alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 async-trait = "0.1"
 futures = "0.3"
 pin-project = "1.0"

@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"

 [dependencies] # In alphabetical order
-arrow = { version = "14.0.0", features = ["prettyprint"] }
+arrow = { version = "15.0.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 assert_cmd = "2.0.2"
 bytes = "1.0"

@@ -14,7 +14,7 @@ publish = false
 ### BEGIN HAKARI SECTION
 [dependencies]
 ahash = { version = "0.7", features = ["std"] }
-arrow = { version = "14", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] }
+arrow = { version = "15", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] }
 bitflags = { version = "1" }
 byteorder = { version = "1", features = ["std"] }
 bytes = { version = "1", features = ["std"] }

@@ -42,7 +42,7 @@ num-integer = { version = "0.1", default-features = false, features = ["i128", "
 num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
 object_store = { version = "0.0.1", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "cloud-storage", "gcp", "hyper", "hyper-rustls", "reqwest", "rusoto_core", "rusoto_credential", "rusoto_s3"] }
 once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] }
-parquet = { version = "14", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
+parquet = { version = "15", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
 predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }
 proptest = { version = "1", features = ["bit-set", "break-dead-code", "fork", "lazy_static", "quick-error", "regex-syntax", "rusty-fork", "std", "tempfile", "timeout"] }
 prost = { version = "0.10", features = ["prost-derive", "std"] }