feat: use zstd compression when writing parquet files (#2218)

* feat: use ZSTD when writing parquet files

* fix: test
Andrew Lamb 2021-08-06 14:45:55 -04:00 committed by GitHub
parent 5d525cdc70
commit d41b44d312
3 changed files with 24 additions and 9 deletions
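
For readers unfamiliar with the parquet crate API this commit touches, here is a minimal, self-contained sketch of the same technique: building WriterProperties with Compression::ZSTD and handing them to an ArrowWriter. It is not code from this commit; the schema, file name, and data are illustrative, and it assumes arrow/parquet 5.x (as pinned by parquet = "5.0" in the Cargo.toml diff below) with zstd support enabled.

    use std::fs::File;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Illustrative schema and data, not taken from the commit.
        let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![column])?;

        // The relevant part: enable ZSTD for all columns via WriterProperties,
        // the same set_compression(Compression::ZSTD) call the diff adds below.
        let props = WriterProperties::builder()
            .set_compression(Compression::ZSTD)
            .build();

        // Write one RecordBatch to a parquet file using those properties.
        let file = File::create("example.parquet")?;
        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }

The commit does the same thing one level up: Storage::writer_props() centralizes WriterProperties construction so the key/value metadata and the compression setting are applied in one place.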


@@ -18,8 +18,6 @@ internal_types = {path = "../internal_types"}
 metrics = { path = "../metrics" }
 object_store = {path = "../object_store"}
 observability_deps = { path = "../observability_deps" }
-# Turn off the "arrow" feature; it currently has a bug that causes the crate to rebuild every time
-# and we're not currently using it anyway
 parquet = "5.0"
 parquet-format = "2.6"
 parking_lot = "0.11.1"


@@ -18,6 +18,7 @@ use observability_deps::tracing::debug;
 use parquet::{
     self,
     arrow::ArrowWriter,
+    basic::Compression,
     file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
 };
 use query::{exec::stream::AdapterStream, predicate::Predicate};
@@ -191,6 +192,16 @@ impl Storage {
         Ok((path, file_size_bytes, md))
     }
 
+    fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
+        WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![KeyValue {
+                key: METADATA_KEY.to_string(),
+                value: Some(base64::encode(&metadata_bytes)),
+            }]))
+            .set_compression(Compression::ZSTD)
+            .build()
+    }
+
     /// Convert the given stream of RecordBatches to bytes
     async fn parquet_stream_to_bytes(
         mut stream: SendableRecordBatchStream,
@@ -199,12 +210,7 @@
     ) -> Result<Vec<u8>> {
         let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailure)?;
 
-        let props = WriterProperties::builder()
-            .set_key_value_metadata(Some(vec![KeyValue {
-                key: METADATA_KEY.to_string(),
-                value: Some(base64::encode(&metadata_bytes)),
-            }]))
-            .build();
+        let props = Self::writer_props(&metadata_bytes);
 
         let mem_writer = MemWriter::default();
         {
@@ -465,6 +471,7 @@ mod tests {
     use chrono::Utc;
     use datafusion::physical_plan::common::SizedRecordBatchStream;
     use datafusion_util::MemoryStream;
+    use parquet::schema::types::ColumnPath;
 
     #[tokio::test]
     async fn test_parquet_contains_key_value_metadata() {
@@ -605,4 +612,14 @@
         let l2 = storage.location(&chunk_addr);
         assert_ne!(l1, l2);
     }
+
+    #[test]
+    fn test_props_have_compression() {
+        // should be writing with compression
+        let props = Storage::writer_props(&[]);
+        // arbitrary column name to get default values
+        let col_path: ColumnPath = "default".into();
+
+        assert_eq!(props.compression(&col_path), Compression::ZSTD);
+    }
 }


@@ -3401,7 +3401,7 @@ mod tests {
             storage: ChunkStorage::ReadBufferAndObjectStore,
             lifecycle_action,
             memory_bytes: 3284, // size of RB and OS chunks
-            object_store_bytes: 1523, // size of parquet file
+            object_store_bytes: 1577, // size of parquet file
             row_count: 2,
             time_of_last_access: None,
             time_of_first_write: Utc.timestamp_nanos(1),