diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index dfd6f858c9..103906a1cc 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -18,8 +18,6 @@ internal_types = {path = "../internal_types"}
 metrics = { path = "../metrics" }
 object_store = {path = "../object_store"}
 observability_deps = { path = "../observability_deps" }
-# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
-# and we're not currently using it anyway
 parquet = "5.0"
 parquet-format = "2.6"
 parking_lot = "0.11.1"
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 664dfd3190..a72292ebf2 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -18,6 +18,7 @@ use observability_deps::tracing::debug;
 use parquet::{
     self,
     arrow::ArrowWriter,
+    basic::Compression,
     file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
 };
 use query::{exec::stream::AdapterStream, predicate::Predicate};
@@ -191,6 +192,16 @@ impl Storage {
         Ok((path, file_size_bytes, md))
     }
 
+    fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
+        WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![KeyValue {
+                key: METADATA_KEY.to_string(),
+                value: Some(base64::encode(&metadata_bytes)),
+            }]))
+            .set_compression(Compression::ZSTD)
+            .build()
+    }
+
     /// Convert the given stream of RecordBatches to bytes
     async fn parquet_stream_to_bytes(
         mut stream: SendableRecordBatchStream,
@@ -199,12 +210,7 @@ impl Storage {
     ) -> Result<Vec<u8>> {
         let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailure)?;
 
-        let props = WriterProperties::builder()
-            .set_key_value_metadata(Some(vec![KeyValue {
-                key: METADATA_KEY.to_string(),
-                value: Some(base64::encode(&metadata_bytes)),
-            }]))
-            .build();
+        let props = Self::writer_props(&metadata_bytes);
 
         let mem_writer = MemWriter::default();
         {
@@ -465,6 +471,7 @@ mod tests {
     use chrono::Utc;
     use datafusion::physical_plan::common::SizedRecordBatchStream;
     use datafusion_util::MemoryStream;
+    use parquet::schema::types::ColumnPath;
 
     #[tokio::test]
     async fn test_parquet_contains_key_value_metadata() {
@@ -605,4 +612,14 @@ mod tests {
         let l2 = storage.location(&chunk_addr);
         assert_ne!(l1, l2);
     }
+
+    #[test]
+    fn test_props_have_compression() {
+        // should be writing with compression
+        let props = Storage::writer_props(&[]);
+
+        // arbitrary column name to get default values
+        let col_path: ColumnPath = "default".into();
+        assert_eq!(props.compression(&col_path), Compression::ZSTD);
+    }
 }
diff --git a/server/src/db.rs b/server/src/db.rs
index 4691f6c1ce..1317bb861c 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -3401,7 +3401,7 @@ mod tests {
                storage: ChunkStorage::ReadBufferAndObjectStore,
                lifecycle_action,
                memory_bytes: 3284,       // size of RB and OS chunks
-               object_store_bytes: 1523, // size of parquet file
+               object_store_bytes: 1577, // size of parquet file
                row_count: 2,
                time_of_last_access: None,
                time_of_first_write: Utc.timestamp_nanos(1),