refactor: use `SendableRecordBatchStream` to write parquets (#5911)

Use a proper typed stream instead of peeking at the first element to
infer the schema. This is more in line with the rest of our stack and
should also improve error handling.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-10-19 12:59:53 +00:00 committed by GitHub
parent 66035ada48
commit 42b89ade03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 60 additions and 58 deletions

2
Cargo.lock generated
View File

@ -2455,6 +2455,7 @@ dependencies = [
"chrono-english",
"clap 4.0.17",
"criterion",
"datafusion_util",
"futures",
"handlebars",
"humantime",
@ -2514,6 +2515,7 @@ dependencies = [
"bytes",
"data_types",
"datafusion 0.1.0",
"datafusion_util",
"futures",
"iox_catalog",
"iox_query",

View File

@ -10,6 +10,7 @@ bytes = "1.2"
chrono = { version = "0.4", default-features = false }
chrono-english = "0.1.4"
clap = { version = "4", features = ["derive", "env", "cargo"] }
datafusion_util = { path = "../datafusion_util" }
futures = "0.3"
handlebars = "4.3.5"
humantime = "2.1.0"

View File

@ -2,6 +2,7 @@
use crate::measurement::LineToGenerate;
use bytes::Bytes;
use datafusion_util::MemoryStream;
use futures::stream;
use influxdb2_client::models::WriteDataPoint;
use mutable_batch_lp::lines_to_batches;
@ -351,7 +352,7 @@ impl InnerPointsWriter {
let record_batch = batch
.to_arrow(Selection::All)
.context(ConvertToArrowSnafu)?;
let stream = futures::stream::iter([Ok(record_batch)]);
let stream = Box::pin(MemoryStream::new(vec![record_batch]));
let meta = IoxMetadata::external(crate::now_ns(), &*measurement);

View File

@ -10,6 +10,7 @@ arrow = "25.0.0"
bytes = "1.2"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }

View File

@ -10,6 +10,7 @@ use data_types::{
ShardIndex, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId, TopicMetadata,
};
use datafusion::physical_plan::metrics::Count;
use datafusion_util::MemoryStream;
use iox_catalog::{
interface::{get_schema_by_id, get_table_schema_by_id, Catalog, PartitionRepo},
mem::MemCatalog,
@ -847,7 +848,7 @@ async fn create_parquet_file(
metadata: &IoxMetadata,
record_batch: RecordBatch,
) -> usize {
let stream = futures::stream::once(async { Ok(record_batch) });
let stream = Box::pin(MemoryStream::new(vec![record_batch]));
let (_meta, file_size) = store
.upload(stream, metadata)
.await

View File

@ -994,6 +994,7 @@ mod tests {
record_batch::RecordBatch,
};
use data_types::CompactionLevel;
use datafusion_util::MemoryStream;
use schema::builder::SchemaBuilder;
#[test]
@ -1057,7 +1058,7 @@ mod tests {
.as_arrow();
let batch = RecordBatch::try_new(schema, vec![data, timestamps]).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let (bytes, file_meta) = crate::serialize::to_parquet_bytes(stream, &meta)
.await

View File

@ -1,9 +1,12 @@
//! Streaming [`RecordBatch`] / Parquet file encoder routines.
//!
//! [`RecordBatch`]: arrow::record_batch::RecordBatch
use std::{io::Write, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{pin_mut, Stream, StreamExt};
use arrow::error::ArrowError;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{pin_mut, TryStreamExt};
use observability_deps::tracing::{debug, trace, warn};
use parquet::{
arrow::ArrowWriter,
@ -19,6 +22,8 @@ use crate::metadata::{IoxMetadata, METADATA_KEY};
pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024;
/// [`RecordBatch`] to Parquet serialisation errors.
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug, Error)]
pub enum CodecError {
/// The result stream contained no batches.
@ -29,14 +34,11 @@ pub enum CodecError {
/// instances yielded by the stream contained 0 rows.
///
/// This would result in an empty file being uploaded to object store.
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[error("no rows to serialise")]
NoRows,
/// The codec could not infer the schema for the stream as the first stream
/// item contained an [`ArrowError`].
#[error("failed to peek record stream schema")]
SchemaPeek,
/// An arrow error during the plan execution.
#[error(transparent)]
Arrow(#[from] ArrowError),
@ -70,9 +72,6 @@ pub enum CodecError {
///
/// # Errors
///
/// If the stream never yields any [`RecordBatch`], a
/// [`CodecError::NoRecordBatches`] is returned.
///
/// If the stream yields a [`RecordBatch`] containing no rows, a warning is
/// logged and serialisation continues.
///
@ -83,31 +82,21 @@ pub enum CodecError {
/// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1
/// [`FileMetaData`]: parquet::format::FileMetaData
/// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData
pub async fn to_parquet<S, W>(
batches: S,
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub async fn to_parquet<W>(
batches: SendableRecordBatchStream,
meta: &IoxMetadata,
sink: W,
) -> Result<parquet::format::FileMetaData, CodecError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
W: Write + Send,
{
let stream = batches.peekable();
pin_mut!(stream);
// Peek into the stream and extract the schema from the first record batch.
//
// The ArrowWriter::write() call will return an error if any subsequent
// batch does not match this schema, enforcing schema uniformity.
let schema = stream
.as_mut()
.peek()
.await
.ok_or(CodecError::NoRecordBatches)?
.as_ref()
.ok()
.map(|v| v.schema())
.ok_or(CodecError::SchemaPeek)?;
let schema = batches.schema();
let stream = batches;
pin_mut!(stream);
// Serialize the IoxMetadata to the protobuf bytes.
let props = writer_props(meta)?;
@ -119,8 +108,7 @@ where
let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?;
let mut num_batches = 0;
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
while let Some(batch) = stream.try_next().await? {
writer.write(&batch)?;
num_batches += 1;
}
@ -145,13 +133,10 @@ where
/// A helper function that calls [`to_parquet()`], serialising the parquet file
/// into an in-memory buffer and returning the resulting bytes.
pub async fn to_parquet_bytes<S>(
batches: S,
pub async fn to_parquet_bytes(
batches: SendableRecordBatchStream,
meta: &IoxMetadata,
) -> Result<(Vec<u8>, parquet::format::FileMetaData), CodecError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
{
) -> Result<(Vec<u8>, parquet::format::FileMetaData), CodecError> {
let mut bytes = vec![];
let partition_id = meta.partition_id;
@ -189,10 +174,14 @@ fn writer_props(meta: &IoxMetadata) -> Result<WriterProperties, prost::EncodeErr
mod tests {
use super::*;
use crate::metadata::IoxParquetMetaData;
use arrow::array::{ArrayRef, StringArray};
use arrow::{
array::{ArrayRef, StringArray},
record_batch::RecordBatch,
};
use bytes::Bytes;
use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use datafusion_util::MemoryStream;
use iox_time::Time;
use std::sync::Arc;
@ -214,7 +203,7 @@ mod tests {
};
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let (bytes, _file_meta) = to_parquet_bytes(stream, &meta)
.await

View File

@ -6,11 +6,7 @@ use crate::{
serialize::{self, CodecError, ROW_GROUP_WRITE_SIZE},
ParquetFilePath,
};
use arrow::{
datatypes::{Field, SchemaRef},
error::ArrowError,
record_batch::RecordBatch,
};
use arrow::datatypes::{Field, SchemaRef};
use bytes::Bytes;
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
@ -23,7 +19,7 @@ use datafusion::{
},
prelude::SessionContext,
};
use futures::{Stream, TryStreamExt};
use futures::TryStreamExt;
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use predicate::Predicate;
@ -42,9 +38,12 @@ pub const ROW_GROUP_READ_SIZE: usize = 1024 * 1024;
// Skip clippy due to <https://github.com/rust-lang/rust-clippy/issues/8159>.
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(ROW_GROUP_WRITE_SIZE % ROW_GROUP_READ_SIZE == 0);
/// Errors returned during a Parquet "put" operation, covering [`RecordBatch`]
/// pull from the provided stream, encoding, and finally uploading the bytes to
/// the object store.
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug, Error)]
pub enum UploadError {
/// A codec failure during serialisation.
@ -113,6 +112,7 @@ impl From<&'static str> for StorageId {
/// type that encapsulates the storage & retrieval implementation.
///
/// [`ObjectStore`]: object_store::ObjectStore
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug, Clone)]
pub struct ParquetStorage {
/// Underlying object store.
@ -146,14 +146,13 @@ impl ParquetStorage {
///
/// This method retries forever in the presence of object store errors. All
/// other errors are returned as they occur.
pub async fn upload<S>(
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub async fn upload(
&self,
batches: S,
batches: SendableRecordBatchStream,
meta: &IoxMetadata,
) -> Result<(IoxParquetMetaData, usize), UploadError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
{
) -> Result<(IoxParquetMetaData, usize), UploadError> {
let start = Instant::now();
// Stream the record batches into a parquet file.
@ -215,6 +214,8 @@ impl ParquetStorage {
/// No caching is performed by `read_filter()`, and each call to
/// `read_filter()` will re-download the parquet file unless the underlying
/// object store impl caches the fetched bytes.
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub fn read_filter(
&self,
predicate: &Predicate,
@ -295,9 +296,13 @@ pub enum ProjectionError {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::{
array::{ArrayRef, Int64Array, StringArray},
record_batch::RecordBatch,
};
use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId};
use datafusion::common::DataFusionError;
use datafusion_util::MemoryStream;
use iox_time::Time;
use std::collections::HashMap;
@ -563,7 +568,7 @@ mod tests {
meta: &IoxMetadata,
batch: RecordBatch,
) -> (IoxParquetMetaData, usize) {
let stream = futures::stream::iter([Ok(batch)]);
let stream = Box::pin(MemoryStream::new(vec![batch]));
store
.upload(stream, meta)
.await

View File

@ -8,6 +8,7 @@ use data_types::{
ColumnId, CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId,
Timestamp,
};
use datafusion_util::MemoryStream;
use iox_time::Time;
use object_store::DynObjectStore;
use parquet_file::{
@ -56,7 +57,7 @@ async fn test_decoded_iox_metadata() {
};
let batch = RecordBatch::try_from_iter(data).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let storage = ParquetStorage::new(object_store, StorageId::from("iox"));
@ -185,7 +186,7 @@ async fn test_empty_parquet_file_panic() {
};
let batch = RecordBatch::try_from_iter(data).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let storage = ParquetStorage::new(object_store, StorageId::from("iox"));
@ -267,7 +268,7 @@ async fn test_decoded_many_columns_with_null_cols_iox_metadata() {
};
let batch = RecordBatch::try_from_iter(data).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let storage = ParquetStorage::new(object_store, StorageId::from("iox"));
@ -352,7 +353,7 @@ async fn test_derive_parquet_file_params() {
.as_arrow();
let batch = RecordBatch::try_new(schema, data).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let stream = Box::pin(MemoryStream::new(vec![batch.clone()]));
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let storage = ParquetStorage::new(object_store, StorageId::from("iox"));