chore: Improve debug logging when parquet files are created (#5699)

* chore: Improve debug logging when parquet files are created

* fix: add duration for encoding parquet

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-09-20 16:43:34 -04:00 committed by GitHub
parent 1d306061b9
commit ea51feadf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 7 deletions

View File

@ -4,7 +4,7 @@ use std::{io::Write, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{pin_mut, Stream, StreamExt};
use observability_deps::tracing::{debug, warn};
use observability_deps::tracing::{debug, trace, warn};
use parquet::{
arrow::ArrowWriter,
basic::Compression,
@ -111,24 +111,36 @@ where
// Serialize the IoxMetadata to the protobuf bytes.
let props = writer_props(meta)?;
let write_batch_size = props.write_batch_size();
let max_row_group_size = props.max_row_group_size();
// Construct the arrow serializer with the metadata as part of the parquet
// file properties.
let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?;
let mut num_batches = 0;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
writer.write(&batch)?;
num_batches += 1;
}
let meta = writer.close().map_err(CodecError::from)?;
if meta.num_rows == 0 {
let writer_meta = writer.close().map_err(CodecError::from)?;
if writer_meta.num_rows == 0 {
// throw warning if all input batches are empty
warn!("parquet serialisation encoded 0 rows");
return Err(CodecError::NoRows);
}
Ok(meta)
debug!(num_batches,
num_rows=writer_meta.num_rows,
object_store_id=?meta.object_store_id,
partition_id=?meta.partition_id,
write_batch_size,
max_row_group_size,
"Created parquet file");
Ok(writer_meta)
}
/// A helper function that calls [`to_parquet()`], serialising the parquet file
@ -153,7 +165,7 @@ where
let meta = to_parquet(batches, meta, &mut bytes).await?;
bytes.shrink_to_fit();
debug!(?partition_id, ?meta, "generated parquet file metadata");
trace!(?partition_id, ?meta, "generated parquet file metadata");
Ok((bytes, meta))
}

View File

@ -28,7 +28,11 @@ use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::selection::{select_schema, Selection};
use std::{num::TryFromIntError, sync::Arc, time::Duration};
use std::{
num::TryFromIntError,
sync::Arc,
time::{Duration, Instant},
};
use thiserror::Error;
/// Parquet row group read size
@ -127,6 +131,8 @@ impl ParquetStorage {
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
{
let start = Instant::now();
// Stream the record batches into a parquet file.
//
// It would be nice to stream the encoded parquet to disk for this and
@ -140,7 +146,7 @@ impl ParquetStorage {
// Read the IOx-specific parquet metadata from the file metadata
let parquet_meta =
IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?;
debug!(
trace!(
?meta.partition_id,
?parquet_meta,
"IoxParquetMetaData coverted from Row Group Metadata (aka FileMetaData)"
@ -152,6 +158,15 @@ impl ParquetStorage {
let file_size = data.len();
let data = Bytes::from(data);
debug!(
file_size,
object_store_id=?meta.object_store_id,
partition_id=?meta.partition_id,
// includes the time to run the datafusion plan (that is the batches)
total_time_to_create_parquet_bytes=?(Instant::now() - start),
"Uploading parquet to object store"
);
// Retry uploading the file endlessly.
//
// This is abort-able by the user by dropping the upload() future.