fix: table summaries must include timestamp as well

pull/24376/head
Marco Neumann 2021-05-11 16:58:01 +02:00
parent ff004c1a51
commit f4d7154746
2 changed files with 74 additions and 58 deletions
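
In outline, the change replaces the either/or result of `extract_iox_statistics` with a pair: statistics for every column plus an optional timestamp range. Previously the time column only yielded a range and therefore never got a `ColumnSummary`. The sketch below illustrates that control flow with simplified stand-in types; it is not the real IOx code, and the type definitions are reduced to what the example needs.

// Illustrative sketch only: simplified stand-ins, not the real IOx definitions
// of Statistics, StatValues, TimestampRange, or ColumnSummary.

#[derive(Debug)]
struct StatValues<T> {
    min: Option<T>,
    max: Option<T>,
    count: u64,
}

#[derive(Debug)]
enum Statistics {
    I64(StatValues<i64>),
}

#[derive(Debug, Clone, Copy)]
struct TimestampRange {
    start: i64,
    end: i64,
}

#[derive(Debug)]
struct ColumnSummary {
    name: String,
    stats: Statistics,
}

// Old shape: either statistics or a timestamp range, so the time column never
// produced a ColumnSummary. New shape (sketched here): statistics for every
// column, plus the range when the column is the timestamp.
fn extract_stats(values: &[i64], is_time: bool) -> (Statistics, Option<TimestampRange>) {
    let min = values.iter().copied().min();
    let max = values.iter().copied().max();
    let range = if is_time {
        Some(TimestampRange {
            start: min.unwrap_or(0),
            end: max.unwrap_or(0),
        })
    } else {
        None
    };
    let stats = Statistics::I64(StatValues {
        min,
        max,
        count: values.len() as u64,
    });
    (stats, range)
}

fn main() {
    let mut column_summaries = Vec::new();
    let mut timestamp_range = None;

    for (name, values, is_time) in [
        ("field_a", vec![1_i64, 2, 3], false),
        ("time", vec![1000_i64, 2000, 3000, 4000], true),
    ] {
        let (stats, maybe_tsrange) = extract_stats(&values, is_time);
        // Every column, including the timestamp column, gets a summary entry.
        column_summaries.push(ColumnSummary {
            name: name.to_string(),
            stats,
        });
        if let Some(range) = maybe_tsrange {
            assert!(timestamp_range.is_none());
            timestamp_range = Some(range);
        }
    }

    assert_eq!(column_summaries.len(), 2); // the time column is counted too
    println!("{:#?}", column_summaries);
    println!("{:?}", timestamp_range);
}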


@@ -252,26 +252,25 @@ fn read_statistics_from_parquet_row_group(
         let count =
             (row_group.num_rows().max(0) as u64).saturating_sub(parquet_stats.null_count());
-        match extract_iox_statistics(
+        let (stats, maybe_tsrange) = extract_iox_statistics(
             parquet_stats,
             iox_type,
             count,
             row_group_idx,
             field.name(),
-        )? {
-            ExtractedStatistics::Statistics(stats) => column_summaries.push(ColumnSummary {
-                name: field.name().clone(),
-                influxdb_type: Some(match iox_type {
-                    InfluxColumnType::Tag => InfluxDbType::Tag,
-                    InfluxColumnType::Field(_) => InfluxDbType::Field,
-                    InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
-                }),
-                stats,
-            }),
-            ExtractedStatistics::TimestampRange(range) => {
-                assert!(timestamp_range.is_none());
-                timestamp_range = Some(range);
-            }
+        )?;
+        column_summaries.push(ColumnSummary {
+            name: field.name().clone(),
+            influxdb_type: Some(match iox_type {
+                InfluxColumnType::Tag => InfluxDbType::Tag,
+                InfluxColumnType::Field(_) => InfluxDbType::Field,
+                InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
+            }),
+            stats,
+        });
+        if let Some(range) = maybe_tsrange {
+            assert!(timestamp_range.is_none());
+            timestamp_range = Some(range);
         }
     }
@@ -284,15 +283,6 @@ fn read_statistics_from_parquet_row_group(
     Ok((table_summary, timestamp_range))
 }
 
-/// Result of [`extract_iox_statistics`].
-enum ExtractedStatistics {
-    /// Found statistics.
-    Statistics(Statistics),
-
-    /// Found timestamp range.
-    TimestampRange(TimestampRange),
-}
-
 /// Extract IOx statistics from parquet statistics.
 ///
 /// This is required because upstream does not have a mapper from parquet statistics back to arrow or Rust native types.
@@ -302,48 +292,61 @@ fn extract_iox_statistics(
     count: u64,
     row_group_idx: usize,
     column_name: &str,
-) -> Result<ExtractedStatistics> {
+) -> Result<(Statistics, Option<TimestampRange>)> {
     match (parquet_stats, iox_type) {
         (ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
-            Ok(ExtractedStatistics::Statistics(Statistics::Bool(
-                StatValues {
+            Ok((
+                Statistics::Bool(StatValues {
                     min: Some(*stats.min()),
                     max: Some(*stats.max()),
                     count,
-                },
-            )))
+                }),
+                None,
+            ))
         }
-        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => Ok(
-            ExtractedStatistics::Statistics(Statistics::I64(StatValues {
-                min: Some(*stats.min()),
-                max: Some(*stats.max()),
-                count,
-            })),
-        ),
+        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
+            Ok((
+                Statistics::I64(StatValues {
+                    min: Some(*stats.min()),
+                    max: Some(*stats.max()),
+                    count,
+                }),
+                None,
+            ))
+        }
         (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
             // TODO: that's very likely wrong, but blocked by https://github.com/apache/arrow-rs/issues/254
-            Ok(ExtractedStatistics::Statistics(Statistics::U64(
-                StatValues {
+            Ok((
+                Statistics::U64(StatValues {
                     min: Some(*stats.min() as u64),
                     max: Some(*stats.max() as u64),
                     count,
-                },
-            )))
+                }),
+                None,
+            ))
         }
-        (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => Ok(
-            ExtractedStatistics::Statistics(Statistics::F64(StatValues {
-                min: Some(*stats.min()),
-                max: Some(*stats.max()),
-                count,
-            })),
-        ),
-        (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok(
-            ExtractedStatistics::TimestampRange(TimestampRange::new(*stats.min(), *stats.max())),
-        ),
+        (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => {
+            Ok((
+                Statistics::F64(StatValues {
+                    min: Some(*stats.min()),
+                    max: Some(*stats.max()),
+                    count,
+                }),
+                None,
+            ))
+        }
+        (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok((
+            Statistics::I64(StatValues {
+                min: Some(*stats.min()),
+                max: Some(*stats.max()),
+                count,
+            }),
+            Some(TimestampRange::new(*stats.min(), *stats.max())),
+        )),
         (ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag)
         | (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => {
-            Ok(ExtractedStatistics::Statistics(Statistics::String(
-                StatValues {
+            Ok((
+                Statistics::String(StatValues {
                     min: Some(
                         stats
                             .min()
@@ -365,8 +368,9 @@ fn extract_iox_statistics(
                             .to_string(),
                     ),
                     count,
-                },
-            )))
+                }),
+                None,
+            ))
         }
         _ => Err(Error::StatisticsTypeMismatch {
             row_group: row_group_idx,
@@ -586,9 +590,9 @@ mod tests {
             0
         );
 
-        // column count in summary misses the timestamp column
+        // column count in summary including the timestamp column
         assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len() + 1,
+            chunk.table_summaries().first().unwrap().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()
@@ -625,9 +629,9 @@
         );
         assert_eq!(parquet_metadata.file_metadata().num_rows(), 0);
 
-        // column count in summary misses the timestamp column
+        // column count in summary including the timestamp column
        assert_eq!(
-            chunk.table_summaries().first().unwrap().columns.len() + 1,
+            chunk.table_summaries().first().unwrap().columns.len(),
             parquet_metadata
                 .file_metadata()
                 .schema_descr()
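
To make the flipped assertion concrete, here is a small worked example (illustrative only, not code from this commit): for a table with one tag, one field, and the time column, the parquet schema has three columns, while the old summary carried only two of them, hence the `+ 1`.

fn main() {
    // Hypothetical table: {tag, field, time} -> 3 columns in the parquet schema.
    let parquet_columns = 3;

    // Before this change the summary skipped the time column, hence the `+ 1`.
    let summary_columns_before = 2; // tag + field
    assert_eq!(summary_columns_before + 1, parquet_columns);

    // After this change the summary includes the time column as well.
    let summary_columns_after = 3; // tag + field + time
    assert_eq!(summary_columns_after, parquet_columns);
}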


@@ -369,6 +369,7 @@ where
 fn create_column_timestamp(
     data: Vec<Vec<i64>>,
     arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
+    summaries: &mut Vec<ColumnSummary>,
     schema_builder: SchemaBuilder,
 ) -> (SchemaBuilder, TimestampRange) {
     assert_eq!(data.len(), arrow_cols.len());
@@ -379,10 +380,20 @@ fn create_column_timestamp(
         arrow_cols_sub.push((TIME_COLUMN_NAME.to_string(), Arc::clone(&array), true));
     }
 
-    let timestamp_range = TimestampRange::new(
-        *data.iter().flatten().min().unwrap(),
-        *data.iter().flatten().max().unwrap(),
-    );
+    let min = data.iter().flatten().min().cloned();
+    let max = data.iter().flatten().max().cloned();
+
+    summaries.push(ColumnSummary {
+        name: TIME_COLUMN_NAME.to_string(),
+        influxdb_type: Some(InfluxDbType::Timestamp),
+        stats: Statistics::I64(StatValues {
+            min,
+            max,
+            count: data.iter().map(Vec::len).sum::<usize>() as u64,
+        }),
+    });
+
+    let timestamp_range = TimestampRange::new(min.unwrap(), max.unwrap());
 
     let schema_builder = schema_builder.timestamp();
     (schema_builder, timestamp_range)
@@ -523,6 +534,7 @@ pub fn make_record_batch(
     let (schema_builder, timestamp_range) = create_column_timestamp(
         vec![vec![1000], vec![2000], vec![3000, 4000]],
         &mut arrow_cols,
+        &mut summaries,
         schema_builder,
     );
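
For reference, the values the updated helper records for the time column at this call site can be checked by hand. This is an illustrative sketch of the min/max/count bookkeeping shown above, not a test from this commit:

fn main() {
    // The same data the call site above passes to create_column_timestamp.
    let data: Vec<Vec<i64>> = vec![vec![1000], vec![2000], vec![3000, 4000]];

    // What the helper now records for the time column: min/max over all rows
    // plus the total row count.
    let min = data.iter().flatten().min().cloned();
    let max = data.iter().flatten().max().cloned();
    let count = data.iter().map(Vec::len).sum::<usize>() as u64;

    assert_eq!(min, Some(1000));
    assert_eq!(max, Some(4000));
    assert_eq!(count, 4);
}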