diff --git a/Cargo.lock b/Cargo.lock index 23d1518988..77376c8288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2455,6 +2455,7 @@ dependencies = [ "chrono-english", "clap 4.0.17", "criterion", + "datafusion_util", "futures", "handlebars", "humantime", @@ -2514,6 +2515,7 @@ dependencies = [ "bytes", "data_types", "datafusion 0.1.0", + "datafusion_util", "futures", "iox_catalog", "iox_query", diff --git a/iox_data_generator/Cargo.toml b/iox_data_generator/Cargo.toml index 24d6baac34..06f92b1ea3 100644 --- a/iox_data_generator/Cargo.toml +++ b/iox_data_generator/Cargo.toml @@ -10,6 +10,7 @@ bytes = "1.2" chrono = { version = "0.4", default-features = false } chrono-english = "0.1.4" clap = { version = "4", features = ["derive", "env", "cargo"] } +datafusion_util = { path = "../datafusion_util" } futures = "0.3" handlebars = "4.3.5" humantime = "2.1.0" diff --git a/iox_data_generator/src/write.rs b/iox_data_generator/src/write.rs index 991fc9d1ce..55870dcc48 100644 --- a/iox_data_generator/src/write.rs +++ b/iox_data_generator/src/write.rs @@ -2,6 +2,7 @@ use crate::measurement::LineToGenerate; use bytes::Bytes; +use datafusion_util::MemoryStream; use futures::stream; use influxdb2_client::models::WriteDataPoint; use mutable_batch_lp::lines_to_batches; @@ -351,7 +352,7 @@ impl InnerPointsWriter { let record_batch = batch .to_arrow(Selection::All) .context(ConvertToArrowSnafu)?; - let stream = futures::stream::iter([Ok(record_batch)]); + let stream = Box::pin(MemoryStream::new(vec![record_batch])); let meta = IoxMetadata::external(crate::now_ns(), &*measurement); diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index bc27bfa630..7ea9600eb6 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -10,6 +10,7 @@ arrow = "25.0.0" bytes = "1.2" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } +datafusion_util = { path = "../datafusion_util" } iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } metric = { path = "../metric" } diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 160dcca498..026b0e9695 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -10,6 +10,7 @@ use data_types::{ ShardIndex, Table, TableId, TableSchema, Timestamp, Tombstone, TombstoneId, TopicMetadata, }; use datafusion::physical_plan::metrics::Count; +use datafusion_util::MemoryStream; use iox_catalog::{ interface::{get_schema_by_id, get_table_schema_by_id, Catalog, PartitionRepo}, mem::MemCatalog, @@ -847,7 +848,7 @@ async fn create_parquet_file( metadata: &IoxMetadata, record_batch: RecordBatch, ) -> usize { - let stream = futures::stream::once(async { Ok(record_batch) }); + let stream = Box::pin(MemoryStream::new(vec![record_batch])); let (_meta, file_size) = store .upload(stream, metadata) .await diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 477c7f3299..8f5d4cb967 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -994,6 +994,7 @@ mod tests { record_batch::RecordBatch, }; use data_types::CompactionLevel; + use datafusion_util::MemoryStream; use schema::builder::SchemaBuilder; #[test] @@ -1057,7 +1058,7 @@ mod tests { .as_arrow(); let batch = RecordBatch::try_new(schema, vec![data, timestamps]).unwrap(); - let stream = futures::stream::iter([Ok(batch.clone())]); + let stream = Box::pin(MemoryStream::new(vec![batch.clone()])); let (bytes, file_meta) = crate::serialize::to_parquet_bytes(stream, &meta) .await diff --git a/parquet_file/src/serialize.rs b/parquet_file/src/serialize.rs index 96131a9254..825164c899 100644 --- a/parquet_file/src/serialize.rs +++ b/parquet_file/src/serialize.rs @@ -1,9 +1,12 @@ //! Streaming [`RecordBatch`] / Parquet file encoder routines. +//! +//! [`RecordBatch`]: arrow::record_batch::RecordBatch use std::{io::Write, sync::Arc}; -use arrow::{error::ArrowError, record_batch::RecordBatch}; -use futures::{pin_mut, Stream, StreamExt}; +use arrow::error::ArrowError; +use datafusion::physical_plan::SendableRecordBatchStream; +use futures::{pin_mut, TryStreamExt}; use observability_deps::tracing::{debug, trace, warn}; use parquet::{ arrow::ArrowWriter, @@ -19,6 +22,8 @@ use crate::metadata::{IoxMetadata, METADATA_KEY}; pub const ROW_GROUP_WRITE_SIZE: usize = 1024 * 1024; /// [`RecordBatch`] to Parquet serialisation errors. +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug, Error)] pub enum CodecError { /// The result stream contained no batches. @@ -29,14 +34,11 @@ pub enum CodecError { /// instances yielded by the stream contained 0 rows. /// /// This would result in an empty file being uploaded to object store. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch #[error("no rows to serialise")] NoRows, - /// The codec could not infer the schema for the stream as the first stream - /// item contained an [`ArrowError`]. - #[error("failed to peek record stream schema")] - SchemaPeek, - /// An arrow error during the plan execution. #[error(transparent)] Arrow(#[from] ArrowError), @@ -70,9 +72,6 @@ pub enum CodecError { /// /// # Errors /// -/// If the stream never yields any [`RecordBatch`], a -/// [`CodecError::NoRecordBatches`] is returned. -/// /// If the stream yields a [`RecordBatch`] containing no rows, a warning is /// logged and serialisation continues. /// @@ -83,31 +82,21 @@ pub enum CodecError { /// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1 /// [`FileMetaData`]: parquet::format::FileMetaData /// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData -pub async fn to_parquet( - batches: S, +/// [`RecordBatch`]: arrow::record_batch::RecordBatch +pub async fn to_parquet( + batches: SendableRecordBatchStream, meta: &IoxMetadata, sink: W, ) -> Result where - S: Stream> + Send, W: Write + Send, { - let stream = batches.peekable(); - pin_mut!(stream); - - // Peek into the stream and extract the schema from the first record batch. - // // The ArrowWriter::write() call will return an error if any subsequent // batch does not match this schema, enforcing schema uniformity. - let schema = stream - .as_mut() - .peek() - .await - .ok_or(CodecError::NoRecordBatches)? - .as_ref() - .ok() - .map(|v| v.schema()) - .ok_or(CodecError::SchemaPeek)?; + let schema = batches.schema(); + + let stream = batches; + pin_mut!(stream); // Serialize the IoxMetadata to the protobuf bytes. let props = writer_props(meta)?; @@ -119,8 +108,7 @@ where let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?; let mut num_batches = 0; - while let Some(maybe_batch) = stream.next().await { - let batch = maybe_batch?; + while let Some(batch) = stream.try_next().await? { writer.write(&batch)?; num_batches += 1; } @@ -145,13 +133,10 @@ where /// A helper function that calls [`to_parquet()`], serialising the parquet file /// into an in-memory buffer and returning the resulting bytes. -pub async fn to_parquet_bytes( - batches: S, +pub async fn to_parquet_bytes( + batches: SendableRecordBatchStream, meta: &IoxMetadata, -) -> Result<(Vec, parquet::format::FileMetaData), CodecError> -where - S: Stream> + Send, -{ +) -> Result<(Vec, parquet::format::FileMetaData), CodecError> { let mut bytes = vec![]; let partition_id = meta.partition_id; @@ -189,10 +174,14 @@ fn writer_props(meta: &IoxMetadata) -> Result. #[allow(clippy::assertions_on_constants)] const _: () = assert!(ROW_GROUP_WRITE_SIZE % ROW_GROUP_READ_SIZE == 0); + /// Errors returned during a Parquet "put" operation, covering [`RecordBatch`] /// pull from the provided stream, encoding, and finally uploading the bytes to /// the object store. +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug, Error)] pub enum UploadError { /// A codec failure during serialisation. @@ -113,6 +112,7 @@ impl From<&'static str> for StorageId { /// type that encapsulates the storage & retrieval implementation. /// /// [`ObjectStore`]: object_store::ObjectStore +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug, Clone)] pub struct ParquetStorage { /// Underlying object store. @@ -146,14 +146,13 @@ impl ParquetStorage { /// /// This method retries forever in the presence of object store errors. All /// other errors are returned as they occur. - pub async fn upload( + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch + pub async fn upload( &self, - batches: S, + batches: SendableRecordBatchStream, meta: &IoxMetadata, - ) -> Result<(IoxParquetMetaData, usize), UploadError> - where - S: Stream> + Send, - { + ) -> Result<(IoxParquetMetaData, usize), UploadError> { let start = Instant::now(); // Stream the record batches into a parquet file. @@ -215,6 +214,8 @@ impl ParquetStorage { /// No caching is performed by `read_filter()`, and each call to /// `read_filter()` will re-download the parquet file unless the underlying /// object store impl caches the fetched bytes. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch pub fn read_filter( &self, predicate: &Predicate, @@ -295,9 +296,13 @@ pub enum ProjectionError { #[cfg(test)] mod tests { use super::*; - use arrow::array::{ArrayRef, Int64Array, StringArray}; + use arrow::{ + array::{ArrayRef, Int64Array, StringArray}, + record_batch::RecordBatch, + }; use data_types::{CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId}; use datafusion::common::DataFusionError; + use datafusion_util::MemoryStream; use iox_time::Time; use std::collections::HashMap; @@ -563,7 +568,7 @@ mod tests { meta: &IoxMetadata, batch: RecordBatch, ) -> (IoxParquetMetaData, usize) { - let stream = futures::stream::iter([Ok(batch)]); + let stream = Box::pin(MemoryStream::new(vec![batch])); store .upload(stream, meta) .await diff --git a/parquet_file/tests/metadata.rs b/parquet_file/tests/metadata.rs index f4837a3351..b1a9ffcfaf 100644 --- a/parquet_file/tests/metadata.rs +++ b/parquet_file/tests/metadata.rs @@ -8,6 +8,7 @@ use data_types::{ ColumnId, CompactionLevel, NamespaceId, PartitionId, SequenceNumber, ShardId, TableId, Timestamp, }; +use datafusion_util::MemoryStream; use iox_time::Time; use object_store::DynObjectStore; use parquet_file::{ @@ -56,7 +57,7 @@ async fn test_decoded_iox_metadata() { }; let batch = RecordBatch::try_from_iter(data).unwrap(); - let stream = futures::stream::iter([Ok(batch.clone())]); + let stream = Box::pin(MemoryStream::new(vec![batch.clone()])); let object_store: Arc = Arc::new(object_store::memory::InMemory::default()); let storage = ParquetStorage::new(object_store, StorageId::from("iox")); @@ -185,7 +186,7 @@ async fn test_empty_parquet_file_panic() { }; let batch = RecordBatch::try_from_iter(data).unwrap(); - let stream = futures::stream::iter([Ok(batch.clone())]); + let stream = Box::pin(MemoryStream::new(vec![batch.clone()])); let object_store: Arc = Arc::new(object_store::memory::InMemory::default()); let storage = ParquetStorage::new(object_store, StorageId::from("iox")); @@ -267,7 +268,7 @@ async fn test_decoded_many_columns_with_null_cols_iox_metadata() { }; let batch = RecordBatch::try_from_iter(data).unwrap(); - let stream = futures::stream::iter([Ok(batch.clone())]); + let stream = Box::pin(MemoryStream::new(vec![batch.clone()])); let object_store: Arc = Arc::new(object_store::memory::InMemory::default()); let storage = ParquetStorage::new(object_store, StorageId::from("iox")); @@ -352,7 +353,7 @@ async fn test_derive_parquet_file_params() { .as_arrow(); let batch = RecordBatch::try_new(schema, data).unwrap(); - let stream = futures::stream::iter([Ok(batch.clone())]); + let stream = Box::pin(MemoryStream::new(vec![batch.clone()])); let object_store: Arc = Arc::new(object_store::memory::InMemory::default()); let storage = ParquetStorage::new(object_store, StorageId::from("iox"));