From c464ffadad3b3035bf5f06f215564d6ac81af69a Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 24 May 2021 12:19:44 -0400
Subject: [PATCH] refactor: remove special case timestamp_range in parquet chunk (#1543)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 parquet_file/src/chunk.rs           |   8 +-
 parquet_file/src/metadata.rs        | 177 +++++++++++-----------------
 parquet_file/src/storage_testing.rs |   7 +-
 parquet_file/src/table.rs           |  42 +++++--
 parquet_file/src/utils.rs           |  58 ++-------
 server/src/db.rs                    |   3 +-
 server/src/db/lifecycle.rs          |   1 -
 7 files changed, 114 insertions(+), 182 deletions(-)

diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index e753ce84b3..27ec96615c 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -77,10 +77,9 @@ impl Chunk {
         file_location: Path,
         store: Arc<ObjectStore>,
         schema: Schema,
-        range: Option<TimestampRange>,
         metrics: ChunkMetrics,
     ) -> Self {
-        let table = Table::new(table_summary, file_location, store, schema, range);
+        let table = Table::new(table_summary, file_location, store, schema);
 
         let mut chunk = Self {
             partition_key: part_key.into(),
@@ -130,11 +129,6 @@ impl Chunk {
         })
     }
 
-    /// Return the timestamp range of the table
-    pub fn timestamp_range(&self) -> Option<TimestampRange> {
-        self.table.timestamp_range()
-    }
-
     // Return all tables of this chunk whose timestamp overlaps with the give one
     pub fn table_names(
         &self,
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 67feec455f..30a206c8f9 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -88,9 +88,8 @@
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
 use std::sync::Arc;
 
-use data_types::{
-    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
-    timestamp::TimestampRange,
+use data_types::partition_metadata::{
+    ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
 };
 use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
 use parquet::{
@@ -193,44 +192,30 @@ pub fn read_statistics_from_parquet_metadata(
     parquet_md: &ParquetMetaData,
     schema: &Schema,
     table_name: &str,
-) -> Result<(TableSummary, Option<TimestampRange>)> {
+) -> Result<TableSummary> {
     let mut table_summary_agg: Option<TableSummary> = None;
-    let mut timestamp_range_agg = None;
 
     for (row_group_idx, row_group) in parquet_md.row_groups().iter().enumerate() {
-        let (table_summary, timestamp_range) =
+        let table_summary =
             read_statistics_from_parquet_row_group(row_group, row_group_idx, schema, table_name)?;
 
         match table_summary_agg.as_mut() {
             Some(existing) => existing.update_from(&table_summary),
             None => table_summary_agg = Some(table_summary),
         }
-
-        timestamp_range_agg = match (timestamp_range_agg, timestamp_range) {
-            (Some(a), Some(b)) => Some(TimestampRange {
-                start: a.start.min(b.start),
-                end: a.end.max(b.end),
-            }),
-            (Some(a), None) | (None, Some(a)) => Some(a),
-            (None, None) => None,
-        };
     }
 
-    match table_summary_agg {
-        Some(table_summary) => Ok((table_summary, timestamp_range_agg)),
-        None => Err(Error::NoRowGroup {}),
-    }
+    table_summary_agg.context(NoRowGroup)
 }
 
-/// Read IOx statistics (including timestamp range) from parquet row group metadata.
+/// Read IOx statistics from parquet row group metadata.
 fn read_statistics_from_parquet_row_group(
     row_group: &ParquetRowGroupMetaData,
     row_group_idx: usize,
     schema: &Schema,
     table_name: &str,
-) -> Result<(TableSummary, Option<TimestampRange>)> {
+) -> Result<TableSummary> {
     let mut column_summaries = vec![];
-    let mut timestamp_range = None;
 
     for ((iox_type, field), column_chunk_metadata) in schema.iter().zip(row_group.columns()) {
         if let Some(iox_type) = iox_type {
@@ -252,7 +237,7 @@ fn read_statistics_from_parquet_row_group(
             let count =
                 (row_group.num_rows().max(0) as u64).saturating_sub(parquet_stats.null_count());
 
-            let (stats, maybe_tsrange) = extract_iox_statistics(
+            let stats = extract_iox_statistics(
                 parquet_stats,
                 iox_type,
                 count,
@@ -268,10 +253,6 @@ fn read_statistics_from_parquet_row_group(
                 }),
                 stats,
             });
-            if let Some(range) = maybe_tsrange {
-                assert!(timestamp_range.is_none());
-                timestamp_range = Some(range);
-            }
         }
     }
 
@@ -280,97 +261,83 @@ fn read_statistics_from_parquet_row_group(
         columns: column_summaries,
     };
 
-    Ok((table_summary, timestamp_range))
+    Ok(table_summary)
 }
 
 /// Extract IOx statistics from parquet statistics.
 ///
-/// This is required because upstream does not have a mapper from parquet statistics back to arrow or Rust native types.
+/// This is required because upstream does not have a mapper from
+/// parquet statistics back to arrow or Rust native types.
 fn extract_iox_statistics(
     parquet_stats: &ParquetStatistics,
     iox_type: InfluxColumnType,
     count: u64,
     row_group_idx: usize,
     column_name: &str,
-) -> Result<(Statistics, Option<TimestampRange>)> {
+) -> Result<Statistics> {
     match (parquet_stats, iox_type) {
         (ParquetStatistics::Boolean(stats), InfluxColumnType::Field(InfluxFieldType::Boolean)) => {
-            Ok((
-                Statistics::Bool(StatValues {
-                    min: Some(*stats.min()),
-                    max: Some(*stats.max()),
-                    count,
-                }),
-                None,
-            ))
-        }
-        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
-            Ok((
-                Statistics::I64(StatValues {
-                    min: Some(*stats.min()),
-                    max: Some(*stats.max()),
-                    count,
-                }),
-                None,
-            ))
-        }
-        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
-            // TODO: that's very likely wrong, but blocked by https://github.com/apache/arrow-rs/issues/254
-            Ok((
-                Statistics::U64(StatValues {
-                    min: Some(*stats.min() as u64),
-                    max: Some(*stats.max() as u64),
-                    count,
-                }),
-                None,
-            ))
-        }
-        (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => {
-            Ok((
-                Statistics::F64(StatValues {
-                    min: Some(*stats.min()),
-                    max: Some(*stats.max()),
-                    count,
-                }),
-                None,
-            ))
-        }
-        (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => Ok((
-            Statistics::I64(StatValues {
+            Ok(Statistics::Bool(StatValues {
                 min: Some(*stats.min()),
                 max: Some(*stats.max()),
                 count,
-            }),
-            Some(TimestampRange::new(*stats.min(), *stats.max())),
-        )),
+            }))
+        }
+        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::Integer)) => {
+            Ok(Statistics::I64(StatValues {
+                min: Some(*stats.min()),
+                max: Some(*stats.max()),
+                count,
+            }))
+        }
+        (ParquetStatistics::Int64(stats), InfluxColumnType::Field(InfluxFieldType::UInteger)) => {
+            // TODO: Likely incorrect for large values until
+            // https://github.com/apache/arrow-rs/issues/254
+            Ok(Statistics::U64(StatValues {
+                min: Some(*stats.min() as u64),
+                max: Some(*stats.max() as u64),
+                count,
+            }))
+        }
+        (ParquetStatistics::Double(stats), InfluxColumnType::Field(InfluxFieldType::Float)) => {
+            Ok(Statistics::F64(StatValues {
+                min: Some(*stats.min()),
+                max: Some(*stats.max()),
+                count,
+            }))
+        }
+        (ParquetStatistics::Int64(stats), InfluxColumnType::Timestamp) => {
+            Ok(Statistics::I64(StatValues {
+                min: Some(*stats.min()),
+                max: Some(*stats.max()),
+                count,
+            }))
+        }
         (ParquetStatistics::ByteArray(stats), InfluxColumnType::Tag)
         | (ParquetStatistics::ByteArray(stats), InfluxColumnType::Field(InfluxFieldType::String)) => {
-            Ok((
-                Statistics::String(StatValues {
-                    min: Some(
-                        stats
-                            .min()
-                            .as_utf8()
-                            .context(StatisticsUtf8Error {
-                                row_group: row_group_idx,
-                                column: column_name.to_string(),
-                            })?
-                            .to_string(),
-                    ),
-                    max: Some(
-                        stats
-                            .max()
-                            .as_utf8()
-                            .context(StatisticsUtf8Error {
-                                row_group: row_group_idx,
-                                column: column_name.to_string(),
-                            })?
-                            .to_string(),
-                    ),
-                    count,
-                }),
-                None,
-            ))
+            Ok(Statistics::String(StatValues {
+                min: Some(
+                    stats
+                        .min()
+                        .as_utf8()
+                        .context(StatisticsUtf8Error {
+                            row_group: row_group_idx,
+                            column: column_name.to_string(),
+                        })?
+                        .to_string(),
+                ),
+                max: Some(
+                    stats
+                        .max()
+                        .as_utf8()
+                        .context(StatisticsUtf8Error {
+                            row_group: row_group_idx,
+                            column: column_name.to_string(),
+                        })?
+                        .to_string(),
+                ),
+                count,
+            }))
         }
         _ => Err(Error::StatisticsTypeMismatch {
             row_group: row_group_idx,
@@ -496,13 +463,11 @@ mod tests {
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: read back statistics
-        let (table_summary_actual, timestamp_range_actual) =
+        let table_summary_actual =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
         let table_summary_expected = chunk.table_summary();
-        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
-        assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
 
     #[tokio::test]
@@ -521,13 +486,11 @@
         assert_eq!(schema_actual, schema_expected);
 
         // step 2: read back statistics
-        let (table_summary_actual, timestamp_range_actual) =
+        let table_summary_actual =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
         let table_summary_expected = chunk.table_summary();
-        let timestamp_range_expected = chunk.timestamp_range();
         assert_eq!(table_summary_actual, table_summary_expected);
-        assert_eq!(timestamp_range_actual, timestamp_range_expected)
     }
 
     #[tokio::test]
diff --git a/parquet_file/src/storage_testing.rs b/parquet_file/src/storage_testing.rs
index 208c4747c5..c239f8e942 100644
--- a/parquet_file/src/storage_testing.rs
+++ b/parquet_file/src/storage_testing.rs
@@ -20,8 +20,7 @@ mod tests {
         ////////////////////
         // Create test data which is also the expected data
         let table = "table1";
-        let (record_batches, schema, column_summaries, time_range, num_rows) =
-            make_record_batch("foo");
+        let (record_batches, schema, column_summaries, num_rows) = make_record_batch("foo");
         let mut table_summary = TableSummary::new(table);
         table_summary.columns = column_summaries.clone();
         let record_batch = record_batches[0].clone(); // Get the first one to compare key-value meta data that would be the same for all batches
@@ -40,7 +39,6 @@
             schema.clone(),
             table,
             column_summaries.clone(),
-            time_range,
         )
         .await;
 
@@ -61,11 +59,10 @@
         );
 
         // 2. Check statistics
-        let (table_summary_actual, timestamp_range_actual) =
+        let table_summary_actual =
             read_statistics_from_parquet_metadata(&parquet_metadata, &schema_actual, &table)
                 .unwrap();
         assert_eq!(table_summary_actual, table_summary);
-        assert_eq!(timestamp_range_actual, Some(time_range));
 
         // 3. Check data
         // Note that the read_data_from_parquet_data function fixes the row-group/batches' level metadata bug in arrow
diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs
index 26c4fec778..c90224a3a0 100644
--- a/parquet_file/src/table.rs
+++ b/parquet_file/src/table.rs
@@ -2,9 +2,15 @@ use snafu::{ResultExt, Snafu};
 use std::{collections::BTreeSet, mem, sync::Arc};
 
 use crate::storage::{self, Storage};
-use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange};
+use data_types::{
+    partition_metadata::{Statistics, TableSummary},
+    timestamp::TimestampRange,
+};
 use datafusion::physical_plan::SendableRecordBatchStream;
-use internal_types::{schema::Schema, selection::Selection};
+use internal_types::{
+    schema::{Schema, TIME_COLUMN_NAME},
+    selection::Selection,
+};
 use object_store::{path::Path, ObjectStore};
 use query::predicate::Predicate;
 
@@ -38,24 +44,26 @@ pub struct Table {
     /// Schema that goes with this table's parquet file
     table_schema: Schema,
 
-    /// Timestamp rang of this table's parquet file
+    /// Timestamp range of this table's parquet file
+    /// (extracted from TableSummary)
     timestamp_range: Option<TimestampRange>,
 }
 
 impl Table {
     pub fn new(
-        meta: TableSummary,
+        table_summary: TableSummary,
         path: Path,
         store: Arc<ObjectStore>,
         schema: Schema,
-        range: Option<TimestampRange>,
     ) -> Self {
+        let timestamp_range = extract_range(&table_summary);
+
         Self {
-            table_summary: meta,
+            table_summary,
             object_store_path: path,
             object_store: store,
             table_schema: schema,
-            timestamp_range: range,
+            timestamp_range,
         }
     }
 
@@ -96,11 +104,6 @@ impl Table {
         })
     }
 
-    /// Return timestamp range of this table
-    pub fn timestamp_range(&self) -> Option<TimestampRange> {
-        self.timestamp_range
-    }
-
     // Check if 2 time ranges overlap
     pub fn matches_predicate(&self, timestamp_range: &Option<TimestampRange>) -> bool {
         match (self.timestamp_range, timestamp_range) {
@@ -152,3 +155,18 @@ impl Table {
         self.table_summary.columns[0].count() as usize
     }
 }
+
+/// Extracts min/max values of the timestamp column, from the TableSummary, if possible
+fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
+    table_summary
+        .column(TIME_COLUMN_NAME)
+        .map(|c| {
+            if let Statistics::I64(s) = &c.stats {
+                if let (Some(min), Some(max)) = (s.min, s.max) {
+                    return Some(TimestampRange::new(min, max));
+                }
+            }
+            None
+        })
+        .flatten()
+}
diff --git a/parquet_file/src/utils.rs b/parquet_file/src/utils.rs
index c3e10f6500..bb43a2f293 100644
--- a/parquet_file/src/utils.rs
+++ b/parquet_file/src/utils.rs
@@ -13,7 +13,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use data_types::{
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
     server_id::ServerId,
-    timestamp::TimestampRange,
 };
 use datafusion_util::MemoryStream;
 use futures::TryStreamExt;
@@ -93,46 +92,20 @@ pub async fn make_chunk_given_record_batch(
     schema: Schema,
     table: &str,
     column_summaries: Vec<ColumnSummary>,
-    time_range: TimestampRange,
 ) -> Chunk {
-    make_chunk_common(
-        store,
-        record_batches,
-        schema,
-        table,
-        column_summaries,
-        time_range,
-    )
-    .await
+    make_chunk_common(store, record_batches, schema, table, column_summaries).await
 }
 
 /// Same as [`make_chunk`] but parquet file does not contain any row group.
 pub async fn make_chunk(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
-    let (record_batches, schema, column_summaries, time_range, _num_rows) =
-        make_record_batch(column_prefix);
-    make_chunk_common(
-        store,
-        record_batches,
-        schema,
-        "table1",
-        column_summaries,
-        time_range,
-    )
-    .await
+    let (record_batches, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
+    make_chunk_common(store, record_batches, schema, "table1", column_summaries).await
 }
 
 /// Same as [`make_chunk`] but parquet file does not contain any row group.
 pub async fn make_chunk_no_row_group(store: Arc<ObjectStore>, column_prefix: &str) -> Chunk {
-    let (_, schema, column_summaries, time_range, _num_rows) = make_record_batch(column_prefix);
-    make_chunk_common(
-        store,
-        vec![],
-        schema,
-        "table1",
-        column_summaries,
-        time_range,
-    )
-    .await
+    let (_, schema, column_summaries, _num_rows) = make_record_batch(column_prefix);
+    make_chunk_common(store, vec![], schema, "table1", column_summaries).await
 }
 
 /// Common code for all [`make_chunk`] and [`make_chunk_no_row_group`].
@@ -144,7 +117,6 @@ async fn make_chunk_common(
     schema: Schema,
     table: &str,
     column_summaries: Vec<ColumnSummary>,
-    time_range: TimestampRange,
 ) -> Chunk {
     let server_id = ServerId::new(NonZeroU32::new(1).unwrap());
     let db_name = "db1";
@@ -180,7 +152,6 @@ async fn make_chunk_common(
         path,
         Arc::clone(&store),
         schema,
-        Some(time_range),
         ChunkMetrics::new_unregistered(),
     )
 }
@@ -367,7 +338,7 @@ fn create_column_timestamp(
     arrow_cols: &mut Vec>,
     summaries: &mut Vec<ColumnSummary>,
     schema_builder: SchemaBuilder,
-) -> (SchemaBuilder, TimestampRange) {
+) -> SchemaBuilder {
     assert_eq!(data.len(), arrow_cols.len());
 
     for (arrow_cols_sub, data_sub) in arrow_cols.iter_mut().zip(data.iter()) {
@@ -389,10 +360,7 @@ fn create_column_timestamp(
         }),
     });
 
-    let timestamp_range = TimestampRange::new(min.unwrap(), max.unwrap());
-
-    let schema_builder = schema_builder.timestamp();
-    (schema_builder, timestamp_range)
+    schema_builder.timestamp()
 }
 
 /// Creates an Arrow RecordBatches with schema and IOx statistics.
@@ -404,13 +372,7 @@ fn create_column_timestamp(
 /// indeed self-contained and can act as a source to recorder schema and statistics.
 pub fn make_record_batch(
     column_prefix: &str,
-) -> (
-    Vec<RecordBatch>,
-    Schema,
-    Vec<ColumnSummary>,
-    TimestampRange,
-    usize,
-) {
+) -> (Vec<RecordBatch>, Schema, Vec<ColumnSummary>, usize) {
     // (name, array, nullable)
     let mut arrow_cols: Vec> = vec![vec![], vec![], vec![]];
     let mut summaries = vec![];
@@ -527,7 +489,7 @@ pub fn make_record_batch(
     );
 
     // time
-    let (schema_builder, timestamp_range) = create_column_timestamp(
+    let schema_builder = create_column_timestamp(
         vec![vec![1000], vec![2000], vec![3000, 4000]],
         &mut arrow_cols,
         &mut summaries,
@@ -549,13 +511,7 @@ pub fn make_record_batch(
         record_batches.push(record_batch);
     }
 
-    (record_batches, schema, summaries, timestamp_range, num_rows)
+    (record_batches, schema, summaries, num_rows)
 }
 
 /// Creates new in-memory object store for testing.
diff --git a/server/src/db.rs b/server/src/db.rs
index a8938cb9a1..6dbf807596 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -1136,7 +1136,7 @@ impl CatalogState for Catalog {
             read_schema_from_parquet_metadata(&info.metadata).context(SchemaReadFailed {
                 path: info.path.clone(),
             })?;
-        let (table_summary, timestamp_range) =
+        let table_summary =
             read_statistics_from_parquet_metadata(&info.metadata, &schema, &table_name).context(
                 StatisticsReadFailed {
                     path: info.path.clone(),
@@ -1155,7 +1155,6 @@ impl CatalogState for Catalog {
             object_store.path_from_dirs_and_filename(info.path.clone()),
            object_store,
            schema,
-            timestamp_range,
            metrics,
        );
        let parquet_chunk = Arc::new(parquet_chunk);
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index a36ff66236..4839963511 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -468,7 +468,6 @@ mod tests {
             path,
             object_store,
             schema,
-            None,
             parquet_file::chunk::ChunkMetrics::new_unregistered(),
         )
     }
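
After this change, the timestamp range is no longer threaded through Chunk::new and Table::new as a separate argument; it is derived from the TableSummary's time-column statistics by the new extract_range helper whenever a Table is constructed. The following is a minimal, self-contained sketch of that idea. The TimestampRange, Statistics, ColumnSummary, and TableSummary types below are simplified stand-ins used only for illustration, not the real definitions from the data_types crate.

// Minimal sketch (simplified stand-in types, not the real `data_types` crate):
// derive a timestamp range from the time column's statistics, mirroring the
// `extract_range` helper added in parquet_file/src/table.rs.

const TIME_COLUMN_NAME: &str = "time";

#[derive(Debug, Clone, Copy, PartialEq)]
struct TimestampRange {
    start: i64,
    end: i64,
}

struct I64StatValues {
    min: Option<i64>,
    max: Option<i64>,
}

enum Statistics {
    I64(I64StatValues),
    // other variants elided in this sketch
}

struct ColumnSummary {
    name: String,
    stats: Statistics,
}

struct TableSummary {
    columns: Vec<ColumnSummary>,
}

/// Return the min/max of the time column as a range, if statistics exist.
fn extract_range(table_summary: &TableSummary) -> Option<TimestampRange> {
    let time_col = table_summary
        .columns
        .iter()
        .find(|c| c.name == TIME_COLUMN_NAME)?;

    if let Statistics::I64(s) = &time_col.stats {
        if let (Some(min), Some(max)) = (s.min, s.max) {
            return Some(TimestampRange { start: min, end: max });
        }
    }
    None
}

fn main() {
    let summary = TableSummary {
        columns: vec![ColumnSummary {
            name: TIME_COLUMN_NAME.to_string(),
            stats: Statistics::I64(I64StatValues {
                min: Some(1000),
                max: Some(4000),
            }),
        }],
    };

    // The chunk's time range now comes straight from the summary statistics.
    assert_eq!(
        extract_range(&summary),
        Some(TimestampRange { start: 1000, end: 4000 })
    );
}

Deriving the range on demand keeps the column statistics as the single source of truth, instead of carrying a second copy of the same min/max alongside the summary through every constructor.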