diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index c41b7e1f79..45bb6eae3a 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -4,7 +4,7 @@ use std::{io::Write, sync::Arc}; use arrow::{error::ArrowError, record_batch::RecordBatch}; use futures::{pin_mut, Stream, StreamExt}; -use observability_deps::tracing::debug; +use observability_deps::tracing::{debug, warn}; use parquet::{ arrow::ArrowWriter, basic::Compression, @@ -25,6 +25,13 @@ pub enum CodecError { #[error("no record batches to convert")] NoRecordBatches, + /// The result stream contained at least one [`RecordBatch`] and all + /// instances yielded by the stream contained 0 rows. + /// + /// This would result in an empty file being uploaded to object store. + #[error("no rows to serialise")] + NoRows, + /// The codec could not infer the schema for the stream as the first stream /// item contained an [`ArrowError`]. #[error("failed to peek record stream schema")] @@ -61,6 +68,18 @@ pub enum CodecError { /// Returns the serialized [`FileMetaData`] for the encoded parquet file, from /// which an [`IoxParquetMetaData`] can be derived. /// +/// # Errors +/// +/// If the stream never yields any [`RecordBatch`], a +/// [`CodecError::NoRecordBatches`] is returned. +/// +/// If the stream yields a [`RecordBatch`] containing no rows, a warning is +/// logged and serialisation continues. +/// +/// If [`to_parquet()`] observes at least one [`RecordBatch`], but 0 rows across +/// all [`RecordBatch`], then [`CodecError::NoRows`] is returned as no useful +/// data was serialised. 
+/// /// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1 /// [`FileMetaData`]: parquet_format::FileMetaData /// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData @@ -98,12 +117,25 @@ where let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?; while let Some(maybe_batch) = stream.next().await { - writer.write(&maybe_batch?)?; + let batch = maybe_batch?; + if batch.num_rows() == 0 { + // It is likely this is a logical error, where the execution plan is + // producing no output, and therefore we're wasting CPU time by + // running it. + // + // Unfortunately it is not always possible to identify this before + // executing the plan, so this code MUST tolerate empty RecordBatch + // and even entire files. + warn!("parquet serialisation stream yielded empty record batch"); + } else { + writer.write(&batch)?; + } } let meta = writer.close().map_err(CodecError::from)?; if meta.num_rows == 0 { - panic!("serialised empty parquet file"); + warn!("parquet serialisation encoded 0 rows"); + return Err(CodecError::NoRows); } Ok(meta) @@ -129,16 +161,9 @@ where // Serialize the record batches into the in-memory buffer let meta = to_parquet(batches, meta, &mut bytes).await?; - if meta.row_groups.is_empty() { - // panic here to avoid later consequence of reading it for statistics - panic!("partition_id={}. Created Parquet metadata has no column metadata. 
HINT a common reason of this is writing empty data to parquet file: {:#?}", partition_id, meta); - } - - debug!(?partition_id, ?meta, "Parquet Metadata"); - bytes.shrink_to_fit(); - debug!(?partition_id, "Done shrink to fit"); + debug!(?partition_id, ?meta, "generated parquet file metadata"); Ok((bytes, meta)) } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 57478eb037..456ad840d6 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -132,13 +132,6 @@ impl ParquetStorage { // This is not a huge concern, as the resulting parquet files are // currently smallish on average. let (data, parquet_file_meta) = serialize::to_parquet_bytes(batches, meta).await?; - // TODO: remove this if after verifying the panic is thrown - // correctly inside the serialize::to_parquet_bytes above - if parquet_file_meta.row_groups.is_empty() { - debug!( - ?meta.partition_id, ?parquet_file_meta, - "Created parquet_file_meta has no row groups which will introduce panic later when its statistics is read"); - } // Read the IOx-specific parquet metadata from the file metadata let parquet_meta = diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs index 2e95bb7b27..8175250d3b 100644 --- a/parquet_file/tests/metadata.rs +++ b/parquet_file/tests/metadata.rs @@ -10,7 +10,11 @@ use data_types::{ }; use iox_time::Time; use object_store::DynObjectStore; -use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; +use parquet_file::{ + metadata::IoxMetadata, + serialize::CodecError, + storage::{ParquetStorage, UploadError}, +}; use schema::{builder::SchemaBuilder, sort::SortKey, InfluxFieldType, TIME_COLUMN_NAME}; #[tokio::test] @@ -105,16 +109,53 @@ async fn test_decoded_iox_metadata() { ); } -// Ensure that attempting to write an empty parquet file causes a panic for a -// human to investigate why it is happening. +// Ensure that attempting to write an empty parquet file causes an error to be +// raised.
The caller can then decide if this is acceptable plan output or a +// bug. // -// The idea is that currently it is a logical error to be producing empty -// parquet files at all - this might not always be the case, in which case -// removing this panic behaviour is perfectly fine too! +// It used to be considered a logical error to be producing empty parquet files +// at all - we have previously identified cases of useless work being performed +// by inducing a panic when observing a parquet file with 0 rows, however we now +// tolerate 0 row outputs as the compactor can perform multiple splits at once, +// which is problematic when a single chunk can overlap multiple split points: // -// Relates to "https://github.com/influxdata/influxdb_iox/issues/4695" +// ────────────── Time ────────────▶ +// +// │ │ +// ┌────────────────────────────────┐ +// │ │ Chunk 1 │ │ +// └────────────────────────────────┘ +// │ │ +// +// │ │ +// +// Split T1 Split T2 +// +// If this chunk has an unusual distribution of writes over the time range it +// covers, we can wind up with the split between T1 and T2 containing no data. +// For example, if all the data is either before T1, or after T2 we can wind up +// with a split plan such as this, where the middle sub-section contains no +// data: +// +// │ │ +// ┌█████──────────────────────█████┐ +// │█████ │ Chunk 1 │ █████│ +// └█████──────────────────────█████┘ +// │ │ +// +// │ │ +// +// Split T1 Split T2 +// +// It is not possible to use the chunk statistics (min/max timestamps) to +// determine this empty sub-section will result ahead of time, therefore the +// parquet encoder must tolerate it and raise a non-fatal error instead of +// panicking. 
+// +// Relates to: +// * https://github.com/influxdata/influxdb_iox/issues/4695 +// * https://github.com/influxdata/conductor/issues/1121 #[tokio::test] -#[should_panic = "serialised empty parquet file"] async fn test_empty_parquet_file_panic() { // A representative IOx data sample (with a time column, an invariant upheld // in the IOx write path) @@ -150,7 +191,12 @@ async fn test_empty_parquet_file_panic() { let storage = ParquetStorage::new(object_store); // Serialising empty data should cause a panic for human investigation. - let _ = storage.upload(stream, &meta).await; + let err = storage + .upload(stream, &meta) + .await + .expect_err("empty file should raise an error"); + + assert!(matches!(err, UploadError::Serialise(CodecError::NoRows))); } #[tokio::test]