From 152281e428a255fa61d26948fcb2c450912afb0f Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Fri, 3 Dec 2021 12:19:48 -0500
Subject: [PATCH] fix: Capture the right 'no data' while parquet has no data

---
 parquet_catalog/src/rebuild.rs                  | 14 ++++++++++++--
 parquet_file/src/metadata.rs                    |  8 ++++++--
 parquet_file/src/storage.rs                     | 17 +++++++++++------
 server/src/db.rs                                |  8 ++++++--
 server/src/db/lifecycle/compact_object_store.rs |  8 ++------
 5 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/parquet_catalog/src/rebuild.rs b/parquet_catalog/src/rebuild.rs
index 10f819bb9e..d8016ad5b3 100644
--- a/parquet_catalog/src/rebuild.rs
+++ b/parquet_catalog/src/rebuild.rs
@@ -26,6 +26,9 @@ pub enum Error {
         path: ParquetFilePath,
     },
 
+    #[snafu(display("No row groups from parquet file ({:?})", path))]
+    NoRowGroups { path: ParquetFilePath },
+
     #[snafu(display("Cannot add file to transaction: {}", source))]
     FileRecordFailure {
         source: crate::interface::CatalogStateAddError,
@@ -144,7 +147,12 @@ async fn read_parquet(
     let file_size_bytes = data.len();
 
     let parquet_metadata = IoxParquetMetaData::from_file_bytes(data)
-        .context(MetadataReadFailure { path: path.clone() })?;
+        .context(MetadataReadFailure { path: path.clone() })?; // Error reading metadata
+
+    if parquet_metadata.is_none() {
+        return NoRowGroups { path: path.clone() }.fail();
+    } // No data and hence no metadata
+    let parquet_metadata = parquet_metadata.unwrap();
 
     // validate IOxMetadata
     parquet_metadata
@@ -418,7 +426,9 @@ mod tests {
         }
         // drop the reference to the MemWriter that the SerializedFileWriter has
         let data = mem_writer.into_inner().unwrap();
-        let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap();
+        let md = IoxParquetMetaData::from_file_bytes(data.clone())
+            .unwrap()
+            .unwrap();
         let storage = Storage::new(Arc::clone(iox_object_store));
         let chunk_addr = ChunkAddr {
             db_name: Arc::clone(db_name),
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 47b1492829..5be5b1e9b6 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -482,12 +482,16 @@ pub struct IoxParquetMetaData {
 
 impl IoxParquetMetaData {
     /// Read parquet metadata from a parquet file.
-    pub fn from_file_bytes(data: Vec<u8>) -> Result<Self> {
+    pub fn from_file_bytes(data: Vec<u8>) -> Result<Option<Self>> {
         let cursor = SliceableCursor::new(data);
         let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?;
         let parquet_md = reader.metadata().clone();
+        if parquet_md.num_row_groups() == 0 {
+            return Ok(None);
+        }
+
         let data = Self::parquet_md_to_thrift(parquet_md)?;
-        Ok(Self::from_thrift_bytes(data))
+        Ok(Some(Self::from_thrift_bytes(data)))
     }
 
     /// Read parquet metadata from thrift bytes.
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 6f02cb77fa..e66ead6c8a 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -155,16 +155,19 @@ impl Storage {
         let schema = stream.schema();
         let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
         // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
-        if data.is_empty() {
-            return Ok(None);
-        }
         let file_size_bytes = data.len();
 
         let md = IoxParquetMetaData::from_file_bytes(data.clone())
             .context(ExtractingMetadataFailure)?;
+
+        // No data
+        if md.is_none() {
+            return Ok(None);
+        }
+
         self.to_object_store(data, &path).await?;
 
-        Ok(Some((path, file_size_bytes, md)))
+        Ok(Some((path, file_size_bytes, md.unwrap())))
     }
 
     fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
@@ -482,7 +485,7 @@ mod tests {
             .unwrap();
 
         // extract metadata
-        let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap();
+        let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap();
         let metadata_roundtrip = md.decode().unwrap().read_iox_metadata().unwrap();
 
         // compare with input
@@ -602,7 +605,9 @@ mod tests {
         let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store()))
            .await
            .unwrap();
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         let decoded = parquet_metadata.decode().unwrap();
         //
         // 1. Check metadata at file level: Everything is correct
diff --git a/server/src/db.rs b/server/src/db.rs
index f614911bdd..f8e48b1a0b 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -2111,7 +2111,9 @@ mod tests {
         .await
         .unwrap();
 
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         // Read metadata at file level
         let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
         // Read data
@@ -2238,7 +2240,9 @@ mod tests {
         load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
             .await
             .unwrap();
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         // Read metadata at file level
         let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
         // Read data
diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs
index 69f62bd85b..eb588f18e7 100644
--- a/server/src/db/lifecycle/compact_object_store.rs
+++ b/server/src/db/lifecycle/compact_object_store.rs
@@ -821,7 +821,6 @@ mod tests {
         assert_eq!(summary_chunks[1].row_count, 2);
     }
 
-    #[ignore]
     #[tokio::test]
     async fn test_compact_os_on_chunk_delete_all() {
         test_helpers::maybe_start_logging();
@@ -869,15 +868,12 @@ mod tests {
         // compact the only OS chunk
         let partition = partition.upgrade();
         let chunk1 = chunks[0].write();
-        let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1])
+        compact_object_store_chunks(partition, vec![chunk1])
            .unwrap()
            .1
            .await
+           .unwrap()
            .unwrap();
-        //.unwrap();
-
-        let err = compacted_chunk.unwrap_err();
-        println!("{}", err.to_string());
 
         // verify results
         let partition = db.partition("cpu", partition_key).unwrap();
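
Note (not part of the patch): after this change, IoxParquetMetaData::from_file_bytes returns Result<Option<Self>>, so a caller can tell a parquet read error apart from a parquet file that is valid but contains no row groups. Below is a minimal caller sketch; only from_file_bytes itself comes from this diff, while the helper name and its error handling are illustrative assumptions.

    // Sketch only: how a caller of the changed API distinguishes the three
    // possible outcomes. `parquet_has_data` is a hypothetical helper that is
    // not part of this diff.
    use parquet_file::metadata::IoxParquetMetaData;

    fn parquet_has_data(data: Vec<u8>) -> bool {
        match IoxParquetMetaData::from_file_bytes(data) {
            // Valid parquet with at least one row group: metadata decoded.
            Ok(Some(_md)) => true,
            // Valid parquet but zero row groups: "no data", no metadata.
            Ok(None) => false,
            // The bytes could not be read as a parquet file at all.
            Err(_) => false,
        }
    }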