feat: return parquet metadata from `write_to_object_store`

pull/24376/head
Marco Neumann 2021-05-11 10:47:22 +02:00
parent 07db4932ee
commit 5969caccb0
2 changed files with 11 additions and 3 deletions

View File

@ -41,6 +41,8 @@ use std::{
};
use tokio_stream::wrappers::ReceiverStream;
use crate::metadata::read_parquet_metadata_from_file;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error opening Parquet Writer: {}", source))]
@ -103,6 +105,9 @@ pub enum Error {
IoxFromArrowFailure {
source: internal_types::schema::Error,
},
#[snafu(display("Cannot extract Parquet metadata from byte array: {}", source))]
ExtractingMetadataFailure { source: crate::metadata::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -177,15 +182,18 @@ impl Storage {
chunk_id: u32,
table_name: String,
stream: SendableRecordBatchStream,
) -> Result<Path> {
) -> Result<(Path, ParquetMetaData)> {
// Create full path location of this file in object store
let path = self.location(partition_key, chunk_id, table_name);
let schema = stream.schema();
let data = Self::parquet_stream_to_bytes(stream, schema).await?;
// TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
let md =
read_parquet_metadata_from_file(data.clone()).context(ExtractingMetadataFailure)?;
self.to_object_store(data, &path).await?;
Ok(path.clone())
Ok((path.clone(), md))
}
/// Convert the given stream of RecordBatches to bytes

View File

@ -169,7 +169,7 @@ async fn make_chunk_common(
} else {
Box::pin(MemoryStream::new(record_batches))
};
let path = storage
let (path, _metadata) = storage
.write_to_object_store(
part_key.to_string(),
chunk_id,