diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index 0147b7e04d..2a452cacb7 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -4,7 +4,7 @@ use std::{io::Write, sync::Arc}; use arrow::{error::ArrowError, record_batch::RecordBatch}; use futures::{pin_mut, Stream, StreamExt}; -use observability_deps::tracing::{debug, warn}; +use observability_deps::tracing::{debug, trace, warn}; use parquet::{ arrow::ArrowWriter, basic::Compression, @@ -111,24 +111,36 @@ where // Serialize the IoxMetadata to the protobuf bytes. let props = writer_props(meta)?; + let write_batch_size = props.write_batch_size(); + let max_row_group_size = props.max_row_group_size(); // Construct the arrow serializer with the metadata as part of the parquet // file properties. let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?; + let mut num_batches = 0; while let Some(maybe_batch) = stream.next().await { let batch = maybe_batch?; writer.write(&batch)?; + num_batches += 1; } - let meta = writer.close().map_err(CodecError::from)?; - if meta.num_rows == 0 { + let writer_meta = writer.close().map_err(CodecError::from)?; + if writer_meta.num_rows == 0 { // throw warning if all input batches are empty warn!("parquet serialisation encoded 0 rows"); return Err(CodecError::NoRows); } - Ok(meta) + debug!(num_batches, + num_rows=writer_meta.num_rows, + object_store_id=?meta.object_store_id, + partition_id=?meta.partition_id, + write_batch_size, + max_row_group_size, + "Created parquet file"); + + Ok(writer_meta) } /// A helper function that calls [`to_parquet()`], serialising the parquet file @@ -153,7 +165,7 @@ where let meta = to_parquet(batches, meta, &mut bytes).await?; bytes.shrink_to_fit(); - debug!(?partition_id, ?meta, "generated parquet file metadata"); + trace!(?partition_id, ?meta, "generated parquet file metadata"); Ok((bytes, meta)) } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index c9123e1d40..a1606b28ca 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -28,7 +28,11 @@ use object_store::{DynObjectStore, ObjectMeta}; use observability_deps::tracing::*; use predicate::Predicate; use schema::selection::{select_schema, Selection}; -use std::{num::TryFromIntError, sync::Arc, time::Duration}; +use std::{ + num::TryFromIntError, + sync::Arc, + time::{Duration, Instant}, +}; use thiserror::Error; /// Parquet row group read size @@ -127,6 +131,8 @@ impl ParquetStorage { where S: Stream> + Send, { + let start = Instant::now(); + // Stream the record batches into a parquet file. // // It would be nice to stream the encoded parquet to disk for this and @@ -140,7 +146,7 @@ impl ParquetStorage { // Read the IOx-specific parquet metadata from the file metadata let parquet_meta = IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?; - debug!( + trace!( ?meta.partition_id, ?parquet_meta, "IoxParquetMetaData coverted from Row Group Metadata (aka FileMetaData)" @@ -152,6 +158,15 @@ impl ParquetStorage { let file_size = data.len(); let data = Bytes::from(data); + debug!( + file_size, + object_store_id=?meta.object_store_id, + partition_id=?meta.partition_id, + // includes the time to run the datafusion plan (that is the batches) + total_time_to_create_parquet_bytes=?(Instant::now() - start), + "Uploading parquet to object store" + ); + // Retry uploading the file endlessly. // // This is abort-able by the user by dropping the upload() future.