fix: Capture the right 'no data' case when a parquet file has no data

pull/24376/head
Nga Tran 2021-12-03 12:19:48 -05:00
parent 2c04215c1f
commit 152281e428
5 changed files with 37 additions and 18 deletions

[File 1 of 5]

@@ -26,6 +26,9 @@ pub enum Error {
         path: ParquetFilePath,
     },
+    #[snafu(display("No row groups from parquet file ({:?})", path))]
+    NoRowGroups { path: ParquetFilePath },
     #[snafu(display("Cannot add file to transaction: {}", source))]
     FileRecordFailure {
         source: crate::interface::CatalogStateAddError,
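
A minimal sketch of how the new NoRowGroups variant is raised at a call site, assuming the snafu 0.6-style context selectors the diff itself uses, with String standing in for ParquetFilePath:

    use snafu::Snafu;

    #[derive(Debug, Snafu)]
    pub enum Error {
        #[snafu(display("No row groups from parquet file ({:?})", path))]
        NoRowGroups { path: String },
    }

    fn require_row_groups(num_row_groups: usize, path: &str) -> Result<(), Error> {
        if num_row_groups == 0 {
            // The derived context selector shares the variant's name in snafu 0.6;
            // .fail() builds the variant and wraps it in Err.
            return NoRowGroups { path }.fail();
        }
        Ok(())
    }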
@@ -144,7 +147,12 @@ async fn read_parquet(
     let file_size_bytes = data.len();
     let parquet_metadata = IoxParquetMetaData::from_file_bytes(data)
-        .context(MetadataReadFailure { path: path.clone() })?;
+        .context(MetadataReadFailure { path: path.clone() })?; // Error reading metadata
+    if parquet_metadata.is_none() {
+        return NoRowGroups { path: path.clone() }.fail();
+    } // No data and hence no metadata
+    let parquet_metadata = parquet_metadata.unwrap();
     // validate IOxMetadata
     parquet_metadata
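
As an aside, the is_none()/unwrap() pair above can be folded into a single step with snafu's OptionExt; a sketch under stand-in types (String for ParquetFilePath, Vec<u8> for IoxParquetMetaData):

    use snafu::{OptionExt, Snafu};

    #[derive(Debug, Snafu)]
    pub enum Error {
        #[snafu(display("No row groups from parquet file ({:?})", path))]
        NoRowGroups { path: String },
    }

    fn require_metadata(parquet_metadata: Option<Vec<u8>>, path: &str) -> Result<Vec<u8>, Error> {
        // None becomes the NoRowGroups error in one combinator, replacing
        // the explicit is_none() check followed by unwrap().
        parquet_metadata.context(NoRowGroups { path })
    }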
@@ -418,7 +426,9 @@ mod tests {
         } // drop the reference to the MemWriter that the SerializedFileWriter has
         let data = mem_writer.into_inner().unwrap();
-        let md = IoxParquetMetaData::from_file_bytes(data.clone()).unwrap();
+        let md = IoxParquetMetaData::from_file_bytes(data.clone())
+            .unwrap()
+            .unwrap();
         let storage = Storage::new(Arc::clone(iox_object_store));
         let chunk_addr = ChunkAddr {
             db_name: Arc::clone(db_name),
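
With from_file_bytes now returning Result<Option<Self>>, the tests unwrap twice; a sketch of what each layer asserts, with u32 and String as stand-in types:

    // u32 stands in for IoxParquetMetaData, String for the error type.
    fn metadata_for_test(md: Result<Option<u32>, String>) -> u32 {
        md.unwrap() // Err means the fixture bytes could not be parsed at all
            .unwrap() // None means the fixture file had zero row groups
    }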

[File 2 of 5]

@@ -482,12 +482,16 @@ pub struct IoxParquetMetaData {
 impl IoxParquetMetaData {
     /// Read parquet metadata from a parquet file.
-    pub fn from_file_bytes(data: Vec<u8>) -> Result<Self> {
+    pub fn from_file_bytes(data: Vec<u8>) -> Result<Option<Self>> {
         let cursor = SliceableCursor::new(data);
         let reader = SerializedFileReader::new(cursor).context(ParquetMetaDataRead {})?;
         let parquet_md = reader.metadata().clone();
+        if parquet_md.num_row_groups() == 0 {
+            return Ok(None);
+        }
         let data = Self::parquet_md_to_thrift(parquet_md)?;
-        Ok(Self::from_thrift_bytes(data))
+        Ok(Some(Self::from_thrift_bytes(data)))
     }

     /// Read parquet metadata from thrift bytes.
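
The new signature separates two distinct "nothing here" outcomes that the old Result<Self> conflated; a sketch of the caller-side contract, with stand-in types:

    struct Metadata; // stands in for IoxParquetMetaData
    struct ReadError; // stands in for the crate's Error type

    fn on_read(result: Result<Option<Metadata>, ReadError>) {
        match result {
            Ok(Some(_md)) => { /* normal path: the file has row groups and metadata */ }
            Ok(None) => { /* well-formed file with zero row groups: skip, not an error */ }
            Err(_e) => { /* corrupt or unreadable file: propagate */ }
        }
    }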

[File 3 of 5]

@@ -155,16 +155,19 @@ impl Storage {
         let schema = stream.schema();
         let data = Self::parquet_stream_to_bytes(stream, schema, metadata).await?;
         // TODO: make this work w/o cloning the byte vector (https://github.com/influxdata/influxdb_iox/issues/1504)
-        if data.is_empty() {
-            return Ok(None);
-        }
         let file_size_bytes = data.len();
         let md =
             IoxParquetMetaData::from_file_bytes(data.clone()).context(ExtractingMetadataFailure)?;
+        // No data
+        if md.is_none() {
+            return Ok(None);
+        }
         self.to_object_store(data, &path).await?;
-        Ok(Some((path, file_size_bytes, md)))
+        Ok(Some((path, file_size_bytes, md.unwrap())))
     }

     fn writer_props(metadata_bytes: &[u8]) -> WriterProperties {
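
This hunk is the heart of the fix: data.is_empty() could never fire, because a parquet file with zero rows is still a non-empty byte stream (magic bytes, footer, and thrift metadata are always written). A sketch of the corrected predicate:

    // Illustrative only: the reliable "no data" signal is the row-group count
    // decoded from the metadata, never the length of the raw file bytes.
    fn has_row_data(file_bytes: &[u8], num_row_groups: usize) -> bool {
        // Even a zero-row parquet file carries magic bytes, a footer, and
        // thrift-encoded metadata, so this assertion always holds.
        debug_assert!(!file_bytes.is_empty());
        num_row_groups > 0
    }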
@@ -482,7 +485,7 @@ mod tests {
             .unwrap();
         // extract metadata
-        let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap();
+        let md = IoxParquetMetaData::from_file_bytes(bytes).unwrap().unwrap();
         let metadata_roundtrip = md.decode().unwrap().read_iox_metadata().unwrap();
         // compare with input
@@ -602,7 +605,9 @@ mod tests {
         let parquet_data = load_parquet_from_store(&chunk, Arc::clone(generator.store()))
             .await
             .unwrap();
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         let decoded = parquet_metadata.decode().unwrap();
         //
         // 1. Check metadata at file level: Everything is correct

[File 4 of 5]

@@ -2111,7 +2111,9 @@ mod tests {
             .await
             .unwrap();
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         // Read metadata at file level
         let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
         // Read data
@@ -2238,7 +2240,9 @@ mod tests {
             load_parquet_from_store_for_path(&path_list[0], Arc::clone(&db.iox_object_store))
                 .await
                 .unwrap();
-        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone()).unwrap();
+        let parquet_metadata = IoxParquetMetaData::from_file_bytes(parquet_data.clone())
+            .unwrap()
+            .unwrap();
         // Read metadata at file level
         let schema = parquet_metadata.decode().unwrap().read_schema().unwrap();
         // Read data

[File 5 of 5]

@@ -821,7 +821,6 @@ mod tests {
         assert_eq!(summary_chunks[1].row_count, 2);
     }

-    #[ignore]
     #[tokio::test]
     async fn test_compact_os_on_chunk_delete_all() {
         test_helpers::maybe_start_logging();
@@ -869,15 +868,12 @@ mod tests {
         // compact the only OS chunk
         let partition = partition.upgrade();
         let chunk1 = chunks[0].write();
-        let compacted_chunk = compact_object_store_chunks(partition, vec![chunk1])
+        compact_object_store_chunks(partition, vec![chunk1])
             .unwrap()
             .1
             .await
             .unwrap()
+            .unwrap();
-            //.unwrap();
-        let err = compacted_chunk.unwrap_err();
-        println!("{}", err.to_string());

         // verify results
         let partition = db.partition("cpu", partition_key).unwrap();
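
The formerly ignored delete-all test now drives compaction to completion and expects success rather than an error. A sketch of the call chain's assumed shape (the real iox signatures may differ):

    use std::future::Future;

    // Assumed shapes, not the real iox signatures: the job starts as
    // Ok((tracker, future)) and the future resolves to the compaction result.
    async fn assert_compaction_succeeds<T, F>(job: Result<((), F), String>) -> T
    where
        F: Future<Output = Result<Option<T>, String>>,
    {
        job.unwrap() // starting the compaction job must succeed
            .1 // keep the future, drop the tracker half of the tuple
            .await // drive the compaction to completion
            .unwrap() // the run must be Ok (the old test expected Err here)
            .unwrap() // and it must yield a compacted chunk
    }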