From 4e2d59d9a5a76d5ab2bdd8745b5b00a1e4e1f253 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 14 Apr 2021 16:06:47 -0400 Subject: [PATCH] feat: saimplement a few more functions as part of supporting query dfrom parquet files --- data_types/src/timestamp.rs | 2 +- parquet_file/src/chunk.rs | 23 +++++++++++++++++++++++ parquet_file/src/table.rs | 27 +++++++++++++++++++++++---- server/src/db.rs | 8 +------- server/src/db/chunk.rs | 12 +++++++++--- 5 files changed, 57 insertions(+), 15 deletions(-) diff --git a/data_types/src/timestamp.rs b/data_types/src/timestamp.rs index b79101cfa8..281f6e884b 100644 --- a/data_types/src/timestamp.rs +++ b/data_types/src/timestamp.rs @@ -12,7 +12,7 @@ pub struct TimestampRange { impl TimestampRange { pub fn new(start: i64, end: i64) -> Self { - debug_assert!(end > start); + debug_assert!(end >= start); Self { start, end } } diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs index 5a884ac81f..892bdbc8de 100644 --- a/parquet_file/src/chunk.rs +++ b/parquet_file/src/chunk.rs @@ -128,6 +128,7 @@ impl Chunk { .context(NamedTableError { table_name }) } + // Return all tables of this chunk whose timestamp overlaps with the give one pub fn table_names( &self, timestamp_range: Option, @@ -140,4 +141,26 @@ impl Chunk { } }) } + + // Return columns names of a given table that belong to the given column + // selection + pub fn column_names( + &self, + table_name: &str, + selection: Selection<'_>, + ) -> Option> { + let table = self + .tables + .iter() + .find(|t| t.has_table(table_name)) + .context(NamedTableNotFoundInChunk { + table_name, + chunk_id: self.id(), + }); + + match table { + Ok(table) => table.column_names(selection), + Err(_) => None, + } + } } diff --git a/parquet_file/src/table.rs b/parquet_file/src/table.rs index 5065df2d4e..ba9bedf943 100644 --- a/parquet_file/src/table.rs +++ b/parquet_file/src/table.rs @@ -1,5 +1,5 @@ use snafu::{ResultExt, Snafu}; -use std::mem; +use std::{collections::BTreeSet, mem}; use data_types::{partition_metadata::TableSummary, timestamp::TimestampRange}; use internal_types::{schema::Schema, selection::Selection}; @@ -74,7 +74,7 @@ impl Table { self.object_store_path.clone() } - /// return schema of this table for specified selection columns + /// Return schema of this table for specified selection columns pub fn schema(&self, selection: Selection<'_>) -> Result { Ok(match selection { Selection::All => self.table_schema.clone(), @@ -85,12 +85,31 @@ impl Table { }) } + // Check if 2 time ranges overlap pub fn matches_predicate(&self, timestamp_range: &Option) -> bool { match (self.timestamp_range, timestamp_range) { (Some(a), Some(b)) => !a.disjoint(b), - (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match */ - // the predicate + (None, Some(_)) => false, /* If this chunk doesn't have a time column it can't match + * the predicate */ (_, None) => true, } } + + // Return columns names of this table that belong to the given column selection + pub fn column_names(&self, selection: Selection<'_>) -> Option> { + let fields = self.table_schema.inner().fields().iter(); + + Some(match selection { + Selection::Some(cols) => fields + .filter_map(|x| { + if cols.contains(&x.name().as_str()) { + Some(x.name().clone()) + } else { + None + } + }) + .collect(), + Selection::All => fields.map(|x| x.name().clone()).collect(), + }) + } } diff --git a/server/src/db.rs b/server/src/db.rs index 83c1bf40cb..4394452788 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -548,13 +548,7 @@ impl Db { .context(SchemaConversion)?; let table_time_range = match time_range { None => None, - Some((start, end)) => { - if start < end { - Some(TimestampRange::new(start, end)) - } else { - None - } - } + Some((start, end)) => Some(TimestampRange::new(start, end)), }; parquet_chunk.add_table(stats, path, schema, table_time_range); } diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 6f1ea57efb..48755f1deb 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -343,8 +343,12 @@ impl PartitionChunk for DBChunk { })?, )) } - Self::ParquetFile { .. } => { - unimplemented!("parquet file not implemented for column_names") + Self::ParquetFile { chunk, .. } => { + if !predicate.is_empty() { + // TODO: Support predicates when MB supports it + return Ok(None); + } + Ok(chunk.column_names(table_name, columns)) } } } @@ -399,7 +403,9 @@ impl PartitionChunk for DBChunk { Ok(Some(values)) } Self::ParquetFile { .. } => { - unimplemented!("parquet file not implemented for column_values") + // Since DataFusion can read Parquet, there is no advantage to + // manually implementing this vs just letting DataFusion do its thing + Ok(None) } } }