Merge pull request #4649 from influxdata/dom/codec-object-store

perf: streaming RecordBatch -> parquet encoder
pull/24376/head
kodiakhq[bot] 2022-05-23 14:45:35 +00:00 committed by GitHub
commit 1fccee841b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 634 additions and 437 deletions

2
Cargo.lock generated
View File

@ -2486,6 +2486,7 @@ dependencies = [
"bytes",
"data_types",
"datafusion 0.1.0",
"futures",
"iox_catalog",
"iox_query",
"iox_time",
@ -3716,6 +3717,7 @@ dependencies = [
"schema",
"snafu",
"tempfile",
"thiserror",
"thrift",
"tokio",
"uuid 0.8.2",

View File

@ -15,6 +15,7 @@ use data_types::{
Timestamp, Tombstone, TombstoneId,
};
use datafusion::error::DataFusionError;
use futures::StreamExt;
use iox_catalog::interface::{Catalog, Transaction, INITIAL_COMPACTION_LEVEL};
use iox_query::{
exec::{Executor, ExecutorType},
@ -29,7 +30,6 @@ use observability_deps::tracing::{debug, info, trace, warn};
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use schema::sort::SortKey;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@ -134,16 +134,6 @@ pub enum Error {
source: iox_catalog::interface::Error,
},
#[snafu(display("Error converting the parquet stream to bytes: {}", source))]
ConvertingToBytes {
source: parquet_file::storage::Error,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore {
source: parquet_file::storage::Error,
},
#[snafu(display("Error updating catalog {}", source))]
Update {
source: iox_catalog::interface::Error,
@ -179,6 +169,11 @@ pub enum Error {
#[snafu(display("Could not find partition {:?}", partition_id))]
PartitionNotFound { partition_id: PartitionId },
#[snafu(display("Could not serialise and persist record batches {}", source))]
Persist {
source: parquet_file::storage::UploadError,
},
}
/// A specialized `Error` for Compactor Data errors
@ -788,7 +783,7 @@ impl Compactor {
Ok(compacted)
}
/// Write the given data to the given location in the given object storage.
/// Write the given data to the parquet store.
///
/// Returns the persisted file size (in bytes) and metadata if a file was created.
async fn persist(
@ -799,44 +794,14 @@ impl Compactor {
if record_batches.is_empty() {
return Ok(None);
}
// All record batches have the same schema.
let schema = record_batches
.first()
.expect("record_batches.is_empty was just checked")
.schema();
let data = store
.parquet_bytes(record_batches, schema, metadata)
.await
.context(ConvertingToBytesSnafu)?;
// TODO(4324): Yield the buffered RecordBatch instances as a stream as a
// temporary measure until streaming compaction is complete.
let stream = futures::stream::iter(record_batches).map(Ok);
if data.is_empty() {
return Ok(None);
}
let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?;
// extract metadata
let data = Arc::new(data);
let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data))
.expect("cannot read parquet file metadata")
.expect("no metadata in parquet file");
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let path = ParquetFilePath::new(
metadata.namespace_id,
metadata.table_id,
metadata.sequencer_id,
metadata.partition_id,
metadata.object_store_id,
);
store
.to_object_store(data, &path)
.await
.context(WritingToObjectStoreSnafu)?;
Ok(Some((file_size, md)))
Ok(Some((file_size, meta)))
}
async fn update_catalog(

View File

@ -20,14 +20,16 @@ use std::sync::Arc;
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Failed to read parquet: {}", source))]
ReadParquet { source: parquet_file::chunk::Error },
ReadParquet {
source: parquet_file::storage::ReadError,
},
#[snafu(display(
"Error reading IOx Metadata from Parquet IoxParquetMetadata: {}",
source
))]
ReadParquetMeta {
source: parquet_file::metadata::Error,
source: parquet_file::storage::ReadError,
},
}

View File

@ -1,25 +1,19 @@
//! Persist compacted data to parquet files in object storage
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Error converting the parquet stream to bytes: {}", source))]
ConvertingToBytes {
source: parquet_file::storage::Error,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore {
source: parquet_file::storage::Error,
#[snafu(display("Could not serialise and persist record batches {}", source))]
Persist {
source: parquet_file::storage::UploadError,
},
}
@ -37,43 +31,14 @@ pub async fn persist(
if record_batches.is_empty() {
return Ok(None);
}
let schema = record_batches
.first()
.expect("record_batches.is_empty was just checked")
.schema();
let data = store
.parquet_bytes(record_batches, schema, metadata)
.await
.context(ConvertingToBytesSnafu)?;
// TODO(4324): Yield the buffered RecordBatch instances as a stream as a
// temporary measure until streaming compaction is complete.
let stream = futures::stream::iter(record_batches).map(Ok);
if data.is_empty() {
return Ok(None);
}
let (meta, file_size) = store.upload(stream, metadata).await.context(PersistSnafu)?;
// extract metadata
let data = Arc::new(data);
let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data))
.expect("cannot read parquet file metadata")
.expect("no metadata in parquet file");
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let path = ParquetFilePath::new(
metadata.namespace_id,
metadata.table_id,
metadata.sequencer_id,
metadata.partition_id,
metadata.object_store_id,
);
store
.to_object_store(data, &path)
.await
.context(WritingToObjectStoreSnafu)?;
Ok(Some((file_size, md)))
Ok(Some((file_size, meta)))
}
#[cfg(test)]

