diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 12ecefcf94..50f1341e90 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -41,6 +41,8 @@ use std::{ }; use tokio_stream::wrappers::ReceiverStream; +use crate::metadata::read_parquet_metadata_from_file; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Error opening Parquet Writer: {}", source))] @@ -103,6 +105,9 @@ pub enum Error { IoxFromArrowFailure { source: internal_types::schema::Error, }, + + #[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))] + ExtractingMetadataFailure { source: crate::metadata::Error }, } pub type Result = std::result::Result; @@ -177,15 +182,18 @@ impl Storage { chunk_id: u32, table_name: String, stream: SendableRecordBatchStream, - ) -> Result { + ) -> Result<(Path, ParquetMetaData)> { // Create full path location of this file in object store let path = self.location(partition_key, chunk_id, table_name); let schema = stream.schema(); let data = Self::parquet_stream_to_bytes(stream, schema).await?; + // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504) + let md = + read_parquet_metadata_from_file(data.clone()).context(ExtractingMetadataFailure)?; self.to_object_store(data, &path).await?; - Ok(path.clone()) + Ok((path.clone(), md)) } /// Convert the given stream of RecordBatches to bytes diff --git a/parquet_file/src/utils.rs b/parquet_file/src/utils.rs index be19234e22..41aaacf29a 100644 --- a/parquet_file/src/utils.rs +++ b/parquet_file/src/utils.rs @@ -169,7 +169,7 @@ async fn make_chunk_common( } else { Box::pin(MemoryStream::new(record_batches)) }; - let path = storage + let (path, _metadata) = storage .write_to_object_store( part_key.to_string(), chunk_id,