refactor: Share parquet_file::storage code between new and old metadata

pull/24376/head
Carol (Nichols || Goulding) 2022-01-28 13:11:54 -05:00
parent bf89162fa5
commit 8f81ce5501
2 changed files with 33 additions and 29 deletions
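The core of this refactor is adapting the ingester's owned `Vec<RecordBatch>` into the same `Stream` shape the old metadata path already produces, so one encoder can serve both. Below is a minimal standalone sketch of that adapter technique; it assumes only the `futures` crate (default features), and the `RecordBatch`/`ArrowResult` aliases plus the function names are hypothetical stand-ins, not the real arrow types used in the diff.

    use futures::{stream, Stream, StreamExt};

    // Hypothetical stand-ins for arrow's RecordBatch and ArrowResult, so the
    // sketch compiles on its own; the real types come from the `arrow` crate.
    type RecordBatch = Vec<i64>;
    type ArrowResult<T> = Result<T, String>;

    /// Adapt an owned Vec of batches into the stream shape the old path yields:
    /// `stream::iter` wraps the iterator, `map(Ok)` matches the fallible item type.
    fn batches_to_stream(
        batches: Vec<RecordBatch>,
    ) -> impl Stream<Item = ArrowResult<RecordBatch>> + Send + Unpin {
        stream::iter(batches.into_iter().map(Ok))
    }

    /// A consumer written against the generic `Stream` bound works for both sources.
    async fn count_rows(
        mut batches: impl Stream<Item = ArrowResult<RecordBatch>> + Unpin,
    ) -> Result<usize, String> {
        let mut rows = 0;
        while let Some(batch) = batches.next().await {
            rows += batch?.len();
        }
        Ok(rows)
    }

    fn main() {
        let stream = batches_to_stream(vec![vec![1, 2], vec![3]]);
        let rows = futures::executor::block_on(count_rows(stream)).unwrap();
        assert_eq!(rows, 3);
    }

This is the same shape as the new `parquet_bytes` in the second file below, which wraps the adapted stream in `Box::pin` before handing it to the shared helper.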

View File

@@ -27,7 +27,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Write the given data to the given location in the given object storage
 pub async fn persist(
     metadata: &IoxMetadata,
-    record_batches: &[RecordBatch],
+    record_batches: Vec<RecordBatch>,
     object_store: &ObjectStore,
 ) -> Result<()> {
     if record_batches.is_empty() {
@@ -120,7 +120,7 @@ mod tests {
         };
         let object_store = object_store();
-        persist(&metadata, &[], &object_store).await.unwrap();
+        persist(&metadata, vec![], &object_store).await.unwrap();
         assert!(list_all(&object_store).await.unwrap().is_empty());
     }
@@ -156,7 +156,7 @@ mod tests {
         let object_store = object_store();
-        persist(&metadata, &batches, &object_store).await.unwrap();
+        persist(&metadata, batches, &object_store).await.unwrap();
         let obj_store_paths = list_all(&object_store).await.unwrap();
         assert_eq!(obj_store_paths.len(), 1);
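One plausible reading of the `&[RecordBatch]` to `Vec<RecordBatch>` signature change above: `stream::iter` yields owned items, so a borrowed slice would force a clone per batch, while moving the `Vec` in hands the batches to the stream directly. A hedged sketch of that trade-off, with a generic `T` and hypothetical function names rather than the real arrow types:

    use futures::{stream, Stream, StreamExt};

    // From a borrowed slice, yielding owned items requires cloning each one:
    fn stream_from_slice<T: Clone>(items: &[T]) -> impl Stream<Item = T> + '_ {
        stream::iter(items.iter().cloned())
    }

    // Taking the Vec by value avoids those copies, which is consistent with why
    // `persist` (and the test call sites above) now move their batches in.
    fn stream_from_vec<T>(items: Vec<T>) -> impl Stream<Item = T> {
        stream::iter(items)
    }

    fn main() {
        let owned = vec![String::from("a"), String::from("b")];
        let collected: Vec<String> =
            futures::executor::block_on(stream_from_vec(owned).collect());
        assert_eq!(collected, ["a", "b"]);
    }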

View File

@@ -9,7 +9,7 @@ use bytes::Bytes;
 use data_types::chunk_metadata::ChunkAddr;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::AdapterStream;
-use futures::StreamExt;
+use futures::{stream, Stream, StreamExt};
 use iox_object_store::{IoxObjectStore, ParquetFilePath};
 use object_store::GetResult;
 use observability_deps::tracing::debug;
@@ -27,6 +27,7 @@ use schema::selection::Selection;
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::{
     io::{Cursor, Seek, SeekFrom, Write},
+    marker::Unpin,
     sync::Arc,
 };
@@ -137,15 +138,40 @@ impl Storage {
             .build()
     }

-    /// Convert the given stream of RecordBatches to bytes
+    /// Convert the given stream of RecordBatches to bytes. This should be deleted when switching
+    /// over to use `ingester` only.
     async fn parquet_stream_to_bytes(
-        mut stream: SendableRecordBatchStream,
+        stream: SendableRecordBatchStream,
         schema: SchemaRef,
         metadata: IoxMetadataOld,
     ) -> Result<Vec<u8>> {
         let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;
-        let props = Self::writer_props(&metadata_bytes);
+        Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await
+    }
+
+    /// Convert the given metadata and RecordBatches to parquet file bytes. Used by `ingester`.
+    pub async fn parquet_bytes(
+        record_batches: Vec<RecordBatch>,
+        schema: SchemaRef,
+        metadata: &IoxMetadata,
+    ) -> Result<Vec<u8>> {
+        let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;
+        let stream = Box::pin(stream::iter(record_batches.into_iter().map(Ok)));
+        Self::record_batches_to_parquet_bytes(stream, schema, &metadata_bytes).await
+    }
+
+    /// Share code between `parquet_stream_to_bytes` and `parquet_bytes`. When
+    /// `parquet_stream_to_bytes` is deleted, this code can be moved into `parquet_bytes` and
+    /// made simpler by using a plain `Iter` rather than a `Stream`.
+    async fn record_batches_to_parquet_bytes(
+        mut stream: impl Stream<Item = ArrowResult<RecordBatch>> + Send + Sync + Unpin,
+        schema: SchemaRef,
+        metadata_bytes: &[u8],
+    ) -> Result<Vec<u8>> {
+        let props = Self::writer_props(metadata_bytes);
         let mem_writer = MemWriter::default();
         {
@@ -166,28 +192,6 @@ impl Storage {
         mem_writer.into_inner().context(WritingToMemWriterSnafu)
     }

-    /// Convert the given metadata and RecordBatches to parquet file bytes
-    pub async fn parquet_bytes(
-        record_batches: &[RecordBatch],
-        schema: SchemaRef,
-        metadata: &IoxMetadata,
-    ) -> Result<Vec<u8>> {
-        let metadata_bytes = metadata.to_protobuf().context(MetadataEncodeFailureSnafu)?;
-        let props = Self::writer_props(&metadata_bytes);
-        let mem_writer = MemWriter::default();
-        {
-            let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props))
-                .context(OpeningParquetWriterSnafu)?;
-            for batch in record_batches {
-                writer.write(batch).context(WritingParquetToMemorySnafu)?;
-            }
-            writer.close().context(ClosingParquetWriterSnafu)?;
-        } // drop the reference to the MemWriter that the SerializedFileWriter has
-        mem_writer.into_inner().context(WritingToMemWriterSnafu)
-    }
-
     /// Put the given vector of bytes to the specified location
     pub async fn to_object_store(&self, data: Vec<u8>, path: &ParquetFilePath) -> Result<()> {
         let data = Bytes::from(data);
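For context on the `mem_writer.clone()` / scoped-block dance in the removed `parquet_bytes` (and in the shared helper that replaces it): the parquet writer takes ownership of a writer handle, yet the caller still needs the buffer back afterwards, so the writer gets a clone and the scope ensures that clone is dropped before the buffer is reclaimed. A self-contained sketch of that pattern, using a simplified stand-in for this file's `MemWriter` (the real one is likewise an `Arc<Mutex<Cursor<Vec<u8>>>>`, though this version is trimmed for illustration):

    use std::io::{Cursor, Write};
    use std::sync::{Arc, Mutex};

    // A cloneable in-memory writer: clones share one buffer, and into_inner
    // only succeeds once every clone has been dropped.
    #[derive(Clone, Default)]
    struct MemWriter {
        mem: Arc<Mutex<Cursor<Vec<u8>>>>,
    }

    impl MemWriter {
        // Returns None if another clone (e.g. one held by a file writer) is still alive.
        fn into_inner(self) -> Option<Vec<u8>> {
            Arc::try_unwrap(self.mem)
                .ok()
                .map(|mutex| mutex.into_inner().unwrap().into_inner())
        }
    }

    impl Write for MemWriter {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.mem.lock().unwrap().write(buf)
        }
        fn flush(&mut self) -> std::io::Result<()> {
            self.mem.lock().unwrap().flush()
        }
    }

    fn main() {
        let mem_writer = MemWriter::default();
        {
            // A clone stands in here for the handle ArrowWriter would hold...
            let mut writer = mem_writer.clone();
            writer.write_all(b"parquet bytes").unwrap();
        } // ...and must be dropped before into_inner can reclaim the buffer.
        let bytes = mem_writer.into_inner().expect("no other clones remain");
        assert_eq!(bytes, b"parquet bytes".to_vec());
    }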