View File

@ -20,3 +20,4 @@ iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
uuid = { version = "0.8", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
futures = "0.3.21"

View File

@ -18,11 +18,7 @@ use iox_query::{exec::Executor, provider::RecordBatchDeduplicator, util::arrow_s
use iox_time::{MockProvider, Time, TimeProvider};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use object_store::{memory::InMemory, DynObjectStore};
use parquet_file::{
metadata::{IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
use schema::{
selection::Selection,
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
@ -650,32 +646,12 @@ async fn create_parquet_file(
metadata: &IoxMetadata,
record_batch: RecordBatch,
) -> (Vec<u8>, usize) {
let schema = record_batch.schema();
let data = store
.parquet_bytes(vec![record_batch], schema, metadata)
let stream = futures::stream::once(async { Ok(record_batch) });
let (meta, file_size) = store
.upload(stream, metadata)
.await
.unwrap();
let data = Arc::new(data);
let md = IoxParquetMetaData::from_file_bytes(Arc::clone(&data))
.unwrap()
.unwrap();
let parquet_md = md.thrift_bytes().to_vec();
let data = Arc::try_unwrap(data).expect("dangling reference to data");
let file_size = data.len();
let path = ParquetFilePath::new(
metadata.namespace_id,
metadata.table_id,
metadata.sequencer_id,
metadata.partition_id,
metadata.object_store_id,
);
store.to_object_store(data, &path).await.unwrap();
(parquet_md, file_size)
.expect("persisting parquet file should succeed");
(meta.thrift_bytes().to_vec(), file_size)
}
/// A test parquet file of the catalog

View File

@ -31,3 +31,4 @@ tokio = { version = "1.18", features = ["macros", "parking_lot", "rt", "rt-multi
uuid = { version = "0.8", features = ["v4"] }
zstd = "0.11"
workspace-hack = { path = "../workspace-hack"}
thiserror = "1.0.31"

View File

@ -1,61 +1,20 @@
//! A metadata summary of a Parquet file in object storage, with the ability to
//! download & execute a scan.
use crate::{
metadata::{DecodedIoxParquetMetaData, IoxMetadata, IoxParquetMetaData},
storage::ParquetStorage,
ParquetFilePath,
};
use data_types::{
ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, mem, sync::Arc};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Table '{}' not found in chunk", table_name))]
NamedTableNotFoundInChunk { table_name: String },
#[snafu(display("Failed to read parquet: {}", source))]
ReadParquet { source: crate::storage::Error },
#[snafu(display("Failed to select columns: {}", source))]
SelectColumns { source: schema::Error },
#[snafu(
display("Cannot decode parquet metadata from {:?}: {}", path, source),
visibility(pub)
)]
MetadataDecodeFailed {
source: crate::metadata::Error,
path: ParquetFilePath,
},
#[snafu(
display("Cannot read schema from {:?}: {}", path, source),
visibility(pub)
)]
SchemaReadFailed {
source: crate::metadata::Error,
path: ParquetFilePath,
},
#[snafu(
display("Cannot read statistics from {:?}: {}", path, source),
visibility(pub)
)]
StatisticsReadFailed {
source: crate::metadata::Error,
path: ParquetFilePath,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
#[allow(missing_copy_implementations)]
#[allow(missing_copy_implementations, missing_docs)]
pub struct ChunkMetrics {
// Placeholder
}
@ -69,11 +28,14 @@ impl ChunkMetrics {
Self {}
}
/// This constructor builds nothing.
pub fn new(_metrics: &metric::Registry) -> Self {
Self {}
}
}
/// A abstract representation of a Parquet file in object storage, with
/// associated metadata.
#[derive(Debug)]
pub struct ParquetChunk {
/// Meta data of the table
@ -89,15 +51,15 @@ pub struct ParquetChunk {
/// Persists the parquet file within a database's relative path
store: ParquetStorage,
/// Path in the database's object store.
path: ParquetFilePath,
/// Size of the data, in object store
file_size_bytes: usize,
/// Parquet metadata that can be used checkpoint the catalog state.
parquet_metadata: Arc<IoxParquetMetaData>,
/// The [`IoxMetadata`] data from the [`DecodedParquetFile`].
iox_metadata: IoxMetadata,
/// Number of rows
rows: usize,
@ -112,15 +74,6 @@ impl ParquetChunk {
metrics: ChunkMetrics,
store: ParquetStorage,
) -> Self {
let iox_metadata = &decoded_parquet_file.iox_metadata;
let path = ParquetFilePath::new(
iox_metadata.namespace_id,
iox_metadata.table_id,
iox_metadata.sequencer_id,
iox_metadata.partition_id,
iox_metadata.object_store_id,
);
let decoded = decoded_parquet_file
.parquet_metadata
.as_ref()
@ -138,19 +91,14 @@ impl ParquetChunk {
schema,
timestamp_min_max,
store,
path,
file_size_bytes,
parquet_metadata: Arc::clone(&decoded_parquet_file.parquet_metadata),
iox_metadata: decoded_parquet_file.iox_metadata.clone(),
rows,
metrics,
}
}
/// Return object store path for this chunk
pub fn path(&self) -> &ParquetFilePath {
&self.path
}
/// Returns the summary statistics for this chunk
pub fn table_summary(&self) -> &Arc<TableSummary> {
&self.table_summary
@ -162,7 +110,7 @@ impl ParquetChunk {
mem::size_of::<Self>()
+ self.table_summary.size()
+ mem::size_of_val(&self.schema.as_ref())
+ mem::size_of_val(&self.path)
+ mem::size_of_val(&self.iox_metadata)
+ self.parquet_metadata.size()
}
@ -171,8 +119,8 @@ impl ParquetChunk {
Arc::clone(&self.schema)
}
// Return true if this chunk contains values within the time
// range, or if the range is `None`.
/// Return true if this chunk contains values within the time range, or if
/// the range is `None`.
pub fn has_timerange(&self, timestamp_range: Option<&TimestampRange>) -> bool {
match (self.timestamp_min_max, timestamp_range) {
(Some(timestamp_min_max), Some(timestamp_range)) => {
@ -185,7 +133,7 @@ impl ParquetChunk {
}
}
// Return the columns names that belong to the given column selection
/// Return the columns names that belong to the given column selection
pub fn column_names(&self, selection: Selection<'_>) -> Option<BTreeSet<String>> {
let fields = self.schema.inner().fields().iter();
@ -208,16 +156,13 @@ impl ParquetChunk {
&self,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream> {
trace!(path=?self.path, "fetching parquet data for filtered read");
self.store
.read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
self.path,
)
.context(ReadParquetSnafu)
) -> Result<SendableRecordBatchStream, crate::storage::ReadError> {
self.store.read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
&self.iox_metadata,
)
}
/// The total number of rows in all row groups in this chunk.
@ -252,8 +197,10 @@ fn extract_range(table_summary: &TableSummary) -> Option<TimestampMinMax> {
None
})
}
// Parquet file with decoded metadata.
/// Parquet file with decoded metadata.
#[derive(Debug)]
#[allow(missing_docs)]
pub struct DecodedParquetFile {
pub parquet_file: ParquetFile,
pub parquet_metadata: Arc<IoxParquetMetaData>,
@ -262,6 +209,7 @@ pub struct DecodedParquetFile {
}
impl DecodedParquetFile {
/// initialise a [`DecodedParquetFile`] from the provided file & metadata.
pub fn new(parquet_file_with_metadata: ParquetFileWithMetadata) -> Self {
let (parquet_file, parquet_metadata) = parquet_file_with_metadata.split_off_metadata();
let parquet_metadata = Arc::new(IoxParquetMetaData::from_thrift_bytes(parquet_metadata));

View File

@ -1,3 +1,5 @@
//! Parquet file generation, storage, and metadata implementations.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
@ -5,11 +7,17 @@
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
clippy::clone_on_ref_ptr,
unreachable_pub,
missing_docs,
clippy::todo,
clippy::dbg_macro
)]
#![allow(clippy::missing_docs_in_private_items)]
pub mod chunk;
pub mod metadata;
pub mod serialise;
pub mod storage;
use data_types::{NamespaceId, PartitionId, SequencerId, TableId};
@ -71,6 +79,18 @@ impl From<&Self> for ParquetFilePath {
}
}
impl From<&crate::metadata::IoxMetadata> for ParquetFilePath {
fn from(m: &crate::metadata::IoxMetadata) -> Self {
Self {
namespace_id: m.namespace_id,
table_id: m.table_id,
sequencer_id: m.sequencer_id,
partition_id: m.partition_id,
object_store_id: m.object_store_id,
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -127,6 +127,7 @@ pub const METADATA_VERSION: u32 = 10;
pub const METADATA_KEY: &str = "IOX:metadata";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Cannot read parquet metadata from bytes: {}", source))]
ParquetMetaDataRead {
@ -225,6 +226,8 @@ pub enum Error {
#[snafu(display("Cannot parse UUID: {}", source))]
UuidParse { source: uuid::Error },
}
#[allow(missing_docs)]
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// IOx-specific metadata.
@ -385,7 +388,7 @@ impl IoxMetadata {
uuid == self.object_store_id
}
// create a corresponding iox catalog's ParquetFile
/// Create a corresponding iox catalog's ParquetFile
pub fn to_parquet_file(
&self,
file_size_bytes: usize,
@ -434,7 +437,7 @@ fn decode_timestamp_from_field(
}
/// Parquet metadata with IOx-specific wrapper.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct IoxParquetMetaData {
/// [Apache Parquet] metadata as freestanding [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
///
@ -523,20 +526,7 @@ impl IoxParquetMetaData {
};
// step 2: serialize the thrift struct into bytes
let mut buffer = Vec::new();
{
let mut protocol = TCompactOutputProtocol::new(&mut buffer);
thrift_file_metadata
.write_to_out_protocol(&mut protocol)
.context(ThriftWriteFailureSnafu {})?;
protocol.flush().context(ThriftWriteFailureSnafu {})?;
}
// step 3: compress data
// Note: level 0 is the zstd-provided default
let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailureSnafu)?;
Ok(buffer)
Self::try_from(thrift_file_metadata).map(|opt| opt.data)
}
/// Decode [Apache Parquet] metadata from [Apache Thrift]-encoded, and [Zstandard]-compressed bytes.
@ -589,6 +579,26 @@ impl IoxParquetMetaData {
}
}
impl TryFrom<parquet_format::FileMetaData> for IoxParquetMetaData {
type Error = Error;
fn try_from(v: parquet_format::FileMetaData) -> Result<Self, Self::Error> {
let mut buffer = Vec::new();
{
let mut protocol = TCompactOutputProtocol::new(&mut buffer);
v.write_to_out_protocol(&mut protocol)
.context(ThriftWriteFailureSnafu {})?;
protocol.flush().context(ThriftWriteFailureSnafu {})?;
}
// step 3: compress data
// Note: level 0 is the zstd-provided default
let buffer = zstd::encode_all(&buffer[..], 0).context(ZstdEncodeFailureSnafu)?;
Ok(Self::from_thrift_bytes(buffer))
}
}
/// Parquet metadata with IOx-specific wrapper, in decoded form.
#[derive(Debug)]
pub struct DecodedIoxParquetMetaData {
@ -841,6 +851,11 @@ fn extract_iox_statistics(
#[cfg(test)]
mod tests {
use arrow::{
array::{ArrayRef, StringBuilder},
record_batch::RecordBatch,
};
use super::*;
#[test]
@ -874,4 +889,50 @@ mod tests {
assert_eq!(iox_metadata, iox_metadata_again);
}
#[tokio::test]
async fn test_metadata_from_parquet_metadata() {
let meta = IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
time_of_first_write: Time::from_timestamp_nanos(4242),
time_of_last_write: Time::from_timestamp_nanos(424242),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
row_count: 1000,
compaction_level: 1,
sort_key: None,
};
let mut builder = StringBuilder::new(1);
builder.append_value("bananas").expect("appending string");
let data: ArrayRef = Arc::new(builder.finish());
let batch = RecordBatch::try_from_iter([("a", data)]).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let (bytes, file_meta) = crate::serialise::to_parquet_bytes(stream, &meta)
.await
.expect("should serialise");
// Read the metadata from the file bytes.
//
// This is quite wordy...
let iox_parquet_meta = IoxParquetMetaData::from_file_bytes(Arc::new(bytes))
.expect("should decode")
.expect("should contain metadata");
// And read the metadata directly from the file metadata returned from
// encoding.
let file_meta = IoxParquetMetaData::try_from(file_meta)
.expect("failed to decode IoxParquetMetaData from file metadata");
assert_eq!(file_meta, iox_parquet_meta);
}
}

View File

@ -0,0 +1,232 @@
//! Streaming [`RecordBatch`] / Parquet file encoder routines.
use std::{ops::DerefMut, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{pin_mut, Stream, StreamExt};
use parquet::{
arrow::ArrowWriter,
basic::Compression,
errors::ParquetError,
file::{
metadata::KeyValue,
properties::WriterProperties,
writer::{InMemoryWriteableCursor, ParquetWriter},
},
};
use thiserror::Error;
use crate::metadata::{IoxMetadata, METADATA_KEY};
/// [`RecordBatch`] to Parquet serialisation errors.
#[derive(Debug, Error)]
pub enum CodecError {
/// The result stream contained no batches.
#[error("no record batches to convert")]
NoRecordBatches,
/// The codec could not infer the schema for the stream as the first stream
/// item contained an [`ArrowError`].
#[error("failed to peek record stream schema")]
SchemaPeek,
/// An arrow error during the plan execution.
#[error(transparent)]
Arrow(#[from] ArrowError),
/// Serialising the [`IoxMetadata`] to protobuf-encoded bytes failed.
#[error("failed to serialise iox metadata: {0}")]
MetadataSerialisation(#[from] prost::EncodeError),
/// Writing the parquet file failed with the specified error.
#[error("failed to build parquet file: {0}")]
Writer(#[from] ParquetError),
/// Attempting to clone a handle to the provided write sink failed.
#[error("failed to obtain writer handle clone: {0}")]
CloneSink(std::io::Error),
}
/// An IOx-specific, streaming [`RecordBatch`] to parquet file encoder.
///
/// This encoder discovers the schema from the first item in `batches`, and
/// encodes each [`RecordBatch`] one by one into `W`. All [`RecordBatch`]
/// yielded by the stream must be of the same schema, or this call will return
/// an error.
///
/// IOx metadata is encoded into the parquet file's metadata under the key
/// [`METADATA_KEY`], with a base64-wrapped, protobuf serialised
/// [`proto::IoxMetadata`] structure.
///
/// Returns the serialised [`FileMetaData`] for the encoded parquet file, from
/// which an [`IoxParquetMetaData`] can be derived.
///
/// [`proto::IoxMetadata`]: generated_types::influxdata::iox::ingester::v1
/// [`FileMetaData`]: parquet_format::FileMetaData
/// [`IoxParquetMetaData`]: crate::metadata::IoxParquetMetaData
pub async fn to_parquet<S, W, T>(
batches: S,
meta: &IoxMetadata,
sink: W,
) -> Result<parquet_format::FileMetaData, CodecError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
W: DerefMut<Target = T> + Send,
T: ParquetWriter + Send + 'static,
{
// Let the caller pass a mutable ref to something that impls ParquetWriter
// while still providing the necessary owned impl to the ArrowWriter.
let sink = sink.deref().try_clone().map_err(CodecError::CloneSink)?;
let stream = batches.peekable();
pin_mut!(stream);
// Peek into the stream and extract the schema from the first record batch.
//
// The ArrowWriter::write() call will return an error if any subsequent
// batch does not match this schema, enforcing schema uniformity.
let schema = stream
.as_mut()
.peek()
.await
.ok_or(CodecError::NoRecordBatches)?
.as_ref()
.ok()
.map(|v| v.schema())
.ok_or(CodecError::SchemaPeek)?;
// Serialise the IoxMetadata to the protobuf bytes.
let props = writer_props(meta)?;
// Construct the arrow serialiser with the metadata as part of the parquet
// file properties.
let mut writer = ArrowWriter::try_new(sink, Arc::clone(&schema), Some(props))?;
while let Some(maybe_batch) = stream.next().await {
writer.write(&maybe_batch?)?;
}
writer.close().map_err(CodecError::from)
}
/// A helper function that calls [`to_parquet()`], serialising the parquet file
/// into an in-memory buffer and returning the resulting bytes.
pub async fn to_parquet_bytes<S>(
batches: S,
meta: &IoxMetadata,
) -> Result<(Vec<u8>, parquet_format::FileMetaData), CodecError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
{
let mut w = InMemoryWriteableCursor::default();
// Serialise the record batches into the in-memory buffer
let meta = to_parquet(batches, meta, &mut w).await?;
let mut bytes = w
.into_inner()
.expect("mem writer has outstanding reference");
bytes.shrink_to_fit();
Ok((bytes, meta))
}
/// Helper to construct [`WriterProperties`] for the [`ArrowWriter`],
/// serialising the given [`IoxMetadata`] and embedding it as a key=value
/// property keyed by [`METADATA_KEY`].
fn writer_props(meta: &IoxMetadata) -> Result<WriterProperties, prost::EncodeError> {
let bytes = meta.to_protobuf()?;
let builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![KeyValue {
key: METADATA_KEY.to_string(),
value: Some(base64::encode(&bytes)),
}]))
.set_compression(Compression::ZSTD);
Ok(builder.build())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::{ArrayRef, StringBuilder};
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use iox_time::Time;
use parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
file::serialized_reader::{SerializedFileReader, SliceableCursor},
};
use crate::metadata::IoxParquetMetaData;
use super::*;
#[tokio::test]
async fn test_encode_stream() {
let meta = IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
time_of_first_write: Time::from_timestamp_nanos(4242),
time_of_last_write: Time::from_timestamp_nanos(424242),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
row_count: 1000,
compaction_level: 1,
sort_key: None,
};
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let stream = futures::stream::iter([Ok(batch.clone())]);
let (bytes, _file_meta) = to_parquet_bytes(stream, &meta)
.await
.expect("should serialise");
// Read the metadata from the file bytes.
//
// This is quite wordy...
let iox_parquet_meta = IoxParquetMetaData::from_file_bytes(Arc::new(bytes.clone()))
.expect("should decode")
.expect("should contain metadata")
.decode()
.expect("should decode IOx metadata")
.read_iox_metadata_new()
.expect("should read IOxMetadata");
assert_eq!(iox_parquet_meta, meta);
// Read the parquet file back to arrow records
let cursor = SliceableCursor::new(bytes);
let file_reader = SerializedFileReader::new(cursor).expect("should init reader");
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let mut record_batches = arrow_reader
.get_record_reader(100)
.expect("should read")
.into_iter()
.collect::<Vec<_>>();
assert_eq!(record_batches.len(), 1);
assert_eq!(
record_batches.pop().unwrap().expect("should be OK batch"),
batch
);
}
fn to_string_array(strs: &[&str]) -> ArrayRef {
let mut builder = StringBuilder::new(strs.len());
for s in strs {
builder.append_value(s).expect("appending string");
}
Arc::new(builder.finish())
}
}

View File

@ -1,8 +1,9 @@
//! This module is responsible for writing the given data to the specified object store and reading
//! it back.
//! This module is responsible for writing the given data to the specified
//! object store and reading it back.
use crate::{
metadata::{IoxMetadata, METADATA_KEY},
metadata::{IoxMetadata, IoxParquetMetaData},
serialise::{self, CodecError},
ParquetFilePath,
};
use arrow::{
@ -13,228 +14,141 @@ use arrow::{
use bytes::Bytes;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_util::{AdapterStream, AutoAbortJoinHandle};
use futures::{stream, StreamExt};
use futures::Stream;
use object_store::{DynObjectStore, GetResult};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;
use parquet::{
self,
arrow::ArrowWriter,
basic::Compression,
file::{metadata::KeyValue, properties::WriterProperties, writer::TryClone},
arrow::{ArrowReader, ParquetFileArrowReader},
file::reader::SerializedFileReader,
};
use predicate::Predicate;
use schema::selection::Selection;
use snafu::{OptionExt, ResultExt, Snafu};
use std::{
io::{Cursor, Seek, SeekFrom, Write},
io::{Seek, Write},
sync::Arc,
};
use thiserror::Error;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error opening Parquet Writer: {}", source))]
OpeningParquetWriter {
source: parquet::errors::ParquetError,
},
/// Errors returned during a Parquet "put" operation, covering [`RecordBatch`]
/// pull from the provided stream, encoding, and finally uploading the bytes to
/// the object store.
#[derive(Debug, Error)]
pub enum UploadError {
/// A codec failure during serialisation.
#[error(transparent)]
Serialise(#[from] CodecError),
#[snafu(display("Error reading stream while creating snapshot: {}", source))]
ReadingStream { source: ArrowError },
/// An error during Parquet metadata conversion when attempting to
/// instantiate a valid [`IoxParquetMetaData`] instance.
#[error("failed to construct IOx parquet metadata: {0}")]
Metadata(crate::metadata::Error),
#[snafu(display("Error writing Parquet to memory: {}", source))]
WritingParquetToMemory {
source: parquet::errors::ParquetError,
},
#[snafu(display("Error closing Parquet Writer: {}", source))]
ClosingParquetWriter {
source: parquet::errors::ParquetError,
},
#[snafu(display("Error writing to object store: {}", source))]
WritingToObjectStore { source: object_store::Error },
#[snafu(display("Error converting to vec[u8]: Nothing else should have a reference here"))]
WritingToMemWriter {},
#[snafu(display("Error opening temp file: {}", source))]
OpenTempFile { source: std::io::Error },
#[snafu(display("Error writing to temp file: {}", source))]
WriteTempFile { source: std::io::Error },
#[snafu(display("Error reading data from object store: {}", source))]
ReadingObjectStore { source: object_store::Error },
#[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))]
ExtractingMetadataFailure { source: crate::metadata::Error },
#[snafu(display("Cannot encode metadata: {}", source))]
MetadataEncodeFailure { source: prost::EncodeError },
#[snafu(display("Error reading parquet: {}", source))]
ParquetReader {
source: parquet::errors::ParquetError,
},
#[snafu(display("No data to convert to parquet"))]
NoData {},
/// Uploading the Parquet file to object store failed.
#[error("failed to upload to object storage: {0}")]
Upload(#[from] object_store::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors during Parquet file download & scan.
#[derive(Debug, Error)]
pub enum ReadError {
/// Failed to create the temporary Parquet file on disk to which the
/// downloaded parquet bytes will be spilled.
#[error("failed to create temporary file: {0}")]
TempFile(std::io::Error),
/// Error writing the bytes fetched from object store to the temporary
/// parquet file on disk.
#[error("i/o error writing downloaded parquet: {0}")]
IO(#[from] std::io::Error),
/// An error fetching Parquet file bytes from object store.
#[error("failed to read data from object store: {0}")]
ObjectStore(#[from] object_store::Error),
/// An error reading the downloaded Parquet file.
#[error("invalid parquet file: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
}
/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
/// underlying [`ObjectStore`].
///
/// [`RecordBatch`] instances are serialised to Parquet files, with IOx specific
/// metadata ([`IoxParquetMetaData`]) attached.
///
/// Code that interacts with Parquet files in object storage should utilise this
/// type that encapsulates the storage & retrieval implementation.
///
/// [`ObjectStore`]: object_store::ObjectStore
#[derive(Debug, Clone)]
pub struct ParquetStorage {
object_store: Arc<DynObjectStore>,
}
impl ParquetStorage {
/// Initialise a new [`ParquetStorage`] using `object_store` as the
/// persistence layer.
pub fn new(object_store: Arc<DynObjectStore>) -> Self {
Self { object_store }
}
fn writer_props(&self, metadata_bytes: &[u8]) -> WriterProperties {
let builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![KeyValue {
key: METADATA_KEY.to_string(),
value: Some(base64::encode(&metadata_bytes)),
}]))
.set_compression(Compression::ZSTD);
builder.build()
}
/// Convert the given metadata and RecordBatches to parquet file bytes. Used by `ingester`.
pub async fn parquet_bytes(
/// Push `batches`, a stream of [`RecordBatch`] instances, to object
/// storage.
pub async fn upload<S>(
&self,
record_batches: Vec<RecordBatch>,
schema: SchemaRef,
metadata: &IoxMetadata,
) -> Result<Vec<u8>> {
let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;
batches: S,
meta: &IoxMetadata,
) -> Result<(IoxParquetMetaData, usize), UploadError>
where
S: Stream<Item = Result<RecordBatch, ArrowError>> + Send,
{
// Stream the record batches into a parquet file.
//
// It would be nice to stream the encoded parquet to disk for this and
// eliminate the buffering in memory, but the lack of a streaming object
// store put negates any benefit of spilling to disk.
//
// This is not a huge concern, as the resulting parquet files are
// currently smallish on average.
let (data, parquet_file_meta) = serialise::to_parquet_bytes(batches, meta).await?;
let mut stream = Box::pin(stream::iter(record_batches.into_iter().map(Ok)));
// Read the IOx-specific parquet metadata from the file metadata
let parquet_meta =
IoxParquetMetaData::try_from(parquet_file_meta).map_err(UploadError::Metadata)?;
let props = self.writer_props(&metadata_bytes);
// Derive the correct object store path from the metadata.
let path = ParquetFilePath::from(meta).object_store_path();
let mem_writer = MemWriter::default();
{
let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props))
.context(OpeningParquetWriterSnafu)?;
let mut no_stream_data = true;
while let Some(batch) = stream.next().await {
no_stream_data = false;
let batch = batch.context(ReadingStreamSnafu)?;
writer.write(&batch).context(WritingParquetToMemorySnafu)?;
}
if no_stream_data {
return Ok(vec![]);
}
writer.close().context(ClosingParquetWriterSnafu)?;
} // drop the reference to the MemWriter that the SerializedFileWriter has
let file_size = data.len();
self.object_store.put(&path, Bytes::from(data)).await?;
mem_writer.into_inner().context(WritingToMemWriterSnafu)
Ok((parquet_meta, file_size))
}
/// Put the given vector of bytes to the specified location
pub async fn to_object_store(&self, data: Vec<u8>, path: &ParquetFilePath) -> Result<()> {
let data = Bytes::from(data);
let path = path.object_store_path();
self.object_store
.put(&path, data)
.await
.context(WritingToObjectStoreSnafu)
}
/// Return indices of the schema's fields of the selection columns
pub fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> {
let fields = schema.fields().iter();
match selection {
Selection::Some(cols) => fields
.enumerate()
.filter_map(|(p, x)| {
if cols.contains(&x.name().as_str()) {
Some(p)
} else {
None
}
})
.collect(),
Selection::All => fields.enumerate().map(|(p, _)| p).collect(),
}
}
/// Downloads the specified parquet file to a local temporary file
/// and uses the `[ParquetExec`]
/// Pull the Parquet-encoded [`RecordBatch`] at the file path derived from
/// the provided [`ParquetFilePath`].
///
/// The resulting record batches from Parquet are sent back to `tx`
fn download_and_scan_parquet(
projection: Vec<usize>,
path: ParquetFilePath,
object_store: Arc<DynObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
) -> Result<()> {
// Size of each batch
let batch_size = 1024; // Todo: make a constant or policy for this
let path = path.object_store_path();
let read_stream = futures::executor::block_on(object_store.get(&path))
.context(ReadingObjectStoreSnafu)?;
let file = match read_stream {
GetResult::File(f, _) => {
trace!(?path, "Using file directly");
futures::executor::block_on(f.into_std())
}
GetResult::Stream(read_stream) => {
// read parquet file to local file
let mut file = tempfile::tempfile().context(OpenTempFileSnafu)?;
trace!(?path, ?file, "Beginning to read parquet to temp file");
for bytes in futures::executor::block_on_stream(read_stream) {
let bytes = bytes.context(ReadingObjectStoreSnafu)?;
trace!(len = bytes.len(), "read bytes from object store");
file.write_all(&bytes).context(WriteTempFileSnafu)?;
}
file.rewind().context(WriteTempFileSnafu)?;
trace!(?path, "Completed read parquet to tempfile");
file
}
};
let file_reader = SerializedFileReader::new(file).context(ParquetReaderSnafu)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let record_batch_reader = arrow_reader
.get_record_reader_by_columns(projection, batch_size)
.context(ParquetReaderSnafu)?;
for batch in record_batch_reader {
if tx.blocking_send(batch).is_err() {
debug!(?path, "Receiver hung up - exiting");
break;
}
}
debug!(?path, "Completed parquet download & scan");
Ok(())
}
/// The `selection` projection is pushed down to the Parquet deserialiser.
///
/// This impl fetches the associated Parquet file bytes from object storage,
/// temporarily persisting them to a local temp file to feed to the arrow
/// reader.
///
/// No caching is performed by `read_filter()`, and each call to
/// `read_filter()` will re-download the parquet file unless the underlying
/// object store impl caches the fetched bytes.
pub fn read_filter(
&self,
_predicate: &Predicate,
selection: Selection<'_>,
schema: SchemaRef,
path: ParquetFilePath,
) -> Result<SendableRecordBatchStream> {
meta: &IoxMetadata,
) -> Result<SendableRecordBatchStream, ReadError> {
let path = ParquetFilePath::from(meta).object_store_path();
trace!(path=?path, "fetching parquet data for filtered read");
// Indices of columns in the schema needed to read
let projection: Vec<usize> = Self::column_indices(selection, Arc::clone(&schema));
let projection: Vec<usize> = column_indices(selection, Arc::clone(&schema));
// Compute final (output) schema after selection
let schema = Arc::new(Schema::new(
@ -253,7 +167,7 @@ impl ParquetStorage {
let object_store = Arc::clone(&self.object_store);
move || {
let download_result =
Self::download_and_scan_parquet(projection, path, object_store, tx.clone());
download_and_scan_parquet(projection, path, object_store, tx.clone());
// If there was an error returned from download_and_scan_parquet send it back to the receiver.
if let Err(e) = download_result {
@ -276,44 +190,154 @@ impl ParquetStorage {
}
}
#[derive(Debug, Default, Clone)]
pub struct MemWriter {
mem: Arc<Mutex<Cursor<Vec<u8>>>>,
}
/// Return indices of the schema's fields of the selection columns
fn column_indices(selection: Selection<'_>, schema: SchemaRef) -> Vec<usize> {
let fields = schema.fields().iter();
impl MemWriter {
/// Returns the inner buffer as long as there are no other references to the
/// Arc.
pub fn into_inner(self) -> Option<Vec<u8>> {
Arc::try_unwrap(self.mem)
.ok()
.map(|mutex| mutex.into_inner().into_inner())
match selection {
Selection::Some(cols) => fields
.enumerate()
.filter_map(|(p, x)| {
if cols.contains(&x.name().as_str()) {
Some(p)
} else {
None
}
})
.collect(),
Selection::All => fields.enumerate().map(|(p, _)| p).collect(),
}
}
impl Write for MemWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut inner = self.mem.lock();
inner.write(buf)
/// Downloads the specified parquet file to a local temporary file
/// and push the [`RecordBatch`] contents over `tx`, projecting the specified
/// column indexes.
///
/// This call MAY download a parquet file from object storage, temporarily
/// spilling it to disk while it is processed.
fn download_and_scan_parquet(
projection: Vec<usize>,
path: object_store::path::Path,
object_store: Arc<DynObjectStore>,
tx: tokio::sync::mpsc::Sender<ArrowResult<RecordBatch>>,
) -> Result<(), ReadError> {
// Size of each batch
let batch_size = 1024; // Todo: make a constant or policy for this
let read_stream = futures::executor::block_on(object_store.get(&path))?;
let file = match read_stream {
GetResult::File(f, _) => {
trace!(?path, "Using file directly");
futures::executor::block_on(f.into_std())
}
GetResult::Stream(read_stream) => {
// read parquet file to local file
let mut file = tempfile::tempfile().map_err(ReadError::TempFile)?;
trace!(?path, ?file, "Beginning to read parquet to temp file");
for bytes in futures::executor::block_on_stream(read_stream) {
let bytes = bytes?;
trace!(len = bytes.len(), "read bytes from object store");
file.write_all(&bytes)?;
}
file.rewind()?;
trace!(?path, "Completed read parquet to tempfile");
file
}
};
let file_reader = SerializedFileReader::new(file)?;
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
let record_batch_reader = arrow_reader.get_record_reader_by_columns(projection, batch_size)?;
for batch in record_batch_reader {
if tx.blocking_send(batch).is_err() {
debug!(?path, "Receiver hung up - exiting");
break;
}
}
fn flush(&mut self) -> std::io::Result<()> {
let mut inner = self.mem.lock();
inner.flush()
}
debug!(?path, "Completed parquet download & scan");
Ok(())
}
impl Seek for MemWriter {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let mut inner = self.mem.lock();
inner.seek(pos)
}
}
#[cfg(test)]
mod tests {
use super::*;
impl TryClone for MemWriter {
fn try_clone(&self) -> std::io::Result<Self> {
Ok(Self {
mem: Arc::clone(&self.mem),
})
use arrow::array::{ArrayRef, StringBuilder};
use data_types::{NamespaceId, PartitionId, SequenceNumber, SequencerId, TableId};
use iox_time::Time;
#[tokio::test]
async fn test_parquet_round_trip() {
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::default());
let store = ParquetStorage::new(object_store);
let meta = IoxMetadata {
object_store_id: Default::default(),
creation_timestamp: Time::from_timestamp_nanos(42),
namespace_id: NamespaceId::new(1),
namespace_name: "bananas".into(),
sequencer_id: SequencerId::new(2),
table_id: TableId::new(3),
table_name: "platanos".into(),
partition_id: PartitionId::new(4),
partition_key: "potato".into(),
time_of_first_write: Time::from_timestamp_nanos(4242),
time_of_last_write: Time::from_timestamp_nanos(424242),
min_sequence_number: SequenceNumber::new(10),
max_sequence_number: SequenceNumber::new(11),
row_count: 1000,
compaction_level: 1,
sort_key: None,
};
let batch = RecordBatch::try_from_iter([("a", to_string_array(&["value"]))]).unwrap();
let schema = batch.schema();
let stream = futures::stream::iter([Ok(batch.clone())]);
// Serialise & upload the record batches.
let (file_meta, _file_size) = store
.upload(stream, &meta)
.await
.expect("should serialise and store sucessfully");
// Extract the various bits of metadata.
let file_meta = file_meta.decode().expect("should decode parquet metadata");
let got_iox_meta = file_meta
.read_iox_metadata_new()
.expect("should read IOx metadata from parquet meta");
// Ensure the metadata in the file decodes to the same IOx metadata we
// provided when uploading.
assert_eq!(got_iox_meta, meta);
// Fetch the record batches and compare them to the input batches.
let rx = store
.read_filter(&Predicate::default(), Selection::All, schema, &meta)
.expect("should read record batches from object store");
// Drain the retrieved record batch stream
let mut got = datafusion::physical_plan::common::collect(rx)
.await
.expect("failed to drain record stream");
// And compare to the original input
assert_eq!(got.len(), 1);
assert_eq!(got.pop().unwrap(), batch);
}
fn to_string_array(strs: &[&str]) -> ArrayRef {
let mut builder = StringBuilder::new(strs.len());
for s in strs {
builder.append_value(s).expect("appending string");
}
Arc::new(builder.finish())
}
}

View File

@ -13,7 +13,7 @@ use std::sync::Arc;
pub enum Error {
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunkError {
source: parquet_file::chunk::Error,
source: parquet_file::storage::ReadError,
chunk_id: ChunkId,
},
}