refactor: make the no-data check happen while reading the input stream

pull/24376/head
Nga Tran 2021-12-07 12:03:41 -05:00
parent c992c82582
commit 561c5ed8e7
2 changed files with 14 additions and 8 deletions


@@ -483,12 +483,13 @@ pub struct IoxParquetMetaData {
 impl IoxParquetMetaData {
     /// Read parquet metadata from a parquet file.
     pub fn from_file_bytes(data: Vec<u8>) -> Result<Option<Self>> {
+        if data.is_empty() {
+            return Ok(None);
+        }
         let cursor = SliceableCursor::new(data);
         let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?;
         let parquet_md = reader.metadata().clone();
-        if parquet_md.num_row_groups() == 0 {
-            return Ok(None);
-        }
         let data = Self::parquet_md_to_thrift(parquet_md)?;
         Ok(Some(Self::from_thrift_bytes(data)))
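
The hunk above changes the contract of IoxParquetMetaData::from_file_bytes: an empty byte buffer now short-circuits to Ok(None) up front, and the function no longer inspects num_row_groups() on a footer-only file to detect the no-data case. A minimal, self-contained sketch of that contract (plain Rust with stand-in error and metadata types, not the project's own; the real code hands non-empty bytes to parquet's SliceableCursor/SerializedFileReader):

    // Stand-in for from_file_bytes after this change: empty input is answered
    // with Ok(None) before any footer parsing is attempted.
    fn from_file_bytes(data: Vec<u8>) -> Result<Option<String>, String> {
        if data.is_empty() {
            // No bytes were produced for this chunk, so there is no parquet
            // footer to read; report "no metadata" rather than a parse error.
            return Ok(None);
        }
        // The real implementation parses the parquet footer and re-encodes it
        // as thrift bytes; a placeholder result keeps the sketch runnable.
        Ok(Some(format!("{} bytes of parquet metadata", data.len())))
    }

    fn main() {
        assert_eq!(from_file_bytes(Vec::new()), Ok(None));
        assert!(from_file_bytes(vec![b'P', b'A', b'R', b'1']).unwrap().is_some());
    }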


@@ -109,17 +109,17 @@ impl Storage {
         let schema = stream.schema();
         let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
+        // no data
+        if data.is_empty() {
+            return Ok(None);
+        }
         // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
         let file_size_bytes = data.len();
         let md =
             IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?;
-        // No data
-        if md.is_none() {
-            return Ok(None);
-        }
         self.to_object_store(data, &path).await?;
         Ok(Some((path, file_size_bytes, md.unwrap())))
@@ -149,10 +149,15 @@ impl Storage {
         {
             let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props))
                 .context(OpeningParquetWriter)?;
+            let mut no_stream_data = true;
             while let Some(batch) = stream.next().await {
+                no_stream_data = false;
                 let batch = batch.context(ReadingStream)?;
                 writer.write(&batch).context(WritingParquetToMemory)?;
             }
+            if no_stream_data {
+                return Ok(vec![]);
+            }
             writer.close().context(ClosingParquetWriter)?;
         } // drop the reference to the MemWriter that the SerializedFileWriter has
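
This second hunk is where the no-data signal now originates: if the RecordBatch stream never yields a batch, parquet_stream_to_bytes returns an empty Vec<u8> before writer.close() can emit a footer-only parquet file, and the caller shown in the earlier hunk of this file turns that empty buffer into Ok(None). A minimal, self-contained sketch of the same pattern (built on the futures crate only; the byte buffer and the "footer" stand in for the project's ArrowWriter and are not its API):

    use futures::{stream, Stream, StreamExt};

    // Drain `input`; if it never yields, return an empty buffer instead of a
    // buffer that contains nothing but a footer: the no_stream_data pattern
    // added in this commit.
    async fn encode<S>(mut input: S) -> Vec<u8>
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        let mut out = Vec::new();
        let mut no_stream_data = true;
        while let Some(chunk) = input.next().await {
            no_stream_data = false;
            out.extend_from_slice(&chunk); // stands in for writer.write(&batch)
        }
        if no_stream_data {
            // Nothing was read from the stream: signal "no data" upstream.
            return Vec::new();
        }
        out.extend_from_slice(b"FOOTER"); // stands in for writer.close()
        out
    }

    fn main() {
        futures::executor::block_on(async {
            assert!(encode(stream::iter(Vec::<Vec<u8>>::new())).await.is_empty());
            let bytes = encode(stream::iter(vec![vec![1u8, 2, 3]])).await;
            assert!(bytes.ends_with(b"FOOTER"));
        });
    }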