refactor: raise error for no rows in parquet file
Previously when attempting to serialise a stream of one or more
RecordBatch containing no rows (resulting in an empty file), the parquet
serialisation code would panic.
This changes the code path to raise an error instead, to support the
compactor making multiple splits at once, which may overlap a single
chunk:
────────────── Time ────────────▶
│ │
┌█████──────────────────────█████┐
│█████ │ Chunk 1 │ █████│
└█████──────────────────────█████┘
│ │
│ │
Split T1 Split T2
In the example above, the chunk has an unusual distribution of write
timestamps over the time range it covers, with all data having a
timestamp before T1, or after T2. When a running a SplitExec to slice
this chunk at T1 and T2, the middle of the resulting 3 subsets will
contain no rows. Because we store only the min/max timestamps in the
chunk statistics, it is unfortunately impossible to prune one of these
split points from the plan ahead of time.
pull/24376/head
parent
711ba77341
commit
7698264768
|
|
@ -4,7 +4,7 @@ use std::{io::Write, sync::Arc};
|
|||
|
||||
use arrow::{error::ArrowError, record_batch::RecordBatch};
|
||||
use futures::{pin_mut, Stream, StreamExt};
|
||||
use observability_deps::tracing::debug;
|
||||
use observability_deps::tracing::{debug, warn};
|
||||
use parquet::{
|
||||
arrow::ArrowWriter,
|
||||
basic::Compression,
|
||||
|
|
@ -25,6 +25,13 @@ pub enum CodecError {
|
|||
#[error("no record batches to convert")]
|
||||
NoRecordBatches,
|
||||
|
||||
/// The result stream contained at least one [`RecordBatch`] and all
|
||||
/// instances yielded by the stream contained 0 rows.
|
||||
///
|
||||
/// This would result in an empty file being uploaded to object store.
|
||||
#[error("no rows to serialise")]
|
||||
NoRows,
|
||||
|
||||
/// The codec could not infer the schema for the stream as the first stream
|
||||
/// item contained an [`ArrowError`].
|
||||
#[error("failed to peek record stream schema")]
|
||||
|
|
@ -61,6 +68,18 @@ pub enum CodecError {
|
|||
/// Returns the serialized [`FileMetaData`] for the encoded parquet file, from
|
||||
/// which an [`IoxParquetMetaData`] can be derived.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// If the stream never yields any [`RecordBatch`], a
|
||||
/// [`CodecError::NoRecordBatches`] is returned.
|
||||
///
|
||||
/// If the stream yields a [`RecordBatch`] containing no rows, a warning is
|
||||
/// logged and serialisation continues.
|
||||
///
|
||||
/// If [`to_parquet()`] observes at least one [`RecordBatch`], but 0 rows across
|
||||
/// all [`RecordBatch`], then [`CodecError::NoRows`] is returned as no useful
|
||||
/// data was serialised.
|
||||
///
|
||||
/// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1
|
||||
/// [`FileMetaData`]: parquet_format::FileMetaData
|
||||
/// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData
|
||||
|
|
@ -98,12 +117,25 @@ where
|
|||
let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?;
|
||||
|
||||
while let Some(maybe_batch) = stream.next().await {
|
||||
writer.write(&maybe_batch?)?;
|
||||
let batch = maybe_batch?;
|
||||
if batch.num_rows() == 0 {
|
||||
// It is likely this is a logical error, where the execution plan is
|
||||
// producing no output, and therefore we're wasting CPU time by
|
||||
// running it.
|
||||
//
|
||||
// Unfortunately it is not always possible to identify this before
|
||||
// executing the plan, so this code MUST tolerate empty RecordBatch
|
||||
// and even entire files.
|
||||
warn!("parquet serialisation stream yielded empty record batch");
|
||||
} else {
|
||||
writer.write(&batch)?;
|
||||
}
|
||||
}
|
||||
|
||||
let meta = writer.close().map_err(CodecError::from)?;
|
||||
if meta.num_rows == 0 {
|
||||
panic!("serialised empty parquet file");
|
||||
warn!("parquet serialisation encoded 0 rows");
|
||||
return Err(CodecError::NoRows);
|
||||
}
|
||||
|
||||
Ok(meta)
|
||||
|
|
@ -129,16 +161,9 @@ where
|
|||
|
||||
// Serialize the record batches into the in-memory buffer
|
||||
let meta = to_parquet(batches, meta, &mut bytes).await?;
|
||||
if meta.row_groups.is_empty() {
|
||||
// panic here to avoid later consequence of reading it for statistics
|
||||
panic!("partition_id={}. Created Parquet metadata has no column metadata. HINT a common reason of this is writing empty data to parquet file: {:#?}", partition_id, meta);
|
||||
}
|
||||
|
||||
debug!(?partition_id, ?meta, "Parquet Metadata");
|
||||
|
||||
bytes.shrink_to_fit();
|
||||
|
||||
debug!(?partition_id, "Done shrink to fit");
|
||||
debug!(?partition_id, ?meta, "generated parquet file metadata");
|
||||
|
||||
Ok((bytes, meta))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -132,13 +132,6 @@ impl ParquetStorage {
|
|||
// This is not a huge concern, as the resulting parquet files are
|
||||
// currently smallish on average.
|
||||
let (data, parquet_file_meta) = serialize::to_parquet_bytes(batches, meta).await?;
|
||||
// TODO: remove this if after verifying the panic is thrown
|
||||
// correctly inside the serialize::to_parquet_bytes above
|
||||
if parquet_file_meta.row_groups.is_empty() {
|
||||
debug!(
|
||||
?meta.partition_id, ?parquet_file_meta,
|
||||
"Created parquet_file_meta has no row groups which will introduce panic later when its statistics is read");
|
||||
}
|
||||
|
||||
// Read the IOx-specific parquet metadata from the file metadata
|
||||
let parquet_meta =
|
||||
|
|
|
|||
|
|
@ -10,7 +10,11 @@ use data_types::{
|
|||
};
|
||||
use iox_time::Time;
|
||||
use object_store::DynObjectStore;
|
||||
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
|
||||
use parquet_file::{
|
||||
metadata::IoxMetadata,
|
||||
serialize::CodecError,
|
||||
storage::{ParquetStorage, UploadError},
|
||||
};
|
||||
use schema::{builder::SchemaBuilder, sort::SortKey, InfluxFieldType, TIME_COLUMN_NAME};
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -105,16 +109,53 @@ async fn test_decoded_iox_metadata() {
|
|||
);
|
||||
}
|
||||
|
||||
// Ensure that attempting to write an empty parquet file causes a panic for a
|
||||
// human to investigate why it is happening.
|
||||
// Ensure that attempting to write an empty parquet file causes a error to be
|
||||
// raised. The caller can then decide if this is acceptable plan output or a
|
||||
// bug.
|
||||
//
|
||||
// The idea is that currently it is a logical error to be producing empty
|
||||
// parquet files at all - this might not always be the case, in which case
|
||||
// removing this panic behaviour is perfectly fine too!
|
||||
// It used to be considered a logical error to be producing empty parquet files
|
||||
// at all - we have previously identified cases of useless work being performed
|
||||
// by inducing a panic when observing a parquet file with 0 rows, however we now
|
||||
// tolerate 0 row outputs as the compactor can perform multiple splits at once,
|
||||
// which is problematic when a single chunk can overlap multiple split points:
|
||||
//
|
||||
// Relates to "https://github.com/influxdata/influxdb_iox/issues/4695"
|
||||
// ────────────── Time ────────────▶
|
||||
//
|
||||
// │ │
|
||||
// ┌────────────────────────────────┐
|
||||
// │ │ Chunk 1 │ │
|
||||
// └────────────────────────────────┘
|
||||
// │ │
|
||||
//
|
||||
// │ │
|
||||
//
|
||||
// Split T1 Split T2
|
||||
//
|
||||
// If this chunk has an unusual distribution of writes over the time range it
|
||||
// covers, we can wind up with the split between T1 and T2 containing no data.
|
||||
// For example, if all the data is either before T1, or after T2 we can wind up
|
||||
// with a split plan such as this, where the middle sub-section contains no
|
||||
// data:
|
||||
//
|
||||
// │ │
|
||||
// ┌█████──────────────────────█████┐
|
||||
// │█████ │ Chunk 1 │ █████│
|
||||
// └█████──────────────────────█████┘
|
||||
// │ │
|
||||
//
|
||||
// │ │
|
||||
//
|
||||
// Split T1 Split T2
|
||||
//
|
||||
// It is not possible to use the chunk statistics (min/max timestamps) to
|
||||
// determine this empty sub-section will result ahead of time, therefore the
|
||||
// parquet encoder must tolerate it and raise a non-fatal error instead of
|
||||
// panicking.
|
||||
//
|
||||
// Relates to:
|
||||
// * https://github.com/influxdata/influxdb_iox/issues/4695
|
||||
// * https://github.com/influxdata/conductor/issues/1121
|
||||
#[tokio::test]
|
||||
#[should_panic = "serialised empty parquet file"]
|
||||
async fn test_empty_parquet_file_panic() {
|
||||
// A representative IOx data sample (with a time column, an invariant upheld
|
||||
// in the IOx write path)
|
||||
|
|
@ -150,7 +191,12 @@ async fn test_empty_parquet_file_panic() {
|
|||
let storage = ParquetStorage::new(object_store);
|
||||
|
||||
// Serialising empty data should cause a panic for human investigation.
|
||||
let _ = storage.upload(stream, &meta).await;
|
||||
let err = storage
|
||||
.upload(stream, &meta)
|
||||
.await
|
||||
.expect_err("empty file should raise an error");
|
||||
|
||||
assert!(matches!(err, UploadError::Serialise(CodecError::NoRows)));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
Loading…
Reference in New Issue