From ee151c8b41981490bab2d4f4670e031ab7d0ec9a Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Wed, 3 Aug 2022 11:13:04 -0400
Subject: [PATCH] fix: set batch size back to 1024 to see if the OOM in the
 compactor goes away (#5289)

* fix: set batch size back to 1024 to see if the OOM in the compactor goes away

* fix: address review comments

* chore: Apply suggestions from code review

Co-authored-by: Marco Neumann

* fix: import needed constant

Co-authored-by: Marco Neumann
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 parquet_file/src/serialize.rs |  6 +++---
 parquet_file/src/storage.rs   | 12 ++++++++++--
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs
index 052cde23b3..685d389074 100644
--- a/parquet_file/src/serialize.rs
+++ b/parquet_file/src/serialize.rs
@@ -15,8 +15,8 @@ use thiserror::Error;
 
 use crate::metadata::{IoxMetadata, METADATA_KEY};
 
-/// Parquet row group size
-pub const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Parquet row group write size
+pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;
 
 /// [`RecordBatch`] to Parquet serialisation errors.
 #[derive(Debug, Error)]
@@ -155,7 +155,7 @@ fn writer_props(meta: &IoxMetadata) -> Result
-        .set_max_row_group_size(ROW_GROUP_SIZE)
+        .set_max_row_group_size(ROW_GROUP_WRITE_SIZE)

diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
+/// Parquet row group read size
+pub const ROW_GROUP_READ_SIZE: usize = 1024;
+
+#[allow(clippy::assertions_on_constants)]
+const _: () = assert!(ROW_GROUP_WRITE_SIZE % ROW_GROUP_READ_SIZE == 0);
 
 /// Errors returned during a Parquet "put" operation, covering [`RecordBatch`]
 /// pull from the provided stream, encoding, and finally uploading the bytes to
 /// the object store.
@@ -275,7 +282,8 @@ async fn download_and_scan_parquet(
     };
 
     let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), mask);
-    let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, ROW_GROUP_SIZE)?;
+    let record_batch_reader =
+        arrow_reader.get_record_reader_by_columns(mask, ROW_GROUP_READ_SIZE)?;
 
     for batch in record_batch_reader {
         let batch = batch.map(|batch| {
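
The `const _: () = assert!(...)` line added to storage.rs is a compile-time guard: a `const` item is evaluated during compilation, so if the write size ever stops being an exact multiple of the read size (meaning a written row group could no longer be consumed as a whole number of read batches), the crate fails to build instead of misbehaving at runtime. Below is a minimal standalone sketch of the same pattern; the constant names and values mirror the patch, but the `main` function and its output are illustrative only, not part of the patch.

    /// Rows per row group when writing Parquet (mirrors the patch).
    pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;

    /// Rows per batch when reading Parquet back (the "1024" of the title).
    pub const ROW_GROUP_READ_SIZE: usize = 1024;

    // Evaluated at compile time: if the remainder is non-zero, `assert!`
    // panics in const context, which surfaces as a hard compile error.
    #[allow(clippy::assertions_on_constants)]
    const _: () = assert!(ROW_GROUP_WRITE_SIZE % ROW_GROUP_READ_SIZE == 0);

    fn main() {
        // The check already ran during compilation; at runtime we can only
        // observe that the two sizes divide evenly.
        println!(
            "each written row group yields {} read batches",
            ROW_GROUP_WRITE_SIZE / ROW_GROUP_READ_SIZE
        );
    }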