From 70856a645ff9186663ae58643e26963b2d0b42db Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 18 May 2022 19:45:47 +0100 Subject: [PATCH 1/9] feat: streaming RecordBatch -> parquet encoding Implements a streaming RecordBatch to Parquet file serialiser. This impl automatically discovers the schema of the RecordBatch stream, and accepts &mut destination types (internalising the handle cloning/etc) to simplify caller usage. This encoder returns the resulting FileMetaData to allow callers to inspect the resulting metadata without reading back the file. Currently unused / not yet plumbed in. --- Cargo.lock | 1 + parquet_file/Cargo.toml | 1 + parquet_file/src/lib.rs | 1 + parquet_file/src/serialise.rs | 228 ++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+) create mode 100644 parquet_file/src/serialise.rs diff --git a/Cargo.lock b/Cargo.lock index fc377488d0..8225c42b69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3716,6 +3716,7 @@ dependencies = [ "schema", "snafu", "tempfile", + "thiserror", "thrift", "tokio", "uuid 0.8.2", diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml index 855ead1014..57ef879b1c 100644 --- a/parquet_file/Cargo.toml +++ b/parquet_file/Cargo.toml @@ -31,3 +31,4 @@ tokio = { version = "1.18", features = ["macros", "parking_lot", "rt", "rt-multi uuid = { version = "0.8", features = ["v4"] } zstd = "0.11" workspace-hack = { path = "../workspace-hack"} +thiserror = "1.0.31" diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 824bdaf1bb..5b4896dc03 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -10,6 +10,7 @@ pub mod chunk; pub mod metadata; +pub mod serialise; pub mod storage; use data_types::{NamespaceId, PartitionId, SequencerId, TableId}; diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs new file mode 100644 index 0000000000..086aa476d9 --- /dev/null +++ b/parquet_file/src/serialise.rs @@ -0,0 +1,228 @@ +use std::{ops::DerefMut, sync::Arc}; + +use arrow::{error::ArrowError, record_batch::RecordBatch}; +use futures::{pin_mut, Stream, StreamExt}; +use parquet::{ + arrow::ArrowWriter, + basic::Compression, + errors::ParquetError, + file::{ + metadata::KeyValue, + properties::WriterProperties, + writer::{InMemoryWriteableCursor, ParquetWriter}, + }, +}; +use thiserror::Error; + +use crate::metadata::{IoxMetadata, METADATA_KEY}; + +/// [`RecordBatch`] to Parquet serialisation errors. +#[derive(Debug, Error)] +pub enum CodecError { + /// The result stream contained no batches. + #[error("no record batches to convert")] + NoRecordBatches, + + /// The codec could not infer the schema for the stream as the first stream + /// item contained an [`ArrowError`]. + #[error("failed to peek record stream schema")] + SchemaPeek, + + /// An arrow error during the plan execution. + #[error(transparent)] + Arrow(#[from] ArrowError), + + /// Serialising the [`IoxMetadata`] to protobuf-encoded bytes failed. + #[error("failed to serialise iox metadata: {0}")] + MetadataSerialisation(#[from] prost::EncodeError), + + #[error("failed to build parquet file: {0}")] + Writer(#[from] ParquetError), + + #[error("failed to obtain writer handle clone: {0}")] + CloneSink(std::io::Error), +} + +/// An IOx-specific, streaming [`RecordBatch`] to parquet file encoder. +/// +/// This encoder discovers the schema from the first item in `batches`, and +/// encodes each [`RecordBatch`] one by one into `W`. All [`RecordBatch`] +/// yielded by the stream must be of the same schema, or this call will return +/// an error. +/// +/// IOx metadata is encoded into the parquet file's metadata under the key +/// [`METADATA_KEY`], with a base64-wrapped, protobuf serialised +/// [`proto::IoxMetadata`] structure. +/// +/// Returns the serialised [`FileMetaData`] for the encoded parquet file, from +/// which an [`IoxParquetMetaData`] can be derived. +/// +/// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1 +/// [`FileMetaData`]: parquet_format::FileMetaData +/// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData +pub async fn to_parquet( + batches: S, + meta: &IoxMetadata, + sink: W, +) -> Result +where + S: Stream> + Send, + W: DerefMut + Send, + T: ParquetWriter + Send + 'static, +{ + // Let the caller pass a mutable ref to something that impls ParquetWriter + // while still providing the necessary owned impl to the ArrowWriter. + let sink = sink.deref().try_clone().map_err(CodecError::CloneSink)?; + + let stream = batches.peekable(); + pin_mut!(stream); + + // Peek into the stream and extract the schema from the first record batch. + // + // The ArrowWriter::write() call will return an error if any subsequent + // batch does not match this schema, enforcing schema uniformity. + let schema = stream + .as_mut() + .peek() + .await + .ok_or(CodecError::NoRecordBatches)? + .as_ref() + .ok() + .map(|v| v.schema()) + .ok_or(CodecError::SchemaPeek)?; + + // Serialise the IoxMetadata to the protobuf bytes. + let props = writer_props(meta)?; + + // Construct the arrow serialiser with the metadata as part of the parquet + // file properties. + let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?; + + while let Some(maybe_batch) = stream.next().await { + writer.write(&maybe_batch?)?; + } + + writer.close().map_err(CodecError::from) +} + +/// A helper function that calls [`to_parquet()`], serialising the parquet file +/// into an in-memory buffer and returning the resulting bytes. +pub async fn to_parquet_bytes( + batches: S, + meta: &IoxMetadata, +) -> Result<(Vec, parquet_format::FileMetaData), CodecError> +where + S: Stream> + Send, +{ + let mut w = InMemoryWriteableCursor::default(); + + // w is a ref-counted buffer, so cloning actually passes an owned ref. + let meta = to_parquet(batches, meta, &mut w).await?; + + let mut bytes = w + .into_inner() + .expect("mem writer has outstanding reference"); + + bytes.shrink_to_fit(); + + Ok((bytes, meta)) +} + +/// Helper to construct [`WriterProperties`] for the [`ArrowWriter`], +/// serialising the given [`IoxMetadata`] and embedding it as a key=value +/// property keyed by [`METADATA_KEY`]. +fn writer_props(meta: &IoxMetadata) -> Result { + let bytes = meta.to_protobuf()?; + + let builder = WriterProperties::builder() + .set_key_value_metadata(Some(vec![KeyValue { + key: METADATA_KEY.to_string(), + value: Some(base64::encode(&bytes)), + }])) + .set_compression(Compression::ZSTD); + + Ok(builder.build()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{ArrayRef, StringBuilder}; + use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId}; + use iox_time::Time; + use parquet::{ + arrow::{ArrowReader, ParquetFileArrowReader}, + file::serialized_reader::{SerializedFileReader, SliceableCursor}, + }; + + use crate::metadata::IoxParquetMetaData; + + use super::*; + + #[tokio::test] + async fn test_encode_stream() { + let meta = IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + time_of_first_write: Time::from_timestamp_nanos(4242), + time_of_last_write: Time::from_timestamp_nanos(424242), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + row_count: 1000, + compaction_level: 1, + sort_key: None, + }; + + let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); + let stream = futures::stream::iter([Ok(batch.clone())]); + + let (bytes, _file_meta) = to_parquet_bytes(stream, &meta) + .await + .expect("should serialise"); + + // Read the metadata from the file bytes. + // + // This is quite wordy... + let iox_parquet_meta = IoxParquetMetaData::from_file_bytes(Arc::new(bytes.clone())) + .expect("should decode") + .expect("should contain metadata") + .decode() + .expect("should decode IOx metadata") + .read_iox_metadata_new() + .expect("should read IOxMetadata"); + assert_eq!(iox_parquet_meta, meta); + + // Read the parquet file back to arrow records + let cursor = SliceableCursor::new(bytes); + let file_reader = SerializedFileReader::new(cursor).expect("should init reader"); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + + let mut record_batches = arrow_reader + .get_record_reader(100) + .expect("should read") + .into_iter() + .collect::>(); + + assert_eq!(record_batches.len(), 1); + assert_eq!( + record_batches.pop().unwrap().expect("should be OK batch"), + batch + ); + } + + fn to_string_array(strs: &[&str]) -> ArrayRef { + let mut builder = StringBuilder::new(strs.len()); + for s in strs { + builder.append_value(s).expect("appending string"); + } + Arc::new(builder.finish()) + } +} From 76e08d14a35e45db588497bf7527d7398fcb9561 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 19 May 2022 16:40:14 +0100 Subject: [PATCH 2/9] perf: IoxParquetMetaData direct from file metadata Construct a IoxParquetMetaData instance directly from the FileMetaData instance returned by the ArrowWriter. This change will allow us to avoid the inefficient impl currently in use: * Serialise batches into memory * Wrap buffer in arrow cursor * Read parquet metadata with arrow file reader * Serialise schema with thrift * Serialise each row group's metadata with thrift * Construct our own FileMetaData instance * Serialise FileMetaData with thrift * zstd encode resulting thrift bytes * Wrap in IoxParquetMetaData Now we "only": * Stream batches into opaque Write impl * Serialise FileMetaData with thrift * zstd encode resulting thrift bytes * Wrap in IoxParquetMetaData Then accessing any data within the IoxParquetMetaData (as before this change) requires deserialising it first. There are still a number of easy performance improvements to be had w.r.t the metadata handling. --- parquet_file/src/metadata.rs | 88 ++++++++++++++++++++++++++++++------ 1 file changed, 73 insertions(+), 15 deletions(-) diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index aeaf88ff68..043d1f174e 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -434,7 +434,7 @@ fn decode_timestamp_from_field( } /// Parquet metadata with IOx-specific wrapper. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct IoxParquetMetaData { /// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes. /// @@ -523,20 +523,7 @@ impl IoxParquetMetaData { }; // step 2: serialize the thrift struct into bytes - let mut buffer = Vec::new(); - { - let mut protocol = TCompactOutputProtocol::new(&mut buffer); - thrift_file_metadata - .write_to_out_protocol(&mut protocol) - .context(ThriftWriteFailureSnafu {})?; - protocol.flush().context(ThriftWriteFailureSnafu {})?; - } - - // step 3: compress data - // Note: level 0 is the zstd-provided default - let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailureSnafu)?; - - Ok(buffer) + Self::try_from(thrift_file_metadata).map(|opt| opt.data) } /// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded, and [Zstandard]-compressed bytes. @@ -589,6 +576,26 @@ impl IoxParquetMetaData { } } +impl TryFrom for IoxParquetMetaData { + type Error = Error; + + fn try_from(v: parquet_format::FileMetaData) -> Result { + let mut buffer = Vec::new(); + { + let mut protocol = TCompactOutputProtocol::new(&mut buffer); + v.write_to_out_protocol(&mut protocol) + .context(ThriftWriteFailureSnafu {})?; + protocol.flush().context(ThriftWriteFailureSnafu {})?; + } + + // step 3: compress data + // Note: level 0 is the zstd-provided default + let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailureSnafu)?; + + Ok(Self::from_thrift_bytes(buffer)) + } +} + /// Parquet metadata with IOx-specific wrapper, in decoded form. #[derive(Debug)] pub struct DecodedIoxParquetMetaData { @@ -841,6 +848,11 @@ fn extract_iox_statistics( #[cfg(test)] mod tests { + use arrow::{ + array::{ArrayRef, StringBuilder}, + record_batch::RecordBatch, + }; + use super::*; #[test] @@ -874,4 +886,50 @@ mod tests { assert_eq!(iox_metadata, iox_metadata_again); } + + #[tokio::test] + async fn test_metadata_from_parquet_metadata() { + let meta = IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + time_of_first_write: Time::from_timestamp_nanos(4242), + time_of_last_write: Time::from_timestamp_nanos(424242), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + row_count: 1000, + compaction_level: 1, + sort_key: None, + }; + + let mut builder = StringBuilder::new(1); + builder.append_value("bananas").expect("appending string"); + let data: ArrayRef = Arc::new(builder.finish()); + + let batch = RecordBatch::try_from_iter([("a", data)]).unwrap(); + let stream = futures::stream::iter([Ok(batch.clone())]); + + let (bytes, file_meta) = crate::serialise::to_parquet_bytes(stream, &meta) + .await + .expect("should serialise"); + + // Read the metadata from the file bytes. + // + // This is quite wordy... + let iox_parquet_meta = IoxParquetMetaData::from_file_bytes(Arc::new(bytes)) + .expect("should decode") + .expect("should contain metadata"); + + // And read the metadata directly from the file metadata returned from + // encoding. + let file_meta = IoxParquetMetaData::try_from(file_meta) + .expect("failed to decode IoxParquetMetaData from file metadata"); + assert_eq!(file_meta, iox_parquet_meta); + } } From b9a745d42d6e72705a927aee34baf17572313015 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 11:00:51 +0100 Subject: [PATCH 3/9] feat: RecordBatch stream to Parquet file upload Implements an upload() method on the ParquetStorage type, consuming a stream of RecordBatch, serialising the Parquet file, and uploading the result to object storage. Returns the IOx-specific file metadata. Currently while the upload() method accepts a stream of RecordBatch, the actual resulting Parquet file is buffered in memory before uploading to object store, due to lack of streaming upload functionality in the ObjectStore abstraction - this isn't the end of the world, as the files tend to be relatively small with our current usage. This impl should be easily modified to be fully streaming once streaming object store puts are implemented: https://github.com/influxdata/object_store_rs/issues/9 --- Cargo.lock | 1 + compactor/src/compact.rs | 59 ++---- ingester/src/persist.rs | 53 +---- iox_tests/Cargo.toml | 1 + iox_tests/src/util.rs | 36 +--- parquet_file/src/chunk.rs | 2 +- parquet_file/src/storage.rs | 393 ++++++++++++++++-------------------- 7 files changed, 203 insertions(+), 342 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8225c42b69..56ecf9a0c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2486,6 +2486,7 @@ dependencies = [ "bytes", "data_types", "datafusion 0.1.0", + "futures", "iox_catalog", "iox_query", "iox_time", diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index deb614c4b8..372b247e2b 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -15,6 +15,7 @@ use data_types::{ Timestamp, Tombstone, TombstoneId, }; use datafusion::error::DataFusionError; +use futures::StreamExt; use iox_catalog::interface::{Catalog, Transaction, INITIAL_COMPACTION_LEVEL}; use iox_query::{ exec::{Executor, ExecutorType}, @@ -29,7 +30,6 @@ use observability_deps::tracing::{debug, info, trace, warn}; use parquet_file::{ metadata::{IoxMetadata, IoxParquetMetaData}, storage::ParquetStorage, - ParquetFilePath, }; use schema::sort::SortKey; use snafu::{ensure, OptionExt, ResultExt, Snafu}; @@ -134,16 +134,6 @@ pub enum Error { source: iox_catalog::interface::Error, }, - #[snafu(display("Error converting the parquet stream to bytes: {}", source))] - ConvertingToBytes { - source: parquet_file::storage::Error, - }, - - #[snafu(display("Error writing to object store: {}", source))] - WritingToObjectStore { - source: parquet_file::storage::Error, - }, - #[snafu(display("Error updating catalog {}", source))] Update { source: iox_catalog::interface::Error, @@ -179,6 +169,11 @@ pub enum Error { #[snafu(display("Could not find partition {:?}", partition_id))] PartitionNotFound { partition_id: PartitionId }, + + #[snafu(display("Could not serialise and persist record batches {}", source))] + Persist { + source: parquet_file::storage::UploadError, + }, } /// A specialized `Error` for Compactor Data errors @@ -788,7 +783,7 @@ impl Compactor { Ok(compacted) } - /// Write the given data to the given location in the given object storage. + /// Write the given data to the parquet store. /// /// Returns the persisted file size (in bytes) and metadata if a file was created. async fn persist( @@ -799,44 +794,14 @@ impl Compactor { if record_batches.is_empty() { return Ok(None); } - // All record batches have the same schema. - let schema = record_batches - .first() - .expect("record_batches.is_empty was just checked") - .schema(); - let data = store - .parquet_bytes(record_batches, schema, metadata) - .await - .context(ConvertingToBytesSnafu)?; + // TODO(4324): Yield the buffered RecordBatch instances as a stream as a + // temporary measure until streaming compaction is complete. + let stream = futures::stream::iter(record_batches).map(Ok); - if data.is_empty() { - return Ok(None); - } + let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?; - // extract metadata - let data = Arc::new(data); - let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data)) - .expect("cannot read parquet file metadata") - .expect("no metadata in parquet file"); - let data = Arc::try_unwrap(data).expect("dangling reference to data"); - - let file_size = data.len(); - - let path = ParquetFilePath::new( - metadata.namespace_id, - metadata.table_id, - metadata.sequencer_id, - metadata.partition_id, - metadata.object_store_id, - ); - - store - .to_object_store(data, &path) - .await - .context(WritingToObjectStoreSnafu)?; - - Ok(Some((file_size, md))) + Ok(Some((file_size, meta))) } async fn update_catalog( diff --git a/ingester/src/persist.rs b/ingester/src/persist.rs index 7ac5f01b08..1c4e458be7 100644 --- a/ingester/src/persist.rs +++ b/ingester/src/persist.rs @@ -1,25 +1,19 @@ //! Persist compacted data to parquet files in object storage use arrow::record_batch::RecordBatch; +use futures::StreamExt; use parquet_file::{ metadata::{IoxMetadata, IoxParquetMetaData}, storage::ParquetStorage, - ParquetFilePath, }; use snafu::{ResultExt, Snafu}; -use std::sync::Arc; #[derive(Debug, Snafu)] #[allow(missing_docs)] pub enum Error { - #[snafu(display("Error converting the parquet stream to bytes: {}", source))] - ConvertingToBytes { - source: parquet_file::storage::Error, - }, - - #[snafu(display("Error writing to object store: {}", source))] - WritingToObjectStore { - source: parquet_file::storage::Error, + #[snafu(display("Could not serialise and persist record batches {}", source))] + Persist { + source: parquet_file::storage::UploadError, }, } @@ -37,43 +31,14 @@ pub async fn persist( if record_batches.is_empty() { return Ok(None); } - let schema = record_batches - .first() - .expect("record_batches.is_empty was just checked") - .schema(); - let data = store - .parquet_bytes(record_batches, schema, metadata) - .await - .context(ConvertingToBytesSnafu)?; + // TODO(4324): Yield the buffered RecordBatch instances as a stream as a + // temporary measure until streaming compaction is complete. + let stream = futures::stream::iter(record_batches).map(Ok); - if data.is_empty() { - return Ok(None); - } + let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?; - // extract metadata - let data = Arc::new(data); - let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data)) - .expect("cannot read parquet file metadata") - .expect("no metadata in parquet file"); - let data = Arc::try_unwrap(data).expect("dangling reference to data"); - - let file_size = data.len(); - - let path = ParquetFilePath::new( - metadata.namespace_id, - metadata.table_id, - metadata.sequencer_id, - metadata.partition_id, - metadata.object_store_id, - ); - - store - .to_object_store(data, &path) - .await - .context(WritingToObjectStoreSnafu)?; - - Ok(Some((file_size, md))) + Ok(Some((file_size, meta))) } #[cfg(test)] diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index f628b6318e..0a01d56717 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -20,3 +20,4 @@ iox_query = { path = "../iox_query" } schema = { path = "../schema" } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} +futures = "0.3.21" diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 3880287041..3a60ebbf4b 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -18,11 +18,7 @@ use iox_query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_s use iox_time::{MockProvider, Time, TimeProvider}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use object_store::{memory::InMemory, DynObjectStore}; -use parquet_file::{ - metadata::{IoxMetadata, IoxParquetMetaData}, - storage::ParquetStorage, - ParquetFilePath, -}; +use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; use schema::{ selection::Selection, sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder}, @@ -650,32 +646,12 @@ async fn create_parquet_file( metadata: &IoxMetadata, record_batch: RecordBatch, ) -> (Vec, usize) { - let schema = record_batch.schema(); - - let data = store - .parquet_bytes(vec![record_batch], schema, metadata) + let stream = futures::stream::once(async { Ok(record_batch) }); + let (meta, file_size) = store + .upload(stream, metadata) .await - .unwrap(); - let data = Arc::new(data); - let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data)) - .unwrap() - .unwrap(); - let parquet_md = md.thrift_bytes().to_vec(); - let data = Arc::try_unwrap(data).expect("dangling reference to data"); - - let file_size = data.len(); - - let path = ParquetFilePath::new( - metadata.namespace_id, - metadata.table_id, - metadata.sequencer_id, - metadata.partition_id, - metadata.object_store_id, - ); - - store.to_object_store(data, &path).await.unwrap(); - - (parquet_md, file_size) + .expect("persisting parquet file should succeed"); + (meta.thrift_bytes().to_vec(), file_size) } /// A test parquet file of the catalog diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 18ee8bb5f8..78be6c5c85 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -19,7 +19,7 @@ pub enum Error { NamedTableNotFoundInChunk { table_name: String }, #[snafu(display("Failed to read parquet: {}", source))] - ReadParquet { source: crate::storage::Error }, + ReadParquet { source: crate::storage::ReadError }, #[snafu(display("Failed to select columns: {}", source))] SelectColumns { source: schema::Error }, diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index a0ffd6837f..29ee69b1b8 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -1,8 +1,9 @@ -//! This module is responsible for writing the given data to the specified object store and reading -//! it back. +//! This module is responsible for writing the given data to the specified +//! object store and reading it back. use crate::{ - metadata::{IoxMetadata, METADATA_KEY}, + metadata::{IoxMetadata, IoxParquetMetaData}, + serialise::{self, CodecError}, ParquetFilePath, }; use arrow::{ @@ -13,228 +14,145 @@ use arrow::{ use bytes::Bytes; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion_util::{AdapterStream, AutoAbortJoinHandle}; -use futures::{stream, StreamExt}; +use futures::Stream; use object_store::{DynObjectStore, GetResult}; use observability_deps::tracing::*; -use parking_lot::Mutex; -use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; -use parquet::file::reader::SerializedFileReader; use parquet::{ - self, - arrow::ArrowWriter, - basic::Compression, - file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone}, + arrow::{ArrowReader, ParquetFileArrowReader}, + file::reader::SerializedFileReader, }; use predicate::Predicate; use schema::selection::Selection; -use snafu::{OptionExt, ResultExt, Snafu}; use std::{ - io::{Cursor, Seek, SeekFrom, Write}, + io::{Seek, Write}, sync::Arc, }; +use thiserror::Error; -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Error opening Parquet Writer: {}", source))] - OpeningParquetWriter { - source: parquet::errors::ParquetError, - }, +/// Errors returned during a Parquet "put" operation, covering [`RecordBatch`] +/// pull from the provided stream, encoding, and finally uploading the bytes to +/// the object store. +#[derive(Debug, Error)] +pub enum UploadError { + /// A codec failure during serialisation. + #[error(transparent)] + Serialise(#[from] CodecError), - #[snafu(display("Error reading stream while creating snapshot: {}", source))] - ReadingStream { source: ArrowError }, + /// An error during Parquet metadata conversion when attempting to + /// instantiate a valid [`IoxParquetMetaData`] instance. + #[error("failed to construct IOx parquet metadata: {0}")] + Metadata(crate::metadata::Error), - #[snafu(display("Error writing Parquet to memory: {}", source))] - WritingParquetToMemory { - source: parquet::errors::ParquetError, - }, - - #[snafu(display("Error closing Parquet Writer: {}", source))] - ClosingParquetWriter { - source: parquet::errors::ParquetError, - }, - - #[snafu(display("Error writing to object store: {}", source))] - WritingToObjectStore { source: object_store::Error }, - - #[snafu(display("Error converting to vec[u8]: Nothing else should have a reference here"))] - WritingToMemWriter {}, - - #[snafu(display("Error opening temp file: {}", source))] - OpenTempFile { source: std::io::Error }, - - #[snafu(display("Error writing to temp file: {}", source))] - WriteTempFile { source: std::io::Error }, - - #[snafu(display("Error reading data from object store: {}", source))] - ReadingObjectStore { source: object_store::Error }, - - #[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))] - ExtractingMetadataFailure { source: crate::metadata::Error }, - - #[snafu(display("Cannot encode metadata: {}", source))] - MetadataEncodeFailure { source: prost::EncodeError }, - - #[snafu(display("Error reading parquet: {}", source))] - ParquetReader { - source: parquet::errors::ParquetError, - }, - - #[snafu(display("No data to convert to parquet"))] - NoData {}, + /// Uploading the Parquet file to object store failed. + #[error("failed to upload to object storage: {0}")] + Upload(#[from] object_store::Error), } -pub type Result = std::result::Result; +/// Errors during Parquet file download & scan. +#[derive(Debug, Error)] +pub enum ReadError { + /// Failed to create the temporary Parquet file on disk to which the + /// downloaded parquet bytes will be spilled. + #[error("failed to create temporary file: {0}")] + TempFile(std::io::Error), + + /// Error writing the bytes fetched from object store to the temporary + /// parquet file on disk. + #[error("i/o error writing downloaded parquet: {0}")] + IO(#[from] std::io::Error), + + /// An error fetching Parquet file bytes from object store. + #[error("failed to read data from object store: {0}")] + ObjectStore(#[from] object_store::Error), + + /// An error reading the downloaded Parquet file. + #[error("invalid parquet file: {0}")] + Parquet(#[from] parquet::errors::ParquetError), +} + +/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an +/// underlying [`ObjectStore`]. +/// +/// [`RecordBatch`] instances are serialised to Parquet files, with IOx specific +/// metadata ([`IoxParquetMetaData`]) attached. +/// +/// Code that interacts with Parquet files in object storage should utilise this +/// type that encapsulates the storage & retrieval implementation. +/// +/// [`ObjectStore`]: object_store::ObjectStore #[derive(Debug, Clone)] pub struct ParquetStorage { object_store: Arc, } impl ParquetStorage { + /// Initialise a new [`ParquetStorage`] using `object_store` as the + /// persistence layer. pub fn new(object_store: Arc) -> Self { Self { object_store } } - fn writer_props(&self, metadata_bytes: &[u8]) -> WriterProperties { - let builder = WriterProperties::builder() - .set_key_value_metadata(Some(vec![KeyValue { - key: METADATA_KEY.to_string(), - value: Some(base64::encode(&metadata_bytes)), - }])) - .set_compression(Compression::ZSTD); - - builder.build() - } - - /// Convert the given metadata and RecordBatches to parquet file bytes. Used by `ingester`. - pub async fn parquet_bytes( + /// Push `batches`, a stream of [`RecordBatch`] instances, to object + /// storage. + pub async fn upload( &self, - record_batches: Vec, - schema: SchemaRef, - metadata: &IoxMetadata, - ) -> Result> { - let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?; + batches: S, + meta: &IoxMetadata, + ) -> Result<(IoxParquetMetaData, usize), UploadError> + where + S: Stream> + Send, + { + // Stream the record batches into a parquet file. + // + // It would be nice to stream the encoded parquet to disk for this and + // eliminate the buffering in memory, but the lack of a streaming object + // store put negates any benefit of spilling to disk. + // + // This is not a huge concern, as the resulting parquet files are + // currently smallish on average. + let (data, parquet_file_meta) = serialise::to_parquet_bytes(batches, meta).await?; - let mut stream = Box::pin(stream::iter(record_batches.into_iter().map(Ok))); + // Read the IOx-specific parquet metadata from the file metadata + let parquet_meta = + IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?; - let props = self.writer_props(&metadata_bytes); + // Derive the correct object store path from the metadata. + let path = ParquetFilePath::new( + meta.namespace_id, + meta.table_id, + meta.sequencer_id, + meta.partition_id, + meta.object_store_id, + ) + .object_store_path(); - let mem_writer = MemWriter::default(); - { - let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props)) - .context(OpeningParquetWriterSnafu)?; - let mut no_stream_data = true; - while let Some(batch) = stream.next().await { - no_stream_data = false; - let batch = batch.context(ReadingStreamSnafu)?; - writer.write(&batch).context(WritingParquetToMemorySnafu)?; - } - if no_stream_data { - return Ok(vec![]); - } - writer.close().context(ClosingParquetWriterSnafu)?; - } // drop the reference to the MemWriter that the SerializedFileWriter has + let file_size = data.len(); + self.object_store.put(&path, Bytes::from(data)).await?; - mem_writer.into_inner().context(WritingToMemWriterSnafu) + Ok((parquet_meta, file_size)) } - /// Put the given vector of bytes to the specified location - pub async fn to_object_store(&self, data: Vec, path: &ParquetFilePath) -> Result<()> { - let data = Bytes::from(data); - let path = path.object_store_path(); - - self.object_store - .put(&path, data) - .await - .context(WritingToObjectStoreSnafu) - } - - /// Return indices of the schema's fields of the selection columns - pub fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec { - let fields = schema.fields().iter(); - - match selection { - Selection::Some(cols) => fields - .enumerate() - .filter_map(|(p, x)| { - if cols.contains(&x.name().as_str()) { - Some(p) - } else { - None - } - }) - .collect(), - Selection::All => fields.enumerate().map(|(p, _)| p).collect(), - } - } - - /// Downloads the specified parquet file to a local temporary file - /// and uses the `[ParquetExec`] + /// Pull the Parquet-encoded [`RecordBatch`] at the file path derived from + /// the provided [`ParquetFilePath`]. /// - /// The resulting record batches from Parquet are sent back to `tx` - fn download_and_scan_parquet( - projection: Vec, - path: ParquetFilePath, - object_store: Arc, - tx: tokio::sync::mpsc::Sender>, - ) -> Result<()> { - // Size of each batch - let batch_size = 1024; // Todo: make a constant or policy for this - let path = path.object_store_path(); - - let read_stream = futures::executor::block_on(object_store.get(&path)) - .context(ReadingObjectStoreSnafu)?; - - let file = match read_stream { - GetResult::File(f, _) => { - trace!(?path, "Using file directly"); - futures::executor::block_on(f.into_std()) - } - GetResult::Stream(read_stream) => { - // read parquet file to local file - let mut file = tempfile::tempfile().context(OpenTempFileSnafu)?; - - trace!(?path, ?file, "Beginning to read parquet to temp file"); - - for bytes in futures::executor::block_on_stream(read_stream) { - let bytes = bytes.context(ReadingObjectStoreSnafu)?; - trace!(len = bytes.len(), "read bytes from object store"); - file.write_all(&bytes).context(WriteTempFileSnafu)?; - } - - file.rewind().context(WriteTempFileSnafu)?; - - trace!(?path, "Completed read parquet to tempfile"); - file - } - }; - - let file_reader = SerializedFileReader::new(file).context(ParquetReaderSnafu)?; - let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - let record_batch_reader = arrow_reader - .get_record_reader_by_columns(projection, batch_size) - .context(ParquetReaderSnafu)?; - - for batch in record_batch_reader { - if tx.blocking_send(batch).is_err() { - debug!(?path, "Receiver hung up - exiting"); - break; - } - } - - debug!(?path, "Completed parquet download & scan"); - - Ok(()) - } - + /// The `selection` projection is pushed down to the Parquet deserialiser. + /// + /// This impl fetches the associated Parquet file bytes from object storage, + /// temporarily persisting them to a local temp file to feed to the arrow + /// reader. + /// + /// No caching is performed by `read_filter()`, and each call to + /// `read_filter()` will re-download the parquet file unless the underlying + /// object store impl caches the fetched bytes. pub fn read_filter( &self, _predicate: &Predicate, selection: Selection<'_>, schema: SchemaRef, path: ParquetFilePath, - ) -> Result { + ) -> Result { // Indices of columns in the schema needed to read - let projection: Vec = Self::column_indices(selection, Arc::clone(&schema)); + let projection: Vec = column_indices(selection, Arc::clone(&schema)); // Compute final (output) schema after selection let schema = Arc::new(Schema::new( @@ -253,7 +171,7 @@ impl ParquetStorage { let object_store = Arc::clone(&self.object_store); move || { let download_result = - Self::download_and_scan_parquet(projection, path, object_store, tx.clone()); + download_and_scan_parquet(projection, path, object_store, tx.clone()); // If there was an error returned from download_and_scan_parquet send it back to the receiver. if let Err(e) = download_result { @@ -276,44 +194,79 @@ impl ParquetStorage { } } -#[derive(Debug, Default, Clone)] -pub struct MemWriter { - mem: Arc>>>, -} +/// Return indices of the schema's fields of the selection columns +fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec { + let fields = schema.fields().iter(); -impl MemWriter { - /// Returns the inner buffer as long as there are no other references to the - /// Arc. - pub fn into_inner(self) -> Option> { - Arc::try_unwrap(self.mem) - .ok() - .map(|mutex| mutex.into_inner().into_inner()) + match selection { + Selection::Some(cols) => fields + .enumerate() + .filter_map(|(p, x)| { + if cols.contains(&x.name().as_str()) { + Some(p) + } else { + None + } + }) + .collect(), + Selection::All => fields.enumerate().map(|(p, _)| p).collect(), } } -impl Write for MemWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut inner = self.mem.lock(); - inner.write(buf) +/// Downloads the specified parquet file to a local temporary file +/// and push the [`RecordBatch`] contents over `tx`, projecting the specified +/// column indexes. +/// +/// This call MAY download a parquet file from object storage, temporarily +/// spilling it to disk while it is processed. +fn download_and_scan_parquet( + projection: Vec, + path: ParquetFilePath, + object_store: Arc, + tx: tokio::sync::mpsc::Sender>, +) -> Result<(), ReadError> { + // Size of each batch + let batch_size = 1024; // Todo: make a constant or policy for this + let path = path.object_store_path(); + + let read_stream = futures::executor::block_on(object_store.get(&path))?; + + let file = match read_stream { + GetResult::File(f, _) => { + trace!(?path, "Using file directly"); + futures::executor::block_on(f.into_std()) + } + GetResult::Stream(read_stream) => { + // read parquet file to local file + let mut file = tempfile::tempfile().map_err(ReadError::TempFile)?; + + trace!(?path, ?file, "Beginning to read parquet to temp file"); + + for bytes in futures::executor::block_on_stream(read_stream) { + let bytes = bytes?; + trace!(len = bytes.len(), "read bytes from object store"); + file.write_all(&bytes)?; + } + + file.rewind()?; + + trace!(?path, "Completed read parquet to tempfile"); + file + } + }; + + let file_reader = SerializedFileReader::new(file)?; + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + let record_batch_reader = arrow_reader.get_record_reader_by_columns(projection, batch_size)?; + + for batch in record_batch_reader { + if tx.blocking_send(batch).is_err() { + debug!(?path, "Receiver hung up - exiting"); + break; + } } - fn flush(&mut self) -> std::io::Result<()> { - let mut inner = self.mem.lock(); - inner.flush() - } -} + debug!(?path, "Completed parquet download & scan"); -impl Seek for MemWriter { - fn seek(&mut self, pos: SeekFrom) -> std::io::Result { - let mut inner = self.mem.lock(); - inner.seek(pos) - } -} - -impl TryClone for MemWriter { - fn try_clone(&self) -> std::io::Result { - Ok(Self { - mem: Arc::clone(&self.mem), - }) - } + Ok(()) } From 302301659e09c31d5b26b277963d8001caff9fbd Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 12:22:09 +0100 Subject: [PATCH 4/9] refactor: derive ParquetFilePath from IoxMetadata Allow directly converting an IoxMetadata to a ParquetFilePath. --- parquet_file/src/lib.rs | 12 ++++++++++++ parquet_file/src/storage.rs | 9 +-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index 5b4896dc03..d05120dbe1 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -72,6 +72,18 @@ impl From<&Self> for ParquetFilePath { } } +impl From<&crate::metadata::IoxMetadata> for ParquetFilePath { + fn from(m: &crate::metadata::IoxMetadata) -> Self { + Self { + namespace_id: m.namespace_id, + table_id: m.table_id, + sequencer_id: m.sequencer_id, + partition_id: m.partition_id, + object_store_id: m.object_store_id, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 29ee69b1b8..559439f504 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -117,14 +117,7 @@ impl ParquetStorage { IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?; // Derive the correct object store path from the metadata. - let path = ParquetFilePath::new( - meta.namespace_id, - meta.table_id, - meta.sequencer_id, - meta.partition_id, - meta.object_store_id, - ) - .object_store_path(); + let path = ParquetFilePath::from(meta).object_store_path(); let file_size = data.len(); self.object_store.put(&path, Bytes::from(data)).await?; From cdb341d45a731da57634015a76a57039f5be82dc Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 12:23:05 +0100 Subject: [PATCH 5/9] test: ParquetStorage upload() and read_filter() Adds tests for the previously untested (directly at least) Parquet (de)serialisation & persistence layer, provided by the ParquetStorage type. --- parquet_file/src/storage.rs | 77 +++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 559439f504..1cc13ac095 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -263,3 +263,80 @@ fn download_and_scan_parquet( Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + use arrow::array::{ArrayRef, StringBuilder}; + use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId}; + use iox_time::Time; + + #[tokio::test] + async fn test_parquet_round_trip() { + let object_store: Arc = Arc::new(object_store::memory::InMemory::default()); + + let store = ParquetStorage::new(object_store); + + let meta = IoxMetadata { + object_store_id: Default::default(), + creation_timestamp: Time::from_timestamp_nanos(42), + namespace_id: NamespaceId::new(1), + namespace_name: "bananas".into(), + sequencer_id: SequencerId::new(2), + table_id: TableId::new(3), + table_name: "platanos".into(), + partition_id: PartitionId::new(4), + partition_key: "potato".into(), + time_of_first_write: Time::from_timestamp_nanos(4242), + time_of_last_write: Time::from_timestamp_nanos(424242), + min_sequence_number: SequenceNumber::new(10), + max_sequence_number: SequenceNumber::new(11), + row_count: 1000, + compaction_level: 1, + sort_key: None, + }; + let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap(); + let schema = batch.schema(); + let stream = futures::stream::iter([Ok(batch.clone())]); + + // Serialise & upload the record batches. + let (file_meta, _file_size) = store + .upload(stream, &meta) + .await + .expect("should serialise and store sucessfully"); + + // Extract the various bits of metadata. + let file_meta = file_meta.decode().expect("should decode parquet metadata"); + let got_iox_meta = file_meta + .read_iox_metadata_new() + .expect("should read IOx metadata from parquet meta"); + + // Ensure the metadata in the file decodes to the same IOx metadata we + // provided when uploading. + assert_eq!(got_iox_meta, meta); + + // Fetch the record batches and compare them to the input batches. + let path = ParquetFilePath::from(&meta); + let rx = store + .read_filter(&Predicate::default(), Selection::All, schema, path) + .expect("should read record batches from object store"); + + // Drain the retrieved record batch stream + let mut got = datafusion::physical_plan::common::collect(rx) + .await + .expect("failed to drain record stream"); + + // And compare to the original input + assert_eq!(got.len(), 1); + assert_eq!(got.pop().unwrap(), batch); + } + + fn to_string_array(strs: &[&str]) -> ArrayRef { + let mut builder = StringBuilder::new(strs.len()); + for s in strs { + builder.append_value(s).expect("appending string"); + } + Arc::new(builder.finish()) + } +} From 661f8599a68a037ca46dd2402b9776c900723327 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 14:15:10 +0100 Subject: [PATCH 6/9] refactor: internalise Parquet path generation Derive the ParquetFilePath from the IoxMetadata within the ParquetStorage::read_filter() call. This prevents the "put/get RecordBatches" abstraction from leaking out the object store path generation concern - an implementation detail of the ParquetStorage layer. --- parquet_file/src/chunk.rs | 28 ++++++---------------------- parquet_file/src/storage.rs | 11 ++++++----- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 78be6c5c85..44d88dc1a8 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -7,7 +7,6 @@ use data_types::{ ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange, }; use datafusion::physical_plan::SendableRecordBatchStream; -use observability_deps::tracing::*; use predicate::Predicate; use schema::{selection::Selection, Schema, TIME_COLUMN_NAME}; use snafu::{ResultExt, Snafu}; @@ -89,15 +88,15 @@ pub struct ParquetChunk { /// Persists the parquet file within a database's relative path store: ParquetStorage, - /// Path in the database's object store. - path: ParquetFilePath, - /// Size of the data, in object store file_size_bytes: usize, /// Parquet metadata that can be used checkpoint the catalog state. parquet_metadata: Arc, + /// The [`IoxMetadata`] data from the [`DecodedParquetFile`]. + iox_metadata: IoxMetadata, + /// Number of rows rows: usize, @@ -112,15 +111,6 @@ impl ParquetChunk { metrics: ChunkMetrics, store: ParquetStorage, ) -> Self { - let iox_metadata = &decoded_parquet_file.iox_metadata; - let path = ParquetFilePath::new( - iox_metadata.namespace_id, - iox_metadata.table_id, - iox_metadata.sequencer_id, - iox_metadata.partition_id, - iox_metadata.object_store_id, - ); - let decoded = decoded_parquet_file .parquet_metadata .as_ref() @@ -138,19 +128,14 @@ impl ParquetChunk { schema, timestamp_min_max, store, - path, file_size_bytes, parquet_metadata: Arc::clone(&decoded_parquet_file.parquet_metadata), + iox_metadata: decoded_parquet_file.iox_metadata.clone(), rows, metrics, } } - /// Return object store path for this chunk - pub fn path(&self) -> &ParquetFilePath { - &self.path - } - /// Returns the summary statistics for this chunk pub fn table_summary(&self) -> &Arc { &self.table_summary @@ -162,7 +147,7 @@ impl ParquetChunk { mem::size_of::() + self.table_summary.size() + mem::size_of_val(&self.schema.as_ref()) - + mem::size_of_val(&self.path) + + mem::size_of_val(&self.iox_metadata) + self.parquet_metadata.size() } @@ -209,13 +194,12 @@ impl ParquetChunk { predicate: &Predicate, selection: Selection<'_>, ) -> Result { - trace!(path=?self.path, "fetching parquet data for filtered read"); self.store .read_filter( predicate, selection, Arc::clone(&self.schema.as_arrow()), - self.path, + &self.iox_metadata, ) .context(ReadParquetSnafu) } diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 1cc13ac095..56d22cc3a6 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -142,8 +142,11 @@ impl ParquetStorage { _predicate: &Predicate, selection: Selection<'_>, schema: SchemaRef, - path: ParquetFilePath, + meta: &IoxMetadata, ) -> Result { + let path = ParquetFilePath::from(meta).object_store_path(); + trace!(path=?path, "fetching parquet data for filtered read"); + // Indices of columns in the schema needed to read let projection: Vec = column_indices(selection, Arc::clone(&schema)); @@ -214,13 +217,12 @@ fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec { /// spilling it to disk while it is processed. fn download_and_scan_parquet( projection: Vec, - path: ParquetFilePath, + path: object_store::path::Path, object_store: Arc, tx: tokio::sync::mpsc::Sender>, ) -> Result<(), ReadError> { // Size of each batch let batch_size = 1024; // Todo: make a constant or policy for this - let path = path.object_store_path(); let read_stream = futures::executor::block_on(object_store.get(&path))?; @@ -317,9 +319,8 @@ mod tests { assert_eq!(got_iox_meta, meta); // Fetch the record batches and compare them to the input batches. - let path = ParquetFilePath::from(&meta); let rx = store - .read_filter(&Predicate::default(), Selection::All, schema, path) + .read_filter(&Predicate::default(), Selection::All, schema, &meta) .expect("should read record batches from object store"); // Drain the retrieved record batch stream From 7df7c4844c6fd69787dfa436d3c5ab89125cd9bd Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 15:15:06 +0100 Subject: [PATCH 7/9] refactor: remove redundant ParquetChunk errors Eliminates unused / refactors away unnecessary errors for the parquet::chunk module. --- compactor/src/query.rs | 6 ++-- parquet_file/src/chunk.rs | 59 ++++--------------------------- querier/src/chunk/query_access.rs | 2 +- 3 files changed, 12 insertions(+), 55 deletions(-) diff --git a/compactor/src/query.rs b/compactor/src/query.rs index 6ff374c0b2..f2668ec29a 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -20,14 +20,16 @@ use std::sync::Arc; #[allow(missing_copy_implementations, missing_docs)] pub enum Error { #[snafu(display("Failed to read parquet: {}", source))] - ReadParquet { source: parquet_file::chunk::Error }, + ReadParquet { + source: parquet_file::storage::ReadError, + }, #[snafu(display( "Error reading IOx Metadata from Parquet IoxParquetMetadata: {}", source ))] ReadParquetMeta { - source: parquet_file::metadata::Error, + source: parquet_file::storage::ReadError, }, } diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 44d88dc1a8..92ba9d7d91 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -1,7 +1,6 @@ use crate::{ metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData}, storage::ParquetStorage, - ParquetFilePath, }; use data_types::{ ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange, @@ -9,50 +8,8 @@ use data_types::{ use datafusion::physical_plan::SendableRecordBatchStream; use predicate::Predicate; use schema::{selection::Selection, Schema, TIME_COLUMN_NAME}; -use snafu::{ResultExt, Snafu}; use std::{collections::BTreeSet, mem, sync::Arc}; -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Table '{}' not found in chunk", table_name))] - NamedTableNotFoundInChunk { table_name: String }, - - #[snafu(display("Failed to read parquet: {}", source))] - ReadParquet { source: crate::storage::ReadError }, - - #[snafu(display("Failed to select columns: {}", source))] - SelectColumns { source: schema::Error }, - - #[snafu( - display("Cannot decode parquet metadata from {:?}: {}", path, source), - visibility(pub) - )] - MetadataDecodeFailed { - source: crate::metadata::Error, - path: ParquetFilePath, - }, - - #[snafu( - display("Cannot read schema from {:?}: {}", path, source), - visibility(pub) - )] - SchemaReadFailed { - source: crate::metadata::Error, - path: ParquetFilePath, - }, - - #[snafu( - display("Cannot read statistics from {:?}: {}", path, source), - visibility(pub) - )] - StatisticsReadFailed { - source: crate::metadata::Error, - path: ParquetFilePath, - }, -} - -pub type Result = std::result::Result; - #[derive(Debug)] #[allow(missing_copy_implementations)] pub struct ChunkMetrics { @@ -193,15 +150,13 @@ impl ParquetChunk { &self, predicate: &Predicate, selection: Selection<'_>, - ) -> Result { - self.store - .read_filter( - predicate, - selection, - Arc::clone(&self.schema.as_arrow()), - &self.iox_metadata, - ) - .context(ReadParquetSnafu) + ) -> Result { + self.store.read_filter( + predicate, + selection, + Arc::clone(&self.schema.as_arrow()), + &self.iox_metadata, + ) } /// The total number of rows in all row groups in this chunk. diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 568e1c1610..01e31320b4 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -13,7 +13,7 @@ use std::sync::Arc; pub enum Error { #[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))] ParquetFileChunkError { - source: parquet_file::chunk::Error, + source: parquet_file::storage::ReadError, chunk_id: ChunkId, }, } From 00dc95829d12359c2b72c78fc2505c3a5abd2144 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 20 May 2022 15:15:29 +0100 Subject: [PATCH 8/9] style: enable more lints Enable more lints on the parquet_file crate to keep it a little cleaner - adds the following: clippy::clone_on_ref_ptr, unreachable_pub, missing_docs, clippy::todo, clippy::dbg_macro This commit includes fixes for any new lint failures. --- parquet_file/src/chunk.rs | 19 ++++++++++++++----- parquet_file/src/lib.rs | 9 ++++++++- parquet_file/src/metadata.rs | 5 ++++- parquet_file/src/serialise.rs | 4 ++++ 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 92ba9d7d91..9022dba184 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -1,3 +1,6 @@ +//! A metadata summary of a Parquet file in object storage, with the ability to +//! download & execute a scan. + use crate::{ metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData}, storage::ParquetStorage, @@ -11,7 +14,7 @@ use schema::{selection::Selection, Schema, TIME_COLUMN_NAME}; use std::{collections::BTreeSet, mem, sync::Arc}; #[derive(Debug)] -#[allow(missing_copy_implementations)] +#[allow(missing_copy_implementations, missing_docs)] pub struct ChunkMetrics { // Placeholder } @@ -25,11 +28,14 @@ impl ChunkMetrics { Self {} } + /// This constructor builds nothing. pub fn new(_metrics: &metric::Registry) -> Self { Self {} } } +/// A abstract representation of a Parquet file in object storage, with +/// associated metadata. #[derive(Debug)] pub struct ParquetChunk { /// Meta data of the table @@ -113,8 +119,8 @@ impl ParquetChunk { Arc::clone(&self.schema) } - // Return true if this chunk contains values within the time - // range, or if the range is `None`. + /// Return true if this chunk contains values within the time range, or if + /// the range is `None`. pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool { match (self.timestamp_min_max, timestamp_range) { (Some(timestamp_min_max), Some(timestamp_range)) => { @@ -127,7 +133,7 @@ impl ParquetChunk { } } - // Return the columns names that belong to the given column selection + /// Return the columns names that belong to the given column selection pub fn column_names(&self, selection: Selection<'_>) -> Option> { let fields = self.schema.inner().fields().iter(); @@ -191,8 +197,10 @@ fn extract_range(table_summary: &TableSummary) -> Option { None }) } -// Parquet file with decoded metadata. + +/// Parquet file with decoded metadata. #[derive(Debug)] +#[allow(missing_docs)] pub struct DecodedParquetFile { pub parquet_file: ParquetFile, pub parquet_metadata: Arc, @@ -201,6 +209,7 @@ pub struct DecodedParquetFile { } impl DecodedParquetFile { + /// initialise a [`DecodedParquetFile`] from the provided file & metadata. pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self { let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata(); let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata)); diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index d05120dbe1..3afcdb053f 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -1,3 +1,5 @@ +//! Parquet file generation, storage, and metadata implementations. + #![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)] #![warn( missing_copy_implementations, @@ -5,8 +7,13 @@ clippy::explicit_iter_loop, clippy::future_not_send, clippy::use_self, - clippy::clone_on_ref_ptr + clippy::clone_on_ref_ptr, + unreachable_pub, + missing_docs, + clippy::todo, + clippy::dbg_macro )] +#![allow(clippy::missing_docs_in_private_items)] pub mod chunk; pub mod metadata; diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 043d1f174e..b35c506c70 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -127,6 +127,7 @@ pub const METADATA_VERSION: u32 = 10; pub const METADATA_KEY: &str = "IOX:metadata"; #[derive(Debug, Snafu)] +#[allow(missing_docs)] pub enum Error { #[snafu(display("Cannot read parquet metadata from bytes: {}", source))] ParquetMetaDataRead { @@ -225,6 +226,8 @@ pub enum Error { #[snafu(display("Cannot parse UUID: {}", source))] UuidParse { source: uuid::Error }, } + +#[allow(missing_docs)] pub type Result = std::result::Result; /// IOx-specific metadata. @@ -385,7 +388,7 @@ impl IoxMetadata { uuid == self.object_store_id } - // create a corresponding iox catalog's ParquetFile + /// Create a corresponding iox catalog's ParquetFile pub fn to_parquet_file( &self, file_size_bytes: usize, diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs index 086aa476d9..763fb1f045 100644 --- a/parquet_file/src/serialise.rs +++ b/parquet_file/src/serialise.rs @@ -1,3 +1,5 @@ +//! Streaming [`RecordBatch`] / Parquet file encoder routines. + use std::{ops::DerefMut, sync::Arc}; use arrow::{error::ArrowError, record_batch::RecordBatch}; @@ -36,9 +38,11 @@ pub enum CodecError { #[error("failed to serialise iox metadata: {0}")] MetadataSerialisation(#[from] prost::EncodeError), + /// Writing the parquet file failed with the specified error. #[error("failed to build parquet file: {0}")] Writer(#[from] ParquetError), + /// Attempting to clone a handle to the provided write sink failed. #[error("failed to obtain writer handle clone: {0}")] CloneSink(std::io::Error), } From af6d3f4d48a2f5fdb43fde939d68125962e67110 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 May 2022 11:46:06 +0100 Subject: [PATCH 9/9] docs: remove clone ref comment --- parquet_file/src/serialise.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet_file/src/serialise.rs b/parquet_file/src/serialise.rs index 763fb1f045..4d27e1b561 100644 --- a/parquet_file/src/serialise.rs +++ b/parquet_file/src/serialise.rs @@ -120,7 +120,7 @@ where { let mut w = InMemoryWriteableCursor::default(); - // w is a ref-counted buffer, so cloning actually passes an owned ref. + // Serialise the record batches into the in-memory buffer let meta = to_parquet(batches, meta, &mut w).await?; let mut bytes = w