From a30803e692eb7191678f074321e6738f31d41995 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 15 Feb 2022 07:10:24 -0500 Subject: [PATCH] chore: Update datafusion, update `arrow`/`parquet`/`arrow-flight` to 9.0 (#3733) * chore: Update datafusion * chore: Update arrow * fix: missing updates * chore: Update cargo.lock * fix: update for smaller parquet size * fix: update test for smaller parquet files * test: ensure parquet_file tests write multiple row groups * fix: update callsite * fix: Update for tests * fix: harkari * fix: use IoxObjectStore::existing Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 31 +++++++------ arrow_util/Cargo.toml | 2 +- datafusion/Cargo.toml | 2 +- db/Cargo.toml | 2 +- influxdb_iox/Cargo.toml | 6 +-- influxdb_iox_client/Cargo.toml | 4 +- ingester/Cargo.toml | 7 +-- ingester/src/persist.rs | 16 ++++++- mutable_batch/Cargo.toml | 2 +- mutable_buffer/Cargo.toml | 2 +- packers/Cargo.toml | 4 +- parquet_catalog/Cargo.toml | 6 +-- parquet_catalog/src/dump.rs | 8 ++-- parquet_file/Cargo.toml | 6 +-- parquet_file/src/metadata.rs | 5 +- parquet_file/src/storage.rs | 59 +++++++++++++++++------- parquet_file/src/test_utils.rs | 2 + parquet_file/src/test_utils/generator.rs | 6 +++ predicate/Cargo.toml | 2 +- querier/Cargo.toml | 2 +- query/Cargo.toml | 2 +- query_tests/Cargo.toml | 2 +- read_buffer/Cargo.toml | 2 +- schema/Cargo.toml | 2 +- workspace-hack/Cargo.toml | 4 +- 25 files changed, 119 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c678f31a82..af7b2bfe96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,9 +84,9 @@ dependencies = [ [[package]] name = "arrow" -version = "8.0.0" +version = "9.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce240772a007c63658c1d335bb424fd1019b87895dee899b7bf70e85b2d24e5f" +checksum = "374ec8e5f39015ab754cfc63398a897423c877b66aecbfd036903b2c9c07f244" dependencies = [ "bitflags", "chrono", @@ -109,9 +109,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "8.0.0" +version = "9.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ec7c637d2edd6969eeb63793584e0a3d17f99559b872ab73f47aea186aef50a" +checksum = "f9234708bb25937fe23f6cd49400163502ddef84038b3fc859b4ec743840e07a" dependencies = [ "arrow", "base64 0.13.0", @@ -917,7 +917,7 @@ dependencies = [ [[package]] name = "datafusion" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267" dependencies = [ "ahash", "arrow", @@ -946,7 +946,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267" dependencies = [ "arrow", "ordered-float 2.10.0", @@ -957,7 +957,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "7.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267" dependencies = [ "ahash", "arrow", @@ -2037,6 +2037,7 @@ dependencies = [ "generated_types", "hyper", "iox_catalog", + "iox_object_store", "metric", "mutable_batch", "mutable_batch_lp", @@ -3262,9 +3263,9 @@ dependencies = [ [[package]] name = "parquet" -version = "8.0.0" +version = "9.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d5a6492e0b849fd458bc9364aee4c8a9882b3cc21b2576767162725f69d2ad8" +checksum = "36531313e3e81a646672001c02d1764fabc1320055fe8b176e696b08d3cf44cd" dependencies = [ "arrow", "base64 0.13.0", @@ -6104,18 +6105,18 @@ checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006" [[package]] name = "zstd" -version = "0.9.2+zstd.1.5.1" +version = "0.10.0+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54" +checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "4.1.3+zstd.1.5.1" +version = "4.1.4+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79" +checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee" dependencies = [ "libc", "zstd-sys", @@ -6123,9 +6124,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "1.6.2+zstd.1.5.1" +version = "1.6.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f" +checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8" dependencies = [ "cc", "libc", diff --git a/arrow_util/Cargo.toml b/arrow_util/Cargo.toml index 25bb9c9b9d..ccd6b339a0 100644 --- a/arrow_util/Cargo.toml +++ b/arrow_util/Cargo.toml @@ -7,7 +7,7 @@ description = "Apache Arrow utilities" [dependencies] ahash = { version = "0.7.5", default-features = false } -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } # used by arrow anyway (needed for printing workaround) chrono = { version = "0.4", default-features = false } comfy-table = { version = "5.0", default-features = false } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index d7d85d1686..42c89a2831 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0a50dcdfb1f2854e59b17da2d87c106c614b226d", default-features = false, package = "datafusion" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="81e76edc330067332ac9a3972525ecbe92953267", default-features = false, package = "datafusion" } workspace-hack = { path = "../workspace-hack"} diff --git a/db/Cargo.toml b/db/Cargo.toml index e5e8bfd9d2..662b0214c0 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -5,7 +5,7 @@ authors = ["pauldix "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } async-trait = "0.1" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 7fc4a3f27f..632c30894b 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -50,8 +50,8 @@ write_buffer = { path = "../write_buffer" } # Crates.io dependencies, in alphabetical order ansi_term = "0.12" -arrow = { version = "8.0", features = ["prettyprint"] } -arrow-flight = "8.0" +arrow = { version = "9.0", features = ["prettyprint"] } +arrow-flight = "9.0" async-trait = "0.1" backtrace = "0.3" byteorder = "1.3.4" @@ -74,7 +74,7 @@ log = "0.4" num_cpus = "1.13.0" once_cell = { version = "1.4.0", features = ["parking_lot"] } parking_lot = "0.12" -parquet = "8.0" +parquet = "9.0" pin-project = "1.0" pprof = { version = "0.6", default-features = false, features = ["flamegraph", "protobuf"], optional = true } prost = "0.9" diff --git a/influxdb_iox_client/Cargo.toml b/influxdb_iox_client/Cargo.toml index 39e45c4f48..ecd9c4c800 100644 --- a/influxdb_iox_client/Cargo.toml +++ b/influxdb_iox_client/Cargo.toml @@ -17,8 +17,8 @@ client_util = { path = "../client_util" } generated_types = { path = "../generated_types", default-features = false } # Crates.io dependencies, in alphabetical order -arrow = { version = "8.0", optional = true } -arrow-flight = { version = "8.0", optional = true } +arrow = { version = "9.0", optional = true } +arrow-flight = { version = "9.0", optional = true } bytes = "1.0" futures-util = { version = "0.3", optional = true } dml = { path = "../dml", optional = true } diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index e6dffe41f1..dff263af19 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -5,8 +5,8 @@ authors = ["Nga Tran "] edition = "2021" [dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } -arrow-flight = "8.0" +arrow = { version = "9.0", features = ["prettyprint"] } +arrow-flight = "9.0" arrow_util = { path = "../arrow_util" } async-trait = "0.1.42" base64 = "0.13" @@ -20,11 +20,12 @@ db = { path = "../db" } dml = { path = "../dml" } hyper = "0.14" iox_catalog = { path = "../iox_catalog" } +iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch"} object_store = { path = "../object_store" } parking_lot = "0.12" -parquet = "8.0" +parquet = "9.0" parquet_file = { path = "../parquet_file" } observability_deps = { path = "../observability_deps" } predicate = { path = "../predicate" } diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index aa46030d7d..bc1cd259ef 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -1,5 +1,7 @@ //! Persist compacted data to parquet files in object storage +use std::sync::Arc; + use arrow::record_batch::RecordBatch; use bytes::Bytes; use object_store::{ @@ -28,7 +30,7 @@ pub type Result = std::result::Result; pub async fn persist( metadata: &IoxMetadata, record_batches: Vec, - object_store: &ObjectStore, + object_store: &Arc, ) -> Result<()> { if record_batches.is_empty() { return Ok(()); @@ -38,7 +40,17 @@ pub async fn persist( .expect("record_batches.is_empty was just checked") .schema(); - let data = parquet_file::storage::Storage::parquet_bytes(record_batches, schema, metadata) + // Make a fake IOx object store to conform to the parquet file + // interface, but note this isn't actually used to find parquet + // paths to write to + use iox_object_store::IoxObjectStore; + let iox_object_store = Arc::new(IoxObjectStore::existing( + Arc::clone(object_store), + IoxObjectStore::root_path_for(object_store, uuid::Uuid::new_v4()), + )); + + let data = parquet_file::storage::Storage::new(iox_object_store) + .parquet_bytes(record_batches, schema, metadata) .await .context(ConvertingToBytesSnafu)?; diff --git a/mutable_batch/Cargo.toml b/mutable_batch/Cargo.toml index adea1160db..27d10ab6c8 100644 --- a/mutable_batch/Cargo.toml +++ b/mutable_batch/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" description = "A mutable arrow RecordBatch" [dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } chrono = { version = "0.4", default-features = false } data_types = { path = "../data_types" } diff --git a/mutable_buffer/Cargo.toml b/mutable_buffer/Cargo.toml index a221ba1f7f..8d3f9ff476 100644 --- a/mutable_buffer/Cargo.toml +++ b/mutable_buffer/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Andrew Lamb "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } data_types = { path = "../data_types" } schema = { path = "../schema" } metric = { path = "../metric" } diff --git a/packers/Cargo.toml b/packers/Cargo.toml index f14121917e..4228282b74 100644 --- a/packers/Cargo.toml +++ b/packers/Cargo.toml @@ -5,12 +5,12 @@ authors = ["Andrew Lamb "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } influxdb_tsm = { path = "../influxdb_tsm" } schema = { path = "../schema" } snafu = "0.7" observability_deps = { path = "../observability_deps" } -parquet = "8.0" +parquet = "9.0" workspace-hack = { path = "../workspace-hack"} [dev-dependencies] # In alphabetical order diff --git a/parquet_catalog/Cargo.toml b/parquet_catalog/Cargo.toml index af4f6ca3cb..670c450b75 100644 --- a/parquet_catalog/Cargo.toml +++ b/parquet_catalog/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } base64 = "0.13" bytes = "1.0" data_types = { path = "../data_types" } @@ -16,7 +16,7 @@ iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } object_store = { path = "../object_store" } observability_deps = { path = "../observability_deps" } -parquet = "8.0" +parquet = "9.0" parquet_file = { path = "../parquet_file" } parquet-format = "4.0" parking_lot = "0.12" @@ -32,5 +32,5 @@ time = { path = "../time" } tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] } tokio-stream = "0.1" uuid = { version = "0.8", features = ["v4"] } -zstd = "0.9" +zstd = "0.10" workspace-hack = { path = "../workspace-hack"} diff --git a/parquet_catalog/src/dump.rs b/parquet_catalog/src/dump.rs index 11ea2c0479..eb234b927b 100644 --- a/parquet_catalog/src/dump.rs +++ b/parquet_catalog/src/dump.rs @@ -301,8 +301,8 @@ File { file_name: "00000000-0000-0000-0000-000000000001.parquet", }, ), - file_size_bytes: 3052, - metadata: b"metadata omitted (935 bytes)", + file_size_bytes: 3660, + metadata: b"metadata omitted (952 bytes)", }, ), ), @@ -417,8 +417,8 @@ File { file_name: "00000000-0000-0000-0000-000000000001.parquet", }, ), - file_size_bytes: 3052, - metadata: b"metadata omitted (935 bytes)", + file_size_bytes: 3660, + metadata: b"metadata omitted (952 bytes)", }, ), ), diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index f2420acc38..ecdf472cf5 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Nga Tran "] edition = "2021" [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } base64 = "0.13" bytes = "1.0" data_types = { path = "../data_types" } @@ -18,7 +18,7 @@ iox_object_store = { path = "../iox_object_store" } metric = { path = "../metric" } object_store = { path = "../object_store" } observability_deps = { path = "../observability_deps" } -parquet = {version = "8.0", features = ["experimental"]} +parquet = {version = "9.0", features = ["experimental"]} parquet-format = "4.0" parking_lot = "0.12" pbjson-types = "0.2" @@ -33,7 +33,7 @@ time = { path = "../time" } tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] } tokio-stream = "0.1" uuid = { version = "0.8", features = ["v4"] } -zstd = "0.9" +zstd = "0.10" workspace-hack = { path = "../workspace-hack"} [dev-dependencies] diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 7e65f54159..9d1f24e137 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -1114,6 +1114,9 @@ mod tests { let parquet_metadata = chunk.parquet_metadata(); let decoded = parquet_metadata.decode().unwrap(); + // Several of these tests cover merging + // metata from several different row groups, so this should + // not be changed. assert!(decoded.md.num_row_groups() > 1); assert_ne!(decoded.md.file_metadata().schema_descr().num_columns(), 0); @@ -1194,7 +1197,7 @@ mod tests { let mut generator = ChunkGenerator::new().await; let (chunk, _) = generator.generate().await.unwrap(); let parquet_metadata = chunk.parquet_metadata(); - assert_eq!(parquet_metadata.size(), 3719); + assert_eq!(parquet_metadata.size(), 4068); } #[test] diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 90600d6809..123fee1f75 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -87,11 +87,22 @@ pub type Result = std::result::Result; #[derive(Debug, Clone)] pub struct Storage { iox_object_store: Arc, + + // If `Some`, restricts the size of the row groups created in the parquet file + max_row_group_size: Option, } impl Storage { pub fn new(iox_object_store: Arc) -> Self { - Self { iox_object_store } + Self { + iox_object_store, + max_row_group_size: None, + } + } + + /// Specify the maximum sized row group to make + pub fn set_max_row_group_size(&mut self, max_row_group_size: usize) { + self.max_row_group_size = Some(max_row_group_size); } /// Write the given stream of data of a specified table of @@ -112,7 +123,9 @@ impl Storage { let path = ParquetFilePath::new(&chunk_addr); let schema = stream.schema(); - let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?; + let data = self + .parquet_stream_to_bytes(stream, schema, metadata) + .await?; // no data if data.is_empty() { return Ok(None); @@ -128,30 +141,40 @@ impl Storage { Ok(Some((path, file_size_bytes, md))) } - fn writer_props(metadata_bytes: &[u8]) -> WriterProperties { - WriterProperties::builder() + fn writer_props(&self, metadata_bytes: &[u8]) -> WriterProperties { + let builder = WriterProperties::builder() .set_key_value_metadata(Some(vec![KeyValue { key: METADATA_KEY.to_string(), value: Some(base64::encode(&metadata_bytes)), }])) - .set_compression(Compression::ZSTD) - .build() + .set_compression(Compression::ZSTD); + + let builder = if let Some(max_row_group_size) = self.max_row_group_size.as_ref() { + builder.set_max_row_group_size(*max_row_group_size) + } else { + builder + }; + + builder.build() } /// Convert the given stream of RecordBatches to bytes. This should be deleted when switching /// over to use `ingester` only. async fn parquet_stream_to_bytes( + &self, stream: SendableRecordBatchStream, schema: SchemaRef, metadata: IoxMetadataOld, ) -> Result> { let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?; - Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await + self.record_batches_to_parquet_bytes(stream, schema, &metadata_bytes) + .await } /// Convert the given metadata and RecordBatches to parquet file bytes. Used by `ingester`. pub async fn parquet_bytes( + &self, record_batches: Vec, schema: SchemaRef, metadata: &IoxMetadata, @@ -160,18 +183,20 @@ impl Storage { let stream = Box::pin(stream::iter(record_batches.into_iter().map(Ok))); - Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await + self.record_batches_to_parquet_bytes(stream, schema, &metadata_bytes) + .await } /// Share code between `parquet_stream_to_bytes` and `parquet_bytes`. When /// `parquet_stream_to_bytes` is deleted, this code can be moved into `parquet_bytes` and /// made simpler by using a plain `Iter` rather than a `Stream`. async fn record_batches_to_parquet_bytes( + &self, mut stream: impl Stream> + Send + Sync + Unpin, schema: SchemaRef, metadata_bytes: &[u8], ) -> Result> { - let props = Self::writer_props(metadata_bytes); + let props = self.writer_props(metadata_bytes); let mem_writer = MemWriter::default(); { @@ -403,10 +428,10 @@ mod tests { record_batches, Arc::clone(schema.inner()), )); - let bytes = - Storage::parquet_stream_to_bytes(stream, Arc::clone(schema.inner()), metadata.clone()) - .await - .unwrap(); + let bytes = Storage::new(make_iox_object_store().await) + .parquet_stream_to_bytes(stream, Arc::clone(schema.inner()), metadata.clone()) + .await + .unwrap(); // extract metadata let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap(); @@ -501,10 +526,12 @@ mod tests { assert_batches_eq!(&expected, &read_batches); } - #[test] - fn test_props_have_compression() { + #[tokio::test] + async fn test_props_have_compression() { + let storage = Storage::new(make_iox_object_store().await); + // should be writing with compression - let props = Storage::writer_props(&[]); + let props = storage.writer_props(&[]); // arbitrary column name to get default values let col_path: ColumnPath = "default".into(); diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index 1db11b658a..4def5c2210 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -651,6 +651,8 @@ fn create_column_timestamp( /// RecordBatches, schema and IOx statistics will be generated in separate ways to emulate what the normal data /// ingestion would do. This also ensures that the Parquet data that will later be created out of the RecordBatch is /// indeed self-contained and can act as a source to recorder schema and statistics. +/// +/// Returns: `(record_batches, schema, summaries, num_rows)` pub fn make_record_batch( column_prefix: &str, test_size: TestSize, diff --git a/parquet_file/src/test_utils/generator.rs b/parquet_file/src/test_utils/generator.rs index 365ae708ff..9038b5906d 100644 --- a/parquet_file/src/test_utils/generator.rs +++ b/parquet_file/src/test_utils/generator.rs @@ -106,6 +106,12 @@ impl ChunkGenerator { GeneratorConfig::Full => make_record_batch(&self.column_prefix, TestSize::Full), }; + // ensure we make multiple row groups if we have more than one + // record batch + if let Some(batch) = record_batches.get(0) { + self.storage.set_max_row_group_size(batch.num_rows()); + } + let table_summary = TableSummary { name: self.partition.table_name.to_string(), columns: column_summaries, diff --git a/predicate/Cargo.toml b/predicate/Cargo.toml index fab1d97de8..ece32e17cd 100644 --- a/predicate/Cargo.toml +++ b/predicate/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } chrono = { version = "0.4", default-features = false } data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/querier/Cargo.toml b/querier/Cargo.toml index ff69c5866f..48c8612933 100644 --- a/querier/Cargo.toml +++ b/querier/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] -arrow-flight = "8.0" +arrow-flight = "9.0" client_util = { path = "../client_util" } futures = "0.3" generated_types = { path = "../generated_types" } diff --git a/query/Cargo.toml b/query/Cargo.toml index db3f5d4468..e8e8e24511 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor" # 2. Allow for query logic testing without bringing in all the storage systems. [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } async-trait = "0.1" chrono = { version = "0.4", default-features = false } diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index 45a2b64e35..e7fa224203 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -18,7 +18,7 @@ query = { path = "../query" } workspace-hack = { path = "../workspace-hack"} [dev-dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } metric = { path = "../metric" } object_store = { path = "../object_store" } diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index d1250b3128..7fe0c50c83 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -11,7 +11,7 @@ edition = "2021" # 2. Keep change/compile/link time down during development when working on just this crate [dependencies] # In alphabetical order -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } croaring = "0.5" data_types = { path = "../data_types" } diff --git a/schema/Cargo.toml b/schema/Cargo.toml index 6c9c2ec6b0..130bad70e4 100644 --- a/schema/Cargo.toml +++ b/schema/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" description = "IOx Schema definition" [dependencies] -arrow = { version = "8.0", features = ["prettyprint"] } +arrow = { version = "9.0", features = ["prettyprint"] } hashbrown = "0.12" indexmap = { version = "1.7", features = ["std"] } itertools = "0.10.1" diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 2f80a7d47e..751a85c5cf 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -14,7 +14,7 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] ahash = { version = "0.7", features = ["std"] } -arrow = { version = "8", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] } +arrow = { version = "9", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] } base64 = { version = "0.13", features = ["alloc", "std"] } bitflags = { version = "1" } byteorder = { version = "1", features = ["std"] } @@ -40,7 +40,7 @@ num-integer = { version = "0.1", default-features = false, features = ["i128", " num-traits = { version = "0.2", features = ["i128", "libm", "std"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] } parking_lot = { version = "0.11" } -parquet = { version = "8", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] } +parquet = { version = "9", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }