From d6f6ddfdaa5d0622c83a07ac36ac96b0e5479899 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 14 Jun 2021 14:24:09 +0200 Subject: [PATCH] fix: fix NULL handling in parquet stats --- parquet_file/src/metadata.rs | 56 +++++------ parquet_file/src/test_utils.rs | 164 +++++++++++++++++++++++++++------ 2 files changed, 167 insertions(+), 53 deletions(-) diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 7fa5a8762b..0d10d2c772 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -457,8 +457,8 @@ fn extract_iox_statistics( match (parquet_stats, iox_type) { (ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => { Ok(Statistics::Bool(StatValues { - min: Some(*stats.min()), - max: Some(*stats.max()), + min: min_max_set.then(|| *stats.min()), + max: min_max_set.then(|| *stats.max()), distinct_count: parquet_stats .distinct_count() .and_then(|x| x.try_into().ok()), @@ -467,8 +467,8 @@ fn extract_iox_statistics( } (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => { Ok(Statistics::I64(StatValues { - min: Some(*stats.min()), - max: Some(*stats.max()), + min: min_max_set.then(|| *stats.min()), + max: min_max_set.then(|| *stats.max()), distinct_count: parquet_stats .distinct_count() .and_then(|x| x.try_into().ok()), @@ -477,8 +477,8 @@ fn extract_iox_statistics( } (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => { Ok(Statistics::U64(StatValues { - min: Some(*stats.min() as u64), - max: Some(*stats.max() as u64), + min: min_max_set.then(|| *stats.min() as u64), + max: min_max_set.then(|| *stats.max() as u64), distinct_count: parquet_stats .distinct_count() .and_then(|x| x.try_into().ok()), @@ -508,26 +508,30 @@ fn extract_iox_statistics( (ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag) | (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => { Ok(Statistics::String(StatValues { - min: Some( - stats - .min() - .as_utf8() - .context(StatisticsUtf8Error { - row_group: row_group_idx, - column: column_name.to_string(), - })? - .to_string(), - ), - max: Some( - stats - .max() - .as_utf8() - .context(StatisticsUtf8Error { - row_group: row_group_idx, - column: column_name.to_string(), - })? - .to_string(), - ), + min: min_max_set + .then(|| { + stats + .min() + .as_utf8() + .context(StatisticsUtf8Error { + row_group: row_group_idx, + column: column_name.to_string(), + }) + .map(|x| x.to_string()) + }) + .transpose()?, + max: min_max_set + .then(|| { + stats + .max() + .as_utf8() + .context(StatisticsUtf8Error { + row_group: row_group_idx, + column: column_name.to_string(), + }) + .map(|x| x.to_string()) + }) + .transpose()?, distinct_count: parquet_stats .distinct_count() .and_then(|x| x.try_into().ok()), diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index 9cb2111b69..28d25a0e26 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -167,7 +167,7 @@ pub async fn make_chunk_given_record_batch( fn create_column_tag( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, @@ -184,9 +184,19 @@ fn create_column_tag( name: name.to_string(), influxdb_type: Some(InfluxDbType::Tag), stats: Statistics::String(StatValues { - min: Some(data.iter().flatten().min().unwrap().to_string()), - max: Some(data.iter().flatten().max().unwrap().to_string()), - count: data.iter().map(Vec::len).sum::() as u64, + min: data + .iter() + .flatten() + .filter_map(|x| x.as_ref()) + .min() + .map(|x| x.to_string()), + max: data + .iter() + .flatten() + .filter_map(|x| x.as_ref()) + .max() + .map(|x| x.to_string()), + count: data.iter().flatten().filter_map(|x| x.as_ref()).count() as u64, distinct_count: None, }), }); @@ -196,7 +206,7 @@ fn create_column_tag( fn create_column_field_string( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, @@ -214,8 +224,8 @@ fn create_column_field_string( distinct_count, }| { Statistics::String(StatValues { - min: Some(min.unwrap().to_string()), - max: Some(max.unwrap().to_string()), + min: min.map(|x| x.to_string()), + max: max.map(|x| x.to_string()), distinct_count, count, }) @@ -225,7 +235,7 @@ fn create_column_field_string( fn create_column_field_i64( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, @@ -242,7 +252,7 @@ fn create_column_field_i64( fn create_column_field_u64( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, @@ -301,7 +311,7 @@ fn create_column_field_f64( fn create_column_field_bool( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, @@ -318,14 +328,14 @@ fn create_column_field_bool( fn create_column_field_generic( name: &str, - data: Vec>, + data: Vec>>, arrow_cols: &mut Vec>, summaries: &mut Vec, schema_builder: &mut SchemaBuilder, f: F, ) where A: 'static + Array, - A: From>, + A: From>>, T: Clone + Ord, F: Fn(StatValues) -> Statistics, { @@ -342,9 +352,19 @@ fn create_column_field_generic( name: name.to_string(), influxdb_type: Some(InfluxDbType::Field), stats: f(StatValues { - min: data.iter().flatten().min().cloned(), - max: data.iter().flatten().max().cloned(), - count: data.iter().map(Vec::len).sum::() as u64, + min: data + .iter() + .flatten() + .filter_map(|x| x.as_ref()) + .min() + .cloned(), + max: data + .iter() + .flatten() + .filter_map(|x| x.as_ref()) + .max() + .cloned(), + count: data.iter().flatten().filter_map(|x| x.as_ref()).count() as u64, distinct_count: None, }), }); @@ -400,15 +420,33 @@ pub fn make_record_batch( // tag create_column_tag( - &format!("{}_tag_nonempty", column_prefix), - vec![vec!["foo"], vec!["bar"], vec!["baz", "foo"]], + &format!("{}_tag_normal", column_prefix), + vec![ + vec![Some("foo")], + vec![Some("bar")], + vec![Some("baz"), Some("foo")], + ], &mut arrow_cols, &mut summaries, &mut schema_builder, ); create_column_tag( &format!("{}_tag_empty", column_prefix), - vec![vec![""], vec![""], vec!["", ""]], + vec![vec![Some("")], vec![Some("")], vec![Some(""), Some("")]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_tag( + &format!("{}_tag_null_some", column_prefix), + vec![vec![None], vec![Some("bar")], vec![Some("baz"), None]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_tag( + &format!("{}_tag_null_all", column_prefix), + vec![vec![None], vec![None], vec![None, None]], &mut arrow_cols, &mut summaries, &mut schema_builder, @@ -416,15 +454,33 @@ pub fn make_record_batch( // field: string create_column_field_string( - &format!("{}_field_string_nonempty", column_prefix), - vec![vec!["foo"], vec!["bar"], vec!["baz", "foo"]], + &format!("{}_field_string_normal", column_prefix), + vec![ + vec![Some("foo")], + vec![Some("bar")], + vec![Some("baz"), Some("foo")], + ], &mut arrow_cols, &mut summaries, &mut schema_builder, ); create_column_field_string( &format!("{}_field_string_empty", column_prefix), - vec![vec![""], vec![""], vec!["", ""]], + vec![vec![Some("")], vec![Some("")], vec![Some(""), Some("")]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_string( + &format!("{}_field_string_null_some", column_prefix), + vec![vec![None], vec![Some("bar")], vec![Some("baz"), None]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_string( + &format!("{}_field_string_null_all", column_prefix), + vec![vec![None], vec![None], vec![None, None]], &mut arrow_cols, &mut summaries, &mut schema_builder, @@ -433,14 +489,32 @@ pub fn make_record_batch( // field: i64 create_column_field_i64( &format!("{}_field_i64_normal", column_prefix), - vec![vec![-1], vec![2], vec![3, 4]], + vec![vec![Some(-1)], vec![Some(2)], vec![Some(3), Some(4)]], &mut arrow_cols, &mut summaries, &mut schema_builder, ); create_column_field_i64( &format!("{}_field_i64_range", column_prefix), - vec![vec![i64::MIN], vec![i64::MAX], vec![i64::MIN, i64::MAX]], + vec![ + vec![Some(i64::MIN)], + vec![Some(i64::MAX)], + vec![Some(i64::MIN), Some(i64::MAX)], + ], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_i64( + &format!("{}_field_i64_null_some", column_prefix), + vec![vec![None], vec![Some(2)], vec![Some(3), None]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_i64( + &format!("{}_field_i64_null_all", column_prefix), + vec![vec![None], vec![None], vec![None, None]], &mut arrow_cols, &mut summaries, &mut schema_builder, @@ -449,14 +523,32 @@ pub fn make_record_batch( // field: u64 create_column_field_u64( &format!("{}_field_u64_normal", column_prefix), - vec![vec![1u64], vec![2], vec![3, 4]], + vec![vec![Some(1u64)], vec![Some(2)], vec![Some(3), Some(4)]], &mut arrow_cols, &mut summaries, &mut schema_builder, ); create_column_field_u64( &format!("{}_field_u64_range", column_prefix), - vec![vec![u64::MIN], vec![u64::MAX], vec![u64::MIN, u64::MAX]], + vec![ + vec![Some(u64::MIN)], + vec![Some(u64::MAX)], + vec![Some(u64::MIN), Some(u64::MAX)], + ], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_u64( + &format!("{}_field_u64_null_some", column_prefix), + vec![vec![None], vec![Some(2)], vec![Some(3), None]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_u64( + &format!("{}_field_u64_null_all", column_prefix), + vec![vec![None], vec![None], vec![None, None]], &mut arrow_cols, &mut summaries, &mut schema_builder, @@ -539,8 +631,26 @@ pub fn make_record_batch( // field: bool create_column_field_bool( - &format!("{}_field_bool", column_prefix), - vec![vec![true], vec![false], vec![true, false]], + &format!("{}_field_bool_normal", column_prefix), + vec![ + vec![Some(true)], + vec![Some(false)], + vec![Some(true), Some(false)], + ], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_bool( + &format!("{}_field_bool_null_some", column_prefix), + vec![vec![None], vec![Some(false)], vec![Some(true), None]], + &mut arrow_cols, + &mut summaries, + &mut schema_builder, + ); + create_column_field_bool( + &format!("{}_field_bool_null_all", column_prefix), + vec![vec![None], vec![None], vec![None, None]], &mut arrow_cols, &mut summaries, &mut schema_builder,