refactor: remove special case timestamp_range in parquet chunk (#1543)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2021-05-24 12:19:44 -04:00 committed by GitHub
parent 14ba25f86d
commit c464ffadad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 182 deletions

View File

@ -77,10 +77,9 @@ impl Chunk {
file_location: Path,
store: Arc<ObjectStore>,
schema: Schema,
range: Option<TimestampRange>,
metrics: ChunkMetrics,
) -> Self {
let table = Table::new(table_summary, file_location, store, schema, range);
let table = Table::new(table_summary, file_location, store, schema);
let mut chunk = Self {
partition_key: part_key.into(),
@ -130,11 +129,6 @@ impl Chunk {
})
}
/// Return the timestamp range of the table
pub fn timestamp_range(&self) -> Option<TimestampRange> {
self.table.timestamp_range()
}
// Return all tables of this chunk whose timestamp overlaps with the give one
pub fn table_names(
&self,

View File

@ -88,9 +88,8 @@
//! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
use std::sync::Arc;
use data_types::{
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
timestamp::TimestampRange,
use data_types::partition_metadata::{
ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
};
use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
use parquet::{
@ -193,44 +192,30 @@ pub fn read_statistics_from_parquet_metadata(
parquet_md: &ParquetMetaData,
schema: &Schema,
table_name: &str,
) -> Result<(TableSummary, Option<TimestampRange>)> {
) -> Result<TableSummary> {
let mut table_summary_agg: Option<TableSummary> = None;
let mut timestamp_range_agg = None;
for (row_group_idx, row_group) in parquet_md.row_groups().iter().enumerate() {
let (table_summary, timestamp_range) =
let table_summary =
read_statistics_from_parquet_row_group(row_group, row_group_idx, schema, table_name)?;
match table_summary_agg.as_mut() {
Some(existing) => existing.update_from(&table_summary),
None => table_summary_agg = Some(table_summary),
}
timestamp_range_agg = match (timestamp_range_agg, timestamp_range) {
(Some(a), Some(b)) => Some(TimestampRange {
start: a.start.min(b.start),
end: a.end.max(b.end),
}),
(Some(a), None) | (None, Some(a)) => Some(a),
(None, None) => None,
};
}
match table_summary_agg {
Some(table_summary) => Ok((table_summary, timestamp_range_agg)),
None => Err(Error::NoRowGroup {}),
}
table_summary_agg.context(NoRowGroup)
}
/// Read IOx statistics (including timestamp range) from parquet row group metadata.
/// Read IOx statistics from parquet row group metadata.
fn read_statistics_from_parquet_row_group(
row_group: &ParquetRowGroupMetaData,
row_group_idx: usize,
schema: &Schema,
table_name: &str,
) -> Result<(TableSummary, Option<TimestampRange>)> {
) -> Result<TableSummary> {
let mut column_summaries = vec![];
let mut timestamp_range = None;
for ((iox_type, field), column_chunk_metadata) in schema.iter().zip(row_group.columns()) {
if let Some(iox_type) = iox_type {
@ -252,7 +237,7 @@ fn read_statistics_from_parquet_row_group(
let count =
(row_group.num_rows().max(0) as u64).saturating_sub(parquet_stats.null_count());
let (stats, maybe_tsrange) = extract_iox_statistics(
let stats = extract_iox_statistics(
parquet_stats,
iox_type,
count,
@ -268,10 +253,6 @@ fn read_statistics_from_parquet_row_group(
}),
stats,
});
if let Some(range) = maybe_tsrange {
assert!(timestamp_range.is_none());
timestamp_range = Some(range);
}
}
}
@ -280,97 +261,83 @@ fn read_statistics_from_parquet_row_group(
columns: column_summaries,
};
Ok((table_summary, timestamp_range))
Ok(table_summary)
}
/// Extract IOx statistics from parquet statistics.
///
/// This is required because upstream does not have a mapper from parquet statistics back to arrow or Rust native types.
/// This is required because upstream does not have a mapper from
/// parquet statistics back to arrow or Rust native types.
fn extract_iox_statistics(
parquet_stats: &ParquetStatistics,
iox_type: InfluxColumnType,
count: u64,
row_group_idx: usize,
column_name: &str,
) -> Result<(Statistics, Option<TimestampRange>)> {
) -> Result<Statistics> {
match (parquet_stats, iox_type) {
(ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
Ok((
Statistics::Bool(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}),
None,
))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
Ok((
Statistics::I64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}),
None,
))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
// TODO: that's very likely wrong, but blocked by https://github.com/apache/arrow-rs/issues/254
Ok((
Statistics::U64(StatValues {
min: Some(*stats.min() as u64),
max: Some(*stats.max() as u64),
count,
}),
None,
))
}
(ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => {
Ok((
Statistics::F64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}),
None,
))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok((
Statistics::I64(StatValues {
Ok(Statistics::Bool(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}),
Some(TimestampRange::new(*stats.min(), *stats.max())),
)),
}))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
Ok(Statistics::I64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
// TODO: Likely incorrect for large values until
// https://github.com/apache/arrow-rs/issues/254
Ok(Statistics::U64(StatValues {
min: Some(*stats.min() as u64),
max: Some(*stats.max() as u64),
count,
}))
}
(ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => {
Ok(Statistics::F64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}))
}
(ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => {
Ok(Statistics::I64(StatValues {
min: Some(*stats.min()),
max: Some(*stats.max()),
count,
}))
}
(ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag)
| (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => {
Ok((
Statistics::String(StatValues {
min: Some(
stats
.min()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
max: Some(
stats
.max()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
count,
}),
None,
))
Ok(Statistics::String(StatValues {
min: Some(
stats
.min()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
max: Some(
stats
.max()
.as_utf8()
.context(StatisticsUtf8Error {
row_group: row_group_idx,
column: column_name.to_string(),
})?
.to_string(),
),
count,
}))
}
_ => Err(Error::StatisticsTypeMismatch {
row_group: row_group_idx,
@ -496,13 +463,11 @@ mod tests {
assert_eq!(schema_actual, schema_expected);
// step 2: read back statistics
let (table_summary_actual, timestamp_range_actual) =
let table_summary_actual =
read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
.unwrap();
let table_summary_expected = chunk.table_summary();
let timestamp_range_expected = chunk.timestamp_range();
assert_eq!(table_summary_actual, table_summary_expected);
assert_eq!(timestamp_range_actual, timestamp_range_expected)
}
#[tokio::test]
@ -521,13 +486,11 @@ mod tests {
assert_eq!(schema_actual, schema_expected);
// step 2: read back statistics
let (table_summary_actual, timestamp_range_actual) =
let table_summary_actual =
read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
.unwrap();
let table_summary_expected = chunk.table_summary();
let timestamp_range_expected = chunk.timestamp_range();
assert_eq!(table_summary_actual, table_summary_expected);
assert_eq!(timestamp_range_actual, timestamp_range_expected)
}
#[tokio::test]

View File

@ -20,8 +20,7 @@ mod tests {
////////////////////
// Create test data which is also the expected data
let table = "table1";
let (record_batches, schema, column_summaries, time_range, num_rows) =
make_record_batch("foo");
let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo");
let mut table_summary = TableSummary::new(table);
table_summary.columns = column_summaries.clone();
let record_batch = record_batches[0].clone(); // Get the first one to compare key-value meta data that would be the same for all batches
@ -40,7 +39,6 @@ mod tests {
schema.clone(),
table,
column_summaries.clone(),
time_range,
)
.await;
@ -61,11 +59,10 @@ mod tests {
);
// 2. Check statistics
let (table_summary_actual, timestamp_range_actual) =
let table_summary_actual =
read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
.unwrap();
assert_eq!(table_summary_actual, table_summary);
assert_eq!(timestamp_range_actual, Some(time_range));
// 3. Check data
// Note that the read_data_from_parquet_data function fixes the row-group/batches' level metadata bug in arrow

View File

@ -2,9 +2,15 @@ use snafu::{ResultExt, Snafu};
use std::{collections::BTreeSet, mem, sync::Arc};
use crate::storage::{self, Storage};
use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
use data_types::{
partition_metadata::{Statistics, TableSummary},
timestamp::TimestampRange,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use internal_types::{schema::Schema, selection::Selection};
use internal_types::{
schema::{Schema, TIME_COLUMN_NAME},
selection::Selection,
};
use object_store::{path::Path, ObjectStore};
use query::predicate::Predicate;
@ -38,24 +44,26 @@ pub struct Table {
/// Schema that goes with this table's parquet file
table_schema: Schema,
/// Timestamp rang of this table's parquet file
/// Timestamp range of this table's parquet file
/// (extracted from TableSummary)
timestamp_range: Option<TimestampRange>,
}
impl Table {
pub fn new(
meta: TableSummary,
table_summary: TableSummary,
path: Path,
store: Arc<ObjectStore>,
schema: Schema,
range: Option<TimestampRange>,
) -> Self {
let timestamp_range = extract_range(&table_summary);
Self {
table_summary: meta,
table_summary,
object_store_path: path,
object_store: store,
table_schema: schema,
timestamp_range: range,
timestamp_range,
}
}
@ -96,11 +104,6 @@ impl Table {
})
}
/// Return timestamp range of this table
pub fn timestamp_range(&self) -> Option<TimestampRange> {
self.timestamp_range
}
// Check if 2 time ranges overlap
pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
match (self.timestamp_range, timestamp_range) {
@ -152,3 +155,18 @@ impl Table {
self.table_summary.columns[0].count() as usize
}
}
/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
table_summary
.column(TIME_COLUMN_NAME)
.map(|c| {
if let Statistics::I64(s) = &c.stats {
if let (Some(min), Some(max)) = (s.min, s.max) {
return Some(TimestampRange::new(min, max));
}
}
None
})
.flatten()
}

View File

@ -13,7 +13,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use data_types::{
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
server_id::ServerId,
timestamp::TimestampRange,
};
use datafusion_util::MemoryStream;
use futures::TryStreamExt;
@ -93,46 +92,20 @@ pub async fn make_chunk_given_record_batch(
schema: Schema,
table: &str,
column_summaries: Vec<ColumnSummary>,
time_range: TimestampRange,
) -> Chunk {
make_chunk_common(
store,
record_batches,
schema,
table,
column_summaries,
time_range,
)
.await
make_chunk_common(store, record_batches, schema, table, column_summaries).await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
let (record_batches, schema, column_summaries, time_range, _num_rows) =
make_record_batch(column_prefix);
make_chunk_common(
store,
record_batches,
schema,
"table1",
column_summaries,
time_range,
)
.await
let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_common(store, record_batches, schema, "table1", column_summaries).await
}
/// Same as [`make_chunk`] but parquet file does not contain any row group.
pub async fn make_chunk_no_row_group(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
let (_, schema, column_summaries, time_range, _num_rows) = make_record_batch(column_prefix);
make_chunk_common(
store,
vec![],
schema,
"table1",
column_summaries,
time_range,
)
.await
let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
make_chunk_common(store, vec![], schema, "table1", column_summaries).await
}
/// Common code for all [`make_chunk`] and [`make_chunk_no_row_group`].
@ -144,7 +117,6 @@ async fn make_chunk_common(
schema: Schema,
table: &str,
column_summaries: Vec<ColumnSummary>,
time_range: TimestampRange,
) -> Chunk {
let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
let db_name = "db1";
@ -180,7 +152,6 @@ async fn make_chunk_common(
path,
Arc::clone(&store),
schema,
Some(time_range),
ChunkMetrics::new_unregistered(),
)
}
@ -367,7 +338,7 @@ fn create_column_timestamp(
arrow_cols: &mut Vec<Vec<(String, ArrayRef, bool)>>,
summaries: &mut Vec<ColumnSummary>,
schema_builder: SchemaBuilder,
) -> (SchemaBuilder, TimestampRange) {
) -> SchemaBuilder {
assert_eq!(data.len(), arrow_cols.len());
for (arrow_cols_sub, data_sub) in arrow_cols.iter_mut().zip(data.iter()) {
@ -389,10 +360,7 @@ fn create_column_timestamp(
}),
});
let timestamp_range = TimestampRange::new(min.unwrap(), max.unwrap());
let schema_builder = schema_builder.timestamp();
(schema_builder, timestamp_range)
schema_builder.timestamp()
}
/// Creates an Arrow RecordBatches with schema and IOx statistics.
@ -404,13 +372,7 @@ fn create_column_timestamp(
/// indeed self-contained and can act as a source to recorder schema and statistics.
pub fn make_record_batch(
column_prefix: &str,
) -> (
Vec<RecordBatch>,
Schema,
Vec<ColumnSummary>,
TimestampRange,
usize,
) {
) -> (Vec<RecordBatch>, Schema, Vec<ColumnSummary>, usize) {
// (name, array, nullable)
let mut arrow_cols: Vec<Vec<(String, ArrayRef, bool)>> = vec![vec![], vec![], vec![]];
let mut summaries = vec![];
@ -527,7 +489,7 @@ pub fn make_record_batch(
);
// time
let (schema_builder, timestamp_range) = create_column_timestamp(
let schema_builder = create_column_timestamp(
vec![vec![1000], vec![2000], vec![3000, 4000]],
&mut arrow_cols,
&mut summaries,
@ -549,7 +511,7 @@ pub fn make_record_batch(
record_batches.push(record_batch);
}
(record_batches, schema, summaries, timestamp_range, num_rows)
(record_batches, schema, summaries, num_rows)
}
/// Creates new in-memory object store for testing.

View File

@ -1136,7 +1136,7 @@ impl CatalogState for Catalog {
read_schema_from_parquet_metadata(&info.metadata).context(SchemaReadFailed {
path: info.path.clone(),
})?;
let (table_summary, timestamp_range) =
let table_summary =
read_statistics_from_parquet_metadata(&info.metadata, &schema, &table_name).context(
StatisticsReadFailed {
path: info.path.clone(),
@ -1155,7 +1155,6 @@ impl CatalogState for Catalog {
object_store.path_from_dirs_and_filename(info.path.clone()),
object_store,
schema,
timestamp_range,
metrics,
);
let parquet_chunk = Arc::new(parquet_chunk);

View File

@ -468,7 +468,6 @@ mod tests {
path,
object_store,
schema,
None,
parquet_file::chunk::ChunkMetrics::new_unregistered(),
)
}