diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 403785603e..8507ad1e17 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -483,12 +483,13 @@ pub struct IoxParquetMetaData {
 impl IoxParquetMetaData {
     /// Read parquet metadata from a parquet file.
     pub fn from_file_bytes(data: Vec<u8>) -> Result<Option<Self>> {
+        if data.is_empty() {
+            return Ok(None);
+        }
+
         let cursor = SliceableCursor::new(data);
         let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?;
         let parquet_md = reader.metadata().clone();
-        if parquet_md.num_row_groups() == 0 {
-            return Ok(None);
-        }
 
         let data = Self::parquet_md_to_thrift(parquet_md)?;
         Ok(Some(Self::from_thrift_bytes(data)))
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index a47fb601be..f858ed3d17 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -109,17 +109,17 @@ impl Storage {
         let schema = stream.schema();
         let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
 
+        // no data
+        if data.is_empty() {
+            return Ok(None);
+        }
+
         // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
         let file_size_bytes = data.len();
         let md =
             IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?;
 
-        // No data
-        if md.is_none() {
-            return Ok(None);
-        }
-
         self.to_object_store(data, &path).await?;
 
         Ok(Some((path, file_size_bytes, md.unwrap())))
@@ -149,10 +149,15 @@ impl Storage {
         {
             let mut writer = ArrowWriter::try_new(mem_writer.clone(), schema, Some(props))
                 .context(OpeningParquetWriter)?;
+            let mut no_stream_data = true;
             while let Some(batch) = stream.next().await {
+                no_stream_data = false;
                 let batch = batch.context(ReadingStream)?;
                 writer.write(&batch).context(WritingParquetToMemory)?;
             }
+            if no_stream_data {
+                return Ok(vec![]);
+            }
             writer.close().context(ClosingParquetWriter)?;
         }
         // drop the reference to the MemWriter that the SerializedFileWriter has
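
Note: the patch moves the "no data" detection earlier in the pipeline. Instead of writing an (empty) parquet file and then checking `num_row_groups() == 0` on its metadata, `parquet_stream_to_bytes` now returns an empty byte vector when the stream yields no record batches, and both `from_file_bytes` and `write_to_object_store` short-circuit on an empty buffer. A minimal standalone sketch of that flag-and-early-return pattern follows; it is plain Rust with no Arrow/Parquet dependencies, and `batches_to_bytes` and its types are illustrative only, not the crate's API.

/// Sketch: encode a sequence of "batches" into one byte buffer, returning an
/// empty buffer when the input yields no batches at all (mirrors the
/// `no_stream_data` flag in `parquet_stream_to_bytes`).
fn batches_to_bytes<I>(batches: I) -> Vec<u8>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut saw_data = false;
    let mut out = Vec::new();
    for batch in batches {
        saw_data = true;
        out.extend_from_slice(&batch);
    }
    if !saw_data {
        // No batches at all: signal "no data" with an empty buffer instead of
        // producing a non-empty file containing only a footer.
        return Vec::new();
    }
    out
}

fn main() {
    // Empty input -> empty buffer, which callers treat as "nothing to store".
    assert!(batches_to_bytes(Vec::<Vec<u8>>::new()).is_empty());
    // Non-empty input -> concatenated bytes.
    assert_eq!(batches_to_bytes(vec![vec![1, 2], vec![3]]), vec![1, 2, 3]);
}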