fix: fix NULL handling in parquet stats

pull/24376/head
Marco Neumann 2021-06-14 14:24:09 +02:00
parent eae56630fb
commit d6f6ddfdaa
2 changed files with 167 additions and 53 deletions

View File

@ -457,8 +457,8 @@ fn extract_iox_statistics(
match (parquet_stats, iox_type) {
(ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
Ok(Statistics::Bool(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
min: min_max_set.then(|| *stats.min()),
max: min_max_set.then(|| *stats.max()),
distinct_count: parquet_stats
.distinct_count()
.and_then(|x| x.try_into().ok()),
@ -467,8 +467,8 @@ fn extract_iox_statistics(
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
Ok(Statistics::I64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
min: min_max_set.then(|| *stats.min()),
max: min_max_set.then(|| *stats.max()),
distinct_count: parquet_stats
.distinct_count()
.and_then(|x| x.try_into().ok()),
@ -477,8 +477,8 @@ fn extract_iox_statistics(
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
Ok(Statistics::U64(StatValues {
min: Some(*stats.min() as u64),
max: Some(*stats.max() as u64),
min: min_max_set.then(|| *stats.min() as u64),
max: min_max_set.then(|| *stats.max() as u64),
distinct_count: parquet_stats
.distinct_count()
.and_then(|x| x.try_into().ok()),
@ -508,26 +508,30 @@ fn extract_iox_statistics(
(ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag)
| (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => {
Ok(Statistics::String(StatValues {
min: Some(
stats
.min()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
max: Some(
stats
.max()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
min: min_max_set
.then(|| {
stats
.min()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})
.map(|x| x.to_string())
})
.transpose()?,
max: min_max_set
.then(|| {
stats
.max()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})
.map(|x| x.to_string())
})
.transpose()?,
distinct_count: parquet_stats
.distinct_count()
.and_then(|x| x.try_into().ok()),

View File

@ -167,7 +167,7 @@ pub async fn make_chunk_given_record_batch(
fn create_column_tag(
name: &str,
data: Vec<Vec<&str>>,
data: Vec<Vec<Option<&str>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
@ -184,9 +184,19 @@ fn create_column_tag(
name: name.to_string(),
influxdb_type: Some(InfluxDbType::Tag),
stats: Statistics::String(StatValues {
min: Some(data.iter().flatten().min().unwrap().to_string()),
max: Some(data.iter().flatten().max().unwrap().to_string()),
count: data.iter().map(Vec::len).sum::<usize>() as u64,
min: data
.iter()
.flatten()
.filter_map(|x| x.as_ref())
.min()
.map(|x| x.to_string()),
max: data
.iter()
.flatten()
.filter_map(|x| x.as_ref())
.max()
.map(|x| x.to_string()),
count: data.iter().flatten().filter_map(|x| x.as_ref()).count() as u64,
distinct_count: None,
}),
});
@ -196,7 +206,7 @@ fn create_column_tag(
fn create_column_field_string(
name: &str,
data: Vec<Vec<&str>>,
data: Vec<Vec<Option<&str>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
@ -214,8 +224,8 @@ fn create_column_field_string(
distinct_count,
}| {
Statistics::String(StatValues {
min: Some(min.unwrap().to_string()),
max: Some(max.unwrap().to_string()),
min: min.map(|x| x.to_string()),
max: max.map(|x| x.to_string()),
distinct_count,
count,
})
@ -225,7 +235,7 @@ fn create_column_field_string(
fn create_column_field_i64(
name: &str,
data: Vec<Vec<i64>>,
data: Vec<Vec<Option<i64>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
@ -242,7 +252,7 @@ fn create_column_field_i64(
fn create_column_field_u64(
name: &str,
data: Vec<Vec<u64>>,
data: Vec<Vec<Option<u64>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
@ -301,7 +311,7 @@ fn create_column_field_f64(
fn create_column_field_bool(
name: &str,
data: Vec<Vec<bool>>,
data: Vec<Vec<Option<bool>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
@ -318,14 +328,14 @@ fn create_column_field_bool(
fn create_column_field_generic<A, T, F>(
name: &str,
data: Vec<Vec<T>>,
data: Vec<Vec<Option<T>>>,
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: &mut SchemaBuilder,
f: F,
) where
A: 'static + Array,
A: From<Vec<T>>,
A: From<Vec<Option<T>>>,
T: Clone + Ord,
F: Fn(StatValues<T>) -> Statistics,
{
@ -342,9 +352,19 @@ fn create_column_field_generic<A, T, F>(
name: name.to_string(),
influxdb_type: Some(InfluxDbType::Field),
stats: f(StatValues {
min: data.iter().flatten().min().cloned(),
max: data.iter().flatten().max().cloned(),
count: data.iter().map(Vec::len).sum::<usize>() as u64,
min: data
.iter()
.flatten()
.filter_map(|x| x.as_ref())
.min()
.cloned(),
max: data
.iter()
.flatten()
.filter_map(|x| x.as_ref())
.max()
.cloned(),
count: data.iter().flatten().filter_map(|x| x.as_ref()).count() as u64,
distinct_count: None,
}),
});
@ -400,15 +420,33 @@ pub fn make_record_batch(
// tag
create_column_tag(
&format!("{}_tag_nonempty", column_prefix),
vec![vec!["foo"], vec!["bar"], vec!["baz", "foo"]],
&format!("{}_tag_normal", column_prefix),
vec![
vec![Some("foo")],
vec![Some("bar")],
vec![Some("baz"), Some("foo")],
],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_tag(
&format!("{}_tag_empty", column_prefix),
vec![vec![""], vec![""], vec!["", ""]],
vec![vec![Some("")], vec![Some("")], vec![Some(""), Some("")]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_tag(
&format!("{}_tag_null_some", column_prefix),
vec![vec![None], vec![Some("bar")], vec![Some("baz"), None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_tag(
&format!("{}_tag_null_all", column_prefix),
vec![vec![None], vec![None], vec![None, None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
@ -416,15 +454,33 @@ pub fn make_record_batch(
// field: string
create_column_field_string(
&format!("{}_field_string_nonempty", column_prefix),
vec![vec!["foo"], vec!["bar"], vec!["baz", "foo"]],
&format!("{}_field_string_normal", column_prefix),
vec![
vec![Some("foo")],
vec![Some("bar")],
vec![Some("baz"), Some("foo")],
],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_string(
&format!("{}_field_string_empty", column_prefix),
vec![vec![""], vec![""], vec!["", ""]],
vec![vec![Some("")], vec![Some("")], vec![Some(""), Some("")]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_string(
&format!("{}_field_string_null_some", column_prefix),
vec![vec![None], vec![Some("bar")], vec![Some("baz"), None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_string(
&format!("{}_field_string_null_all", column_prefix),
vec![vec![None], vec![None], vec![None, None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
@ -433,14 +489,32 @@ pub fn make_record_batch(
// field: i64
create_column_field_i64(
&format!("{}_field_i64_normal", column_prefix),
vec![vec![-1], vec![2], vec![3, 4]],
vec![vec![Some(-1)], vec![Some(2)], vec![Some(3), Some(4)]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_i64(
&format!("{}_field_i64_range", column_prefix),
vec![vec![i64::MIN], vec![i64::MAX], vec![i64::MIN, i64::MAX]],
vec![
vec![Some(i64::MIN)],
vec![Some(i64::MAX)],
vec![Some(i64::MIN), Some(i64::MAX)],
],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_i64(
&format!("{}_field_i64_null_some", column_prefix),
vec![vec![None], vec![Some(2)], vec![Some(3), None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_i64(
&format!("{}_field_i64_null_all", column_prefix),
vec![vec![None], vec![None], vec![None, None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
@ -449,14 +523,32 @@ pub fn make_record_batch(
// field: u64
create_column_field_u64(
&format!("{}_field_u64_normal", column_prefix),
vec![vec![1u64], vec![2], vec![3, 4]],
vec![vec![Some(1u64)], vec![Some(2)], vec![Some(3), Some(4)]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_u64(
&format!("{}_field_u64_range", column_prefix),
vec![vec![u64::MIN], vec![u64::MAX], vec![u64::MIN, u64::MAX]],
vec![
vec![Some(u64::MIN)],
vec![Some(u64::MAX)],
vec![Some(u64::MIN), Some(u64::MAX)],
],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_u64(
&format!("{}_field_u64_null_some", column_prefix),
vec![vec![None], vec![Some(2)], vec![Some(3), None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_u64(
&format!("{}_field_u64_null_all", column_prefix),
vec![vec![None], vec![None], vec![None, None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
@ -539,8 +631,26 @@ pub fn make_record_batch(
// field: bool
create_column_field_bool(
&format!("{}_field_bool", column_prefix),
vec![vec![true], vec![false], vec![true, false]],
&format!("{}_field_bool_normal", column_prefix),
vec![
vec![Some(true)],
vec![Some(false)],
vec![Some(true), Some(false)],
],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_bool(
&format!("{}_field_bool_null_some", column_prefix),
vec![vec![None], vec![Some(false)], vec![Some(true), None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,
);
create_column_field_bool(
&format!("{}_field_bool_null_all", column_prefix),
vec![vec![None], vec![None], vec![None, None]],
&mut arrow_cols,
&mut summaries,
&mut schema_builder,