chore: Update datafusion, update `arrow`/`parquet`/`arrow-flight` to 9.0 (#3733)
* chore: Update datafusion
* chore: Update arrow
* fix: missing updates
* chore: Update cargo.lock
* fix: update for smaller parquet size
* fix: update test for smaller parquet files
* test: ensure parquet_file tests write multiple row groups
* fix: update callsite
* fix: Update for tests
* fix: hakari
* fix: use IoxObjectStore::existing

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent 89105ccfab
commit a30803e692
@@ -84,9 +84,9 @@ dependencies = [
 
 [[package]]
 name = "arrow"
-version = "8.0.0"
+version = "9.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce240772a007c63658c1d335bb424fd1019b87895dee899b7bf70e85b2d24e5f"
+checksum = "374ec8e5f39015ab754cfc63398a897423c877b66aecbfd036903b2c9c07f244"
 dependencies = [
  "bitflags",
  "chrono",
@@ -109,9 +109,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-flight"
-version = "8.0.0"
+version = "9.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ec7c637d2edd6969eeb63793584e0a3d17f99559b872ab73f47aea186aef50a"
+checksum = "f9234708bb25937fe23f6cd49400163502ddef84038b3fc859b4ec743840e07a"
 dependencies = [
  "arrow",
  "base64 0.13.0",
@@ -917,7 +917,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267"
 dependencies = [
  "ahash",
  "arrow",
@@ -946,7 +946,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267"
 dependencies = [
  "arrow",
  "ordered-float 2.10.0",
@@ -957,7 +957,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "7.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=0a50dcdfb1f2854e59b17da2d87c106c614b226d#0a50dcdfb1f2854e59b17da2d87c106c614b226d"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=81e76edc330067332ac9a3972525ecbe92953267#81e76edc330067332ac9a3972525ecbe92953267"
 dependencies = [
  "ahash",
  "arrow",
@@ -2037,6 +2037,7 @@ dependencies = [
  "generated_types",
  "hyper",
  "iox_catalog",
+ "iox_object_store",
  "metric",
  "mutable_batch",
  "mutable_batch_lp",
@@ -3262,9 +3263,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "8.0.0"
+version = "9.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d5a6492e0b849fd458bc9364aee4c8a9882b3cc21b2576767162725f69d2ad8"
+checksum = "36531313e3e81a646672001c02d1764fabc1320055fe8b176e696b08d3cf44cd"
 dependencies = [
  "arrow",
  "base64 0.13.0",
@@ -6104,18 +6105,18 @@ checksum = "7c88870063c39ee00ec285a2f8d6a966e5b6fb2becc4e8dac77ed0d370ed6006"
 
 [[package]]
 name = "zstd"
-version = "0.9.2+zstd.1.5.1"
+version = "0.10.0+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54"
+checksum = "3b1365becbe415f3f0fcd024e2f7b45bacfb5bdd055f0dc113571394114e7bdd"
 dependencies = [
  "zstd-safe",
 ]
 
 [[package]]
 name = "zstd-safe"
-version = "4.1.3+zstd.1.5.1"
+version = "4.1.4+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79"
+checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee"
 dependencies = [
  "libc",
  "zstd-sys",
@@ -6123,9 +6124,9 @@ dependencies = [
 
 [[package]]
 name = "zstd-sys"
-version = "1.6.2+zstd.1.5.1"
+version = "1.6.3+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f"
+checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8"
 dependencies = [
  "cc",
  "libc",
@@ -7,7 +7,7 @@ description = "Apache Arrow utilities"
 
 [dependencies]
 ahash = { version = "0.7.5", default-features = false }
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 # used by arrow anyway (needed for printing workaround)
 chrono = { version = "0.4", default-features = false }
 comfy-table = { version = "5.0", default-features = false }
@@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version"
 
 # Rename to workaround doctest bug
 # Turn off optional datafusion features (e.g. don't get support for crypo functions or avro)
-upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="0a50dcdfb1f2854e59b17da2d87c106c614b226d", default-features = false, package = "datafusion" }
+upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="81e76edc330067332ac9a3972525ecbe92953267", default-features = false, package = "datafusion" }
 workspace-hack = { path = "../workspace-hack"}
@@ -5,7 +5,7 @@ authors = ["pauldix <paul@pauldix.net>"]
 edition = "2021"
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 async-trait = "0.1"
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
@@ -50,8 +50,8 @@ write_buffer = { path = "../write_buffer" }
 
 # Crates.io dependencies, in alphabetical order
 ansi_term = "0.12"
-arrow = { version = "8.0", features = ["prettyprint"] }
-arrow-flight = "8.0"
+arrow = { version = "9.0", features = ["prettyprint"] }
+arrow-flight = "9.0"
 async-trait = "0.1"
 backtrace = "0.3"
 byteorder = "1.3.4"
@@ -74,7 +74,7 @@ log = "0.4"
 num_cpus = "1.13.0"
 once_cell = { version = "1.4.0", features = ["parking_lot"] }
 parking_lot = "0.12"
-parquet = "8.0"
+parquet = "9.0"
 pin-project = "1.0"
 pprof = { version = "0.6", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
 prost = "0.9"
@@ -17,8 +17,8 @@ client_util = { path = "../client_util" }
 generated_types = { path = "../generated_types", default-features = false }
 
 # Crates.io dependencies, in alphabetical order
-arrow = { version = "8.0", optional = true }
-arrow-flight = { version = "8.0", optional = true }
+arrow = { version = "9.0", optional = true }
+arrow-flight = { version = "9.0", optional = true }
 bytes = "1.0"
 futures-util = { version = "0.3", optional = true }
 dml = { path = "../dml", optional = true }
@@ -5,8 +5,8 @@ authors = ["Nga Tran <nga-tran@live.com>"]
 edition = "2021"
 
 [dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
-arrow-flight = "8.0"
+arrow = { version = "9.0", features = ["prettyprint"] }
+arrow-flight = "9.0"
 arrow_util = { path = "../arrow_util" }
 async-trait = "0.1.42"
 base64 = "0.13"
@@ -20,11 +20,12 @@ db = { path = "../db" }
 dml = { path = "../dml" }
 hyper = "0.14"
 iox_catalog = { path = "../iox_catalog" }
+iox_object_store = { path = "../iox_object_store" }
 metric = { path = "../metric" }
 mutable_batch = { path = "../mutable_batch"}
 object_store = { path = "../object_store" }
 parking_lot = "0.12"
-parquet = "8.0"
+parquet = "9.0"
 parquet_file = { path = "../parquet_file" }
 observability_deps = { path = "../observability_deps" }
 predicate = { path = "../predicate" }
@@ -1,5 +1,7 @@
 //! Persist compacted data to parquet files in object storage
 
+use std::sync::Arc;
+
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
 use object_store::{
@@ -28,7 +30,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 pub async fn persist(
     metadata: &IoxMetadata,
     record_batches: Vec<RecordBatch>,
-    object_store: &ObjectStore,
+    object_store: &Arc<ObjectStore>,
 ) -> Result<()> {
     if record_batches.is_empty() {
         return Ok(());
@@ -38,7 +40,17 @@ pub async fn persist(
         .expect("record_batches.is_empty was just checked")
         .schema();
 
-    let data = parquet_file::storage::Storage::parquet_bytes(record_batches, schema, metadata)
+    // Make a fake IOx object store to conform to the parquet file
+    // interface, but note this isn't actually used to find parquet
+    // paths to write to
+    use iox_object_store::IoxObjectStore;
+    let iox_object_store = Arc::new(IoxObjectStore::existing(
+        Arc::clone(object_store),
+        IoxObjectStore::root_path_for(object_store, uuid::Uuid::new_v4()),
+    ));
+
+    let data = parquet_file::storage::Storage::new(iox_object_store)
+        .parquet_bytes(record_batches, schema, metadata)
         .await
         .context(ConvertingToBytesSnafu)?;
 
@@ -5,7 +5,7 @@ edition = "2021"
 description = "A mutable arrow RecordBatch"
 
 [dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
@@ -5,7 +5,7 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
 edition = "2021"
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 data_types = { path = "../data_types" }
 schema = { path = "../schema" }
 metric = { path = "../metric" }
@@ -5,12 +5,12 @@ authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
 edition = "2021"
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 influxdb_tsm = { path = "../influxdb_tsm" }
 schema = { path = "../schema" }
 snafu = "0.7"
 observability_deps = { path = "../observability_deps" }
-parquet = "8.0"
+parquet = "9.0"
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies] # In alphabetical order
@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 base64 = "0.13"
 bytes = "1.0"
 data_types = { path = "../data_types" }
@@ -16,7 +16,7 @@ iox_object_store = { path = "../iox_object_store" }
 metric = { path = "../metric" }
 object_store = { path = "../object_store" }
 observability_deps = { path = "../observability_deps" }
-parquet = "8.0"
+parquet = "9.0"
 parquet_file = { path = "../parquet_file" }
 parquet-format = "4.0"
 parking_lot = "0.12"
@@ -32,5 +32,5 @@ time = { path = "../time" }
 tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
 tokio-stream = "0.1"
 uuid = { version = "0.8", features = ["v4"] }
-zstd = "0.9"
+zstd = "0.10"
 workspace-hack = { path = "../workspace-hack"}
@@ -301,8 +301,8 @@ File {
 file_name: "00000000-0000-0000-0000-000000000001.parquet",
 },
 ),
-file_size_bytes: 3052,
-metadata: b"metadata omitted (935 bytes)",
+file_size_bytes: 3660,
+metadata: b"metadata omitted (952 bytes)",
 },
 ),
 ),
@@ -417,8 +417,8 @@ File {
 file_name: "00000000-0000-0000-0000-000000000001.parquet",
 },
 ),
-file_size_bytes: 3052,
-metadata: b"metadata omitted (935 bytes)",
+file_size_bytes: 3660,
+metadata: b"metadata omitted (952 bytes)",
 },
 ),
 ),
@@ -5,7 +5,7 @@ authors = ["Nga Tran <nga-tran@live.com>"]
 edition = "2021"
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 base64 = "0.13"
 bytes = "1.0"
 data_types = { path = "../data_types" }
@@ -18,7 +18,7 @@ iox_object_store = { path = "../iox_object_store" }
 metric = { path = "../metric" }
 object_store = { path = "../object_store" }
 observability_deps = { path = "../observability_deps" }
-parquet = {version = "8.0", features = ["experimental"]}
+parquet = {version = "9.0", features = ["experimental"]}
 parquet-format = "4.0"
 parking_lot = "0.12"
 pbjson-types = "0.2"
@@ -33,7 +33,7 @@ time = { path = "../time" }
 tokio = { version = "1.13", features = ["macros", "parking_lot", "rt", "rt-multi-thread", "sync"] }
 tokio-stream = "0.1"
 uuid = { version = "0.8", features = ["v4"] }
-zstd = "0.9"
+zstd = "0.10"
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
@@ -1114,6 +1114,9 @@ mod tests {
         let parquet_metadata = chunk.parquet_metadata();
         let decoded = parquet_metadata.decode().unwrap();
 
+        // Several of these tests cover merging
+        // metata from several different row groups, so this should
+        // not be changed.
         assert!(decoded.md.num_row_groups() > 1);
         assert_ne!(decoded.md.file_metadata().schema_descr().num_columns(), 0);
 
@@ -1194,7 +1197,7 @@ mod tests {
         let mut generator = ChunkGenerator::new().await;
         let (chunk, _) = generator.generate().await.unwrap();
         let parquet_metadata = chunk.parquet_metadata();
-        assert_eq!(parquet_metadata.size(), 3719);
+        assert_eq!(parquet_metadata.size(), 4068);
     }
 
     #[test]
@@ -87,11 +87,22 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 #[derive(Debug, Clone)]
 pub struct Storage {
     iox_object_store: Arc<IoxObjectStore>,
+
+    // If `Some`, restricts the size of the row groups created in the parquet file
+    max_row_group_size: Option<usize>,
 }
 
 impl Storage {
     pub fn new(iox_object_store: Arc<IoxObjectStore>) -> Self {
-        Self { iox_object_store }
+        Self {
+            iox_object_store,
+            max_row_group_size: None,
+        }
+    }
+
+    /// Specify the maximum sized row group to make
+    pub fn set_max_row_group_size(&mut self, max_row_group_size: usize) {
+        self.max_row_group_size = Some(max_row_group_size);
     }
 
     /// Write the given stream of data of a specified table of
@@ -112,7 +123,9 @@ impl Storage {
         let path = ParquetFilePath::new(&chunk_addr);
 
         let schema = stream.schema();
-        let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
+        let data = self
+            .parquet_stream_to_bytes(stream, schema, metadata)
+            .await?;
         // no data
         if data.is_empty() {
             return Ok(None);
@@ -128,30 +141,40 @@ impl Storage {
         Ok(Some((path, file_size_bytes, md)))
     }
 
-    fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
-        WriterProperties::builder()
+    fn writer_props(&self, metadata_bytes: &[u8]) -> WriterProperties {
+        let builder = WriterProperties::builder()
             .set_key_value_metadata(Some(vec![KeyValue {
                 key: METADATA_KEY.to_string(),
                 value: Some(base64::encode(&metadata_bytes)),
             }]))
-            .set_compression(Compression::ZSTD)
-            .build()
+            .set_compression(Compression::ZSTD);
+
+        let builder = if let Some(max_row_group_size) = self.max_row_group_size.as_ref() {
+            builder.set_max_row_group_size(*max_row_group_size)
+        } else {
+            builder
+        };
+
+        builder.build()
     }
 
     /// Convert the given stream of RecordBatches to bytes. This should be deleted when switching
     /// over to use `ingester` only.
     async fn parquet_stream_to_bytes(
+        &self,
         stream: SendableRecordBatchStream,
         schema: SchemaRef,
         metadata: IoxMetadataOld,
     ) -> Result<Vec<u8>> {
         let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;
 
-        Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await
+        self.record_batches_to_parquet_bytes(stream, schema, &metadata_bytes)
+            .await
     }
 
     /// Convert the given metadata and RecordBatches to parquet file bytes. Used by `ingester`.
     pub async fn parquet_bytes(
+        &self,
         record_batches: Vec<RecordBatch>,
         schema: SchemaRef,
         metadata: &IoxMetadata,
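The hunk above is the core mechanism of this change: `writer_props` now threads an optional `max_row_group_size` into parquet's `WriterProperties`, capping how many rows land in each row group. A minimal standalone sketch of that behavior follows; it is not IOx code, and it assumes plain crates.io `arrow`/`parquet` 9.0 with an arbitrary `/tmp/example.parquet` path:

// Sketch: forcing multiple row groups via WriterProperties
// (assumed setup: arrow = "9.0", parquet = "9.0" from crates.io).
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One 10-row batch of a single Int64 column.
    let col: ArrayRef = Arc::new(Int64Array::from((0..10).collect::<Vec<i64>>()));
    let batch = RecordBatch::try_from_iter(vec![("a", col)])?;

    // Cap row groups at 5 rows, as Storage::writer_props now can.
    let props = WriterProperties::builder()
        .set_max_row_group_size(5)
        .build();

    let mut writer = ArrowWriter::try_new(
        File::create("/tmp/example.parquet")?,
        batch.schema(),
        Some(props),
    )?;
    writer.write(&batch)?;
    writer.close()?;

    // The 10 rows were split across two 5-row row groups.
    let reader = SerializedFileReader::new(File::open("/tmp/example.parquet")?)?;
    assert!(reader.metadata().num_row_groups() > 1);
    Ok(())
}

Writing more rows than the cap yields more than one row group in a single file, which is what the updated `parquet_file` metadata test asserts with `num_row_groups() > 1`.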
@@ -160,18 +183,20 @@ impl Storage {
 
         let stream = Box::pin(stream::iter(record_batches.into_iter().map(Ok)));
 
-        Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await
+        self.record_batches_to_parquet_bytes(stream, schema, &metadata_bytes)
+            .await
     }
 
     /// Share code between `parquet_stream_to_bytes` and `parquet_bytes`. When
     /// `parquet_stream_to_bytes` is deleted, this code can be moved into `parquet_bytes` and
     /// made simpler by using a plain `Iter` rather than a `Stream`.
     async fn record_batches_to_parquet_bytes(
+        &self,
         mut stream: impl Stream<Item = ArrowResult<RecordBatch>> + Send + Sync + Unpin,
         schema: SchemaRef,
         metadata_bytes: &[u8],
     ) -> Result<Vec<u8>> {
-        let props = Self::writer_props(metadata_bytes);
+        let props = self.writer_props(metadata_bytes);
 
         let mem_writer = MemWriter::default();
         {
@@ -403,10 +428,10 @@ mod tests {
             record_batches,
             Arc::clone(schema.inner()),
         ));
-        let bytes =
-            Storage::parquet_stream_to_bytes(stream, Arc::clone(schema.inner()), metadata.clone())
-                .await
-                .unwrap();
+        let bytes = Storage::new(make_iox_object_store().await)
+            .parquet_stream_to_bytes(stream, Arc::clone(schema.inner()), metadata.clone())
+            .await
+            .unwrap();
 
         // extract metadata
         let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap();
@@ -501,10 +526,12 @@ mod tests {
         assert_batches_eq!(&expected, &read_batches);
     }
 
-    #[test]
-    fn test_props_have_compression() {
+    #[tokio::test]
+    async fn test_props_have_compression() {
+        let storage = Storage::new(make_iox_object_store().await);
+
         // should be writing with compression
-        let props = Storage::writer_props(&[]);
+        let props = storage.writer_props(&[]);
 
         // arbitrary column name to get default values
         let col_path: ColumnPath = "default".into();
@@ -651,6 +651,8 @@ fn create_column_timestamp(
 /// RecordBatches, schema and IOx statistics will be generated in separate ways to emulate what the normal data
 /// ingestion would do. This also ensures that the Parquet data that will later be created out of the RecordBatch is
 /// indeed self-contained and can act as a source to recorder schema and statistics.
+///
+/// Returns: `(record_batches, schema, summaries, num_rows)`
 pub fn make_record_batch(
     column_prefix: &str,
     test_size: TestSize,
@@ -106,6 +106,12 @@ impl ChunkGenerator {
             GeneratorConfig::Full => make_record_batch(&self.column_prefix, TestSize::Full),
         };
 
+        // ensure we make multiple row groups if we have more than one
+        // record batch
+        if let Some(batch) = record_batches.get(0) {
+            self.storage.set_max_row_group_size(batch.num_rows());
+        }
+
        let table_summary = TableSummary {
            name: self.partition.table_name.to_string(),
            columns: column_summaries,
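This hunk is how the tests guarantee multi-row-group files: capping the row-group size at a single batch's row count means each additional batch closes a row group. A hedged standalone sketch of the same trick, under the same assumptions as the earlier example (crates.io `arrow`/`parquet` 9.0, arbitrary `/tmp` path):

// Sketch of the generator's strategy: max_row_group_size equals the
// rows in one batch, so writing two batches produces two row groups.
use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two 4-row batches of a single Int64 column.
    let batch = |start: i64| {
        let col: ArrayRef = Arc::new(Int64Array::from((start..start + 4).collect::<Vec<i64>>()));
        RecordBatch::try_from_iter(vec![("a", col)])
    };
    let (b1, b2) = (batch(0)?, batch(4)?);

    // Mirrors ChunkGenerator: cap the row group at one batch's rows.
    let props = WriterProperties::builder()
        .set_max_row_group_size(b1.num_rows())
        .build();

    let mut writer = ArrowWriter::try_new(
        File::create("/tmp/two_groups.parquet")?,
        b1.schema(),
        Some(props),
    )?;
    writer.write(&b1)?;
    writer.write(&b2)?;
    writer.close()?;

    let reader = SerializedFileReader::new(File::open("/tmp/two_groups.parquet")?)?;
    assert!(reader.metadata().num_row_groups() > 1);
    Ok(())
}

This is why the metadata-merging tests can rely on `num_row_groups() > 1` without depending on any particular default row-group size.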
@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 chrono = { version = "0.4", default-features = false }
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
@@ -4,7 +4,7 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-arrow-flight = "8.0"
+arrow-flight = "9.0"
 client_util = { path = "../client_util" }
 futures = "0.3"
 generated_types = { path = "../generated_types" }
@@ -14,7 +14,7 @@ description = "IOx Query Interface and Executor"
 # 2. Allow for query logic testing without bringing in all the storage systems.
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 async-trait = "0.1"
 chrono = { version = "0.4", default-features = false }
@@ -18,7 +18,7 @@ query = { path = "../query" }
 workspace-hack = { path = "../workspace-hack"}
 
 [dev-dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 metric = { path = "../metric" }
 object_store = { path = "../object_store" }
@@ -11,7 +11,7 @@ edition = "2021"
 # 2. Keep change/compile/link time down during development when working on just this crate
 
 [dependencies] # In alphabetical order
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 arrow_util = { path = "../arrow_util" }
 croaring = "0.5"
 data_types = { path = "../data_types" }
@@ -6,7 +6,7 @@ edition = "2021"
 description = "IOx Schema definition"
 
 [dependencies]
-arrow = { version = "8.0", features = ["prettyprint"] }
+arrow = { version = "9.0", features = ["prettyprint"] }
 hashbrown = "0.12"
 indexmap = { version = "1.7", features = ["std"] }
 itertools = "0.10.1"
@@ -14,7 +14,7 @@ publish = false
 ### BEGIN HAKARI SECTION
 [dependencies]
 ahash = { version = "0.7", features = ["std"] }
-arrow = { version = "8", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] }
+arrow = { version = "9", features = ["comfy-table", "csv", "csv_crate", "flatbuffers", "ipc", "prettyprint", "rand", "test_utils"] }
 base64 = { version = "0.13", features = ["alloc", "std"] }
 bitflags = { version = "1" }
 byteorder = { version = "1", features = ["std"] }
@@ -40,7 +40,7 @@ num-integer = { version = "0.1", default-features = false, features = ["i128", "
 num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
 once_cell = { version = "1", features = ["alloc", "parking_lot", "race", "std"] }
 parking_lot = { version = "0.11" }
-parquet = { version = "8", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
+parquet = { version = "9", features = ["arrow", "base64", "brotli", "experimental", "flate2", "lz4", "snap", "zstd"] }
 rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
 regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
 regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }