fix: set batch size back to 1024 to see if the OOM in the compactor goes away (#5289)

* fix: set batch size back to 1024 to see if the OOM in the compactor goes away

* fix: address review comments

* chore: Apply suggestions from code review

Co-authored-by: Marco Neumann <marco@crepererum.net>

* fix: import needed constant

Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2022-08-03 11:13:04 -04:00 committed by GitHub
parent 663a20d743
commit ee151c8b41
2 changed files with 13 additions and 5 deletions


@@ -15,8 +15,8 @@ use thiserror::Error;
 use crate::metadata::{IoxMetadata, METADATA_KEY};
 
-/// Parquet row group size
-pub const ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Parquet row group write size
+pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;
 
 /// [`RecordBatch`] to Parquet serialisation errors.
 #[derive(Debug, Error)]
@@ -155,7 +155,7 @@ fn writer_props(meta: &IoxMetadata) -> Result<WriterProperties, prost::EncodeErr
             value: Some(base64::encode(&bytes)),
         }]))
         .set_compression(Compression::ZSTD)
-        .set_max_row_group_size(ROW_GROUP_SIZE);
+        .set_max_row_group_size(ROW_GROUP_WRITE_SIZE);
 
     Ok(builder.build())
 }
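
Note: `set_max_row_group_size` caps how many rows the Parquet writer buffers into a single row group before flushing, so this constant directly bounds writer-side memory. A minimal sketch of the same configuration, assuming the `parquet` crate API used here (the function name is illustrative):

use parquet::file::properties::WriterProperties;

/// Build writer properties that flush a row group after at most
/// 1024 * 1024 rows, mirroring ROW_GROUP_WRITE_SIZE above.
fn example_writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(1024 * 1024)
        .build()
}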


@@ -3,7 +3,7 @@
 use crate::{
     metadata::{IoxMetadata, IoxParquetMetaData},
-    serialize::{self, CodecError, ROW_GROUP_SIZE},
+    serialize::{self, CodecError, ROW_GROUP_WRITE_SIZE},
     ParquetFilePath,
 };
 use arrow::{
use arrow::{
@@ -27,6 +27,13 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
 use thiserror::Error;
 use tokio::io::AsyncReadExt;
 
+/// Parquet row group read size
+pub const ROW_GROUP_READ_SIZE: usize = 1024;
+
+// ensure read and write work well together
+// Skip clippy due to <https://github.com/rust-lang/rust-clippy/issues/8159>.
+#[allow(clippy::assertions_on_constants)]
+const _: () = assert!(ROW_GROUP_WRITE_SIZE % ROW_GROUP_READ_SIZE == 0);
+
 /// Errors returned during a Parquet "put" operation, covering [`RecordBatch`]
 /// pull from the provided stream, encoding, and finally uploading the bytes to
 /// the object store.
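
Note: `const _: () = assert!(...)` is a compile-time check: the assertion is evaluated during constant evaluation, so a write size that is not a whole multiple of the read size fails the build rather than surfacing at runtime. A standalone sketch of the pattern, with illustrative constant names:

const WRITE_SIZE: usize = 1024 * 1024;
const READ_SIZE: usize = 1024;

// Evaluated at compile time; compilation fails if the condition is false.
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(WRITE_SIZE % READ_SIZE == 0);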
@@ -275,7 +282,8 @@ async fn download_and_scan_parquet(
     };
     let mask = ProjectionMask::roots(arrow_reader.parquet_schema(), mask);
-    let record_batch_reader = arrow_reader.get_record_reader_by_columns(mask, ROW_GROUP_SIZE)?;
+    let record_batch_reader =
+        arrow_reader.get_record_reader_by_columns(mask, ROW_GROUP_READ_SIZE)?;
 
     for batch in record_batch_reader {
         let batch = batch.map(|batch| {
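
Note: the second argument to `get_record_reader_by_columns` is the maximum number of rows per decoded `RecordBatch`. Reading with the 1024 * 1024 write size meant each batch could hold up to a million rows; capping reads at ROW_GROUP_READ_SIZE (1024) bounds per-batch memory, which is the point of this fix. A minimal read-side sketch, assuming the `parquet` crate's `ParquetRecordBatchReaderBuilder` entry point (a newer API than the reader call in the diff; the function name is illustrative):

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;

fn read_in_small_batches(file: File) -> Result<(), Box<dyn std::error::Error>> {
    // Cap each decoded RecordBatch at 1024 rows to bound reader-side memory.
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(1024)
        .build()?;
    for batch in reader {
        let batch = batch?;
        // process the batch (at most 1024 rows each) ...
        let _ = batch.num_rows();
    }
    Ok(())
}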