From f4d715474658184443079a321db89caf8c8ce0b5 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 11 May 2021 16:58:01 +0200 Subject: [PATCH] fix: table summaries must include timestamp as well --- parquet_file/src/metadata.rs | 112 ++++++++++++++++++----------------- parquet_file/src/utils.rs | 20 +++++-- 2 files changed, 74 insertions(+), 58 deletions(-) diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 0f0e9e4ee1..4553fdf8fd 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -252,26 +252,25 @@ fn read_statistics_from_parquet_row_group( let count = (row_group.num_rows().max(0) as u64).saturating_sub(parquet_stats.null_count()); - match extract_iox_statistics( + let (stats, maybe_tsrange) = extract_iox_statistics( parquet_stats, iox_type, count, row_group_idx, field.name(), - )? { - ExtractedStatistics::Statistics(stats) => column_summaries.push(ColumnSummary { - name: field.name().clone(), - influxdb_type: Some(match iox_type { - InfluxColumnType::Tag => InfluxDbType::Tag, - InfluxColumnType::Field(_) => InfluxDbType::Field, - InfluxColumnType::Timestamp => InfluxDbType::Timestamp, - }), - stats, + )?; + column_summaries.push(ColumnSummary { + name: field.name().clone(), + influxdb_type: Some(match iox_type { + InfluxColumnType::Tag => InfluxDbType::Tag, + InfluxColumnType::Field(_) => InfluxDbType::Field, + InfluxColumnType::Timestamp => InfluxDbType::Timestamp, }), - ExtractedStatistics::TimestampRange(range) => { - assert!(timestamp_range.is_none()); - timestamp_range = Some(range); - } + stats, + }); + if let Some(range) = maybe_tsrange { + assert!(timestamp_range.is_none()); + timestamp_range = Some(range); } } } @@ -284,15 +283,6 @@ fn read_statistics_from_parquet_row_group( Ok((table_summary, timestamp_range)) } -/// Result of [`extract_iox_statistics`]. -enum ExtractedStatistics { - /// Found statistics. - Statistics(Statistics), - - /// Found timestamp range. - TimestampRange(TimestampRange), -} - /// Extract IOx statistics from parquet statistics. /// /// This is required because upstream does not have a mapper from parquet statistics back to arrow or Rust native types. @@ -302,48 +292,61 @@ fn extract_iox_statistics( count: u64, row_group_idx: usize, column_name: &str, -) -> Result { +) -> Result<(Statistics, Option)> { match (parquet_stats, iox_type) { (ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => { - Ok(ExtractedStatistics::Statistics(Statistics::Bool( - StatValues { + Ok(( + Statistics::Bool(StatValues { min: Some(*stats.min()), max: Some(*stats.max()), count, - }, - ))) + }), + None, + )) + } + (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => { + Ok(( + Statistics::I64(StatValues { + min: Some(*stats.min()), + max: Some(*stats.max()), + count, + }), + None, + )) } - (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => Ok( - ExtractedStatistics::Statistics(Statistics::I64(StatValues { - min: Some(*stats.min()), - max: Some(*stats.max()), - count, - })), - ), (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => { // TODO: that's very likely wrong, but blocked by https://github.com/apache/arrow-rs/issues/254 - Ok(ExtractedStatistics::Statistics(Statistics::U64( - StatValues { + Ok(( + Statistics::U64(StatValues { min: Some(*stats.min() as u64), max: Some(*stats.max() as u64), count, - }, - ))) + }), + None, + )) } - (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => Ok( - ExtractedStatistics::Statistics(Statistics::F64(StatValues { + (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => { + Ok(( + Statistics::F64(StatValues { + min: Some(*stats.min()), + max: Some(*stats.max()), + count, + }), + None, + )) + } + (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok(( + Statistics::I64(StatValues { min: Some(*stats.min()), max: Some(*stats.max()), count, - })), - ), - (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok( - ExtractedStatistics::TimestampRange(TimestampRange::new(*stats.min(), *stats.max())), - ), + }), + Some(TimestampRange::new(*stats.min(), *stats.max())), + )), (ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag) | (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => { - Ok(ExtractedStatistics::Statistics(Statistics::String( - StatValues { + Ok(( + Statistics::String(StatValues { min: Some( stats .min() @@ -365,8 +368,9 @@ fn extract_iox_statistics( .to_string(), ), count, - }, - ))) + }), + None, + )) } _ => Err(Error::StatisticsTypeMismatch { row_group: row_group_idx, @@ -586,9 +590,9 @@ mod tests { 0 ); - // column count in summary misses the timestamp column + // column count in summary including the timestamp column assert_eq!( - chunk.table_summaries().first().unwrap().columns.len() + 1, + chunk.table_summaries().first().unwrap().columns.len(), parquet_metadata .file_metadata() .schema_descr() @@ -625,9 +629,9 @@ mod tests { ); assert_eq!(parquet_metadata.file_metadata().num_rows(), 0); - // column count in summary misses the timestamp column + // column count in summary including the timestamp column assert_eq!( - chunk.table_summaries().first().unwrap().columns.len() + 1, + chunk.table_summaries().first().unwrap().columns.len(), parquet_metadata .file_metadata() .schema_descr() diff --git a/parquet_file/src/utils.rs b/parquet_file/src/utils.rs index 41aaacf29a..ad12ceda7f 100644 --- a/parquet_file/src/utils.rs +++ b/parquet_file/src/utils.rs @@ -369,6 +369,7 @@ where fn create_column_timestamp( data: Vec>, arrow_cols: &mut Vec>, + summaries: &mut Vec, schema_builder: SchemaBuilder, ) -> (SchemaBuilder, TimestampRange) { assert_eq!(data.len(), arrow_cols.len()); @@ -379,10 +380,20 @@ fn create_column_timestamp( arrow_cols_sub.push((TIME_COLUMN_NAME.to_string(), Arc::clone(&array), true)); } - let timestamp_range = TimestampRange::new( - *data.iter().flatten().min().unwrap(), - *data.iter().flatten().max().unwrap(), - ); + let min = data.iter().flatten().min().cloned(); + let max = data.iter().flatten().max().cloned(); + + summaries.push(ColumnSummary { + name: TIME_COLUMN_NAME.to_string(), + influxdb_type: Some(InfluxDbType::Timestamp), + stats: Statistics::I64(StatValues { + min, + max, + count: data.iter().map(Vec::len).sum::() as u64, + }), + }); + + let timestamp_range = TimestampRange::new(min.unwrap(), max.unwrap()); let schema_builder = schema_builder.timestamp(); (schema_builder, timestamp_range) @@ -523,6 +534,7 @@ pub fn make_record_batch( let (schema_builder, timestamp_range) = create_column_timestamp( vec![vec![1000], vec![2000], vec![3000, 4000]], &mut arrow_cols, + &mut summaries, schema_builder, );