diff --git a/query/src/lib.rs b/query/src/lib.rs
index d6675f55af..f6ce6901f5 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -7,7 +7,7 @@
 )]
 use async_trait::async_trait;
-use data_types::{chunk_metadata::ChunkSummary, partition_metadata::ColumnSummary};
+use data_types::chunk_metadata::ChunkSummary;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use exec::{stringset::StringSet, Executor};
 use internal_types::{schema::Schema, selection::Selection};
@@ -66,6 +66,10 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
     /// Returns the name of the table stored in this chunk
     fn table_name(&self) -> &str;
 
+    /// Returns true if the chunk may contain a duplicate "primary
+    /// key" within itself
+    fn may_contain_pk_duplicates(&self) -> bool;
+
     /// Returns the result of applying the `predicate` to the chunk
     /// using an efficient, but inexact method, based on metadata.
     ///
@@ -119,12 +123,9 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
         selection: Selection<'_>,
     ) -> Result<SendableRecordBatchStream, Self::Error>;
 
-    /// Returns true if this chunk has duplicates
-    fn has_duplicates(&self) -> bool;
-
     /// Returns true if data of this chunk is sorted
     fn is_sorted(&self) -> bool;
-    fn primary_key_columns(&self) -> Vec<&ColumnSummary>;
+    //fn primary_key_columns(&self) -> Vec<&ColumnSummary>;
 }
 
 #[async_trait]
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 1155e63b0f..817ec2f329 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -2,16 +2,26 @@
 
 use std::sync::Arc;
 
-use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef};
+use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
+// use data_types::partition_metadata::ColumnSummary;
-use datafusion::{datasource::{
+use datafusion::{
+    datasource::{
         datasource::{Statistics, TableProviderFilterPushDown},
         TableProvider,
-    }, error::{DataFusionError, Result as DataFusionResult}, logical_plan::Expr, physical_plan::{ExecutionPlan, expressions::{PhysicalSortExpr, col}, sort::SortExec}};
+    },
+    error::{DataFusionError, Result as DataFusionResult},
+    logical_plan::Expr,
+    physical_plan::{
+        expressions::{col, PhysicalSortExpr},
+        sort::SortExec,
+        ExecutionPlan,
+    },
+};
 use internal_types::schema::{builder::SchemaMerger, Schema};
 use observability_deps::tracing::debug;
 
 use crate::{
+    duplicate::group_potential_duplicates,
     predicate::{Predicate, PredicateBuilder},
     util::project_schema,
     PartitionChunk,
@@ -59,9 +69,26 @@ pub enum Error {
     InternalSort {
         source: datafusion::error::DataFusionError,
     },
+
+    #[snafu(display("Internal error: Cannot group chunks '{}'", source,))]
+    InternalChunkGrouping { source: crate::duplicate::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
+impl From<Error> for ArrowError {
+    // Wrap an error into an arrow error
+    fn from(e: Error) -> Self {
+        Self::ExternalError(Box::new(e))
+    }
+}
+
+impl From<Error> for DataFusionError {
+    // Wrap an error into a datafusion error
+    fn from(e: Error) -> Self {
+        Self::ArrowError(e.into())
+    }
+}
+
 /// Something that can prune chunks based on their metadata
 pub trait ChunkPruner: Sync + Send + std::fmt::Debug {
     /// prune `chunks`, if possible, based on predicate.
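// [Sketch, not part of the diff] A minimal, self-contained illustration of
// what the two `From` impls above buy us: with the chain
// `Error -> ArrowError -> DataFusionError` in place, `?` can propagate the
// provider's errors out of functions that return `DataFusionResult`, with no
// `map_err` at each call site. `ProviderError` here is a hypothetical
// stand-in for the snafu `Error` enum in this file.

use arrow::error::ArrowError;
use datafusion::error::{DataFusionError, Result as DataFusionResult};

#[derive(Debug)]
struct ProviderError(String); // hypothetical stand-in for the snafu `Error`

impl std::fmt::Display for ProviderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "provider error: {}", self.0)
    }
}

impl std::error::Error for ProviderError {}

impl From<ProviderError> for ArrowError {
    fn from(e: ProviderError) -> Self {
        Self::ExternalError(Box::new(e))
    }
}

impl From<ProviderError> for DataFusionError {
    fn from(e: ProviderError) -> Self {
        Self::ArrowError(e.into())
    }
}

fn build_plan() -> Result<(), ProviderError> {
    Err(ProviderError("cannot group chunks".to_string()))
}

// `?` converts ProviderError into DataFusionError automatically via `From`
fn scan() -> DataFusionResult<()> {
    build_plan()?;
    Ok(())
}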
@@ -250,7 +277,7 @@ impl TableProvider for ChunkTableProvider {
             scan_schema,
             chunks,
             predicate,
-        );
+        )?;
 
         Ok(plan)
     }
@@ -344,9 +371,18 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
         schema: ArrowSchemaRef,
         chunks: Vec<Arc<C>>,
         predicate: Predicate,
-    ) -> Arc<dyn ExecutionPlan> {
-        //finding overlapped chunks and put them into the right group
-        self.split_overlapped_chunks(chunks.to_vec());
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // find overlapped chunks and put them into the right group
+        self.split_overlapped_chunks(chunks.to_vec())?;
+
+        // TEMP until the rest of this module's code is complete:
+        // merge all chunks back into a single group
+        self.no_duplicates_chunks
+            .append(&mut self.in_chunk_duplicates_chunks);
+        for mut group in &mut self.overlapped_chunks_set {
+            self.no_duplicates_chunks.append(&mut group);
+        }
+        self.overlapped_chunks_set.clear();
 
         // Building plans
         let mut plans = vec![];
@@ -377,7 +413,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
                 Arc::clone(&schema),
                 chunk_with_duplicates.to_owned(),
                 predicate.clone(),
-            ));
+            )?);
         }
 
         // Go over non_duplicates_chunks, build a plan for it
@@ -397,22 +433,32 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
         // There are still plans, add UnionExec
         if !plans.is_empty() {
             // final_plan = union_plan
-            panic!("Unexpected error: There should be only one output for scan plan");
+            panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
         }
 
-        final_plan
+        Ok(final_plan)
     }
 
     /// discover overlaps and split them into three groups:
     /// 1. vector of vectors of overlapped chunks
    /// 2. vector of non-overlapped chunks, each of which has duplicates in itself
    /// 3. vector of non-overlapped chunks without duplicates
-    fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
-        // TODO: need to discover overlaps and split them
-        // The current behavior is just like neither overlaps nor having duplicates in its own chunk
-        //self.overlapped_chunks_set = vec![];
-        //self.in_chunk_duplicates_chunks = vec![];
-        self.no_duplicates_chunks.append(&mut chunks.to_vec());
+    fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) -> Result<()> {
+        // Find all groups based on statistics
+        let groups = group_potential_duplicates(chunks).context(InternalChunkGrouping)?;
+
+        for mut group in groups {
+            if group.len() == 1 {
+                if group[0].may_contain_pk_duplicates() {
+                    self.in_chunk_duplicates_chunks.append(&mut group);
+                } else {
+                    self.no_duplicates_chunks.append(&mut group);
+                }
+            } else {
+                self.overlapped_chunks_set.push(group)
+            }
+        }
+        Ok(())
     }
 
     /// Return true if no chunk overlaps with another and none has duplicates in itself
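// [Sketch, not part of the diff] The three-way split performed by
// `split_overlapped_chunks`, restated over simplified, hypothetical types so
// the bucketing logic can be run and checked in isolation:

#[derive(Debug, PartialEq)]
struct FakeChunk {
    id: u32,
    may_contain_pk_duplicates: bool,
}

// Same bucketing as `split_overlapped_chunks`, over pre-computed groups
fn split(groups: Vec<Vec<FakeChunk>>) -> (Vec<Vec<FakeChunk>>, Vec<FakeChunk>, Vec<FakeChunk>) {
    let mut overlapped = vec![]; // need cross-chunk deduplication
    let mut self_duplicates = vec![]; // need per-chunk deduplication
    let mut plain = vec![]; // can be scanned as-is
    for mut group in groups {
        if group.len() == 1 {
            let chunk = group.remove(0);
            if chunk.may_contain_pk_duplicates {
                self_duplicates.push(chunk);
            } else {
                plain.push(chunk);
            }
        } else {
            overlapped.push(group);
        }
    }
    (overlapped, self_duplicates, plain)
}

fn main() {
    let c = |id, dup| FakeChunk { id, may_contain_pk_duplicates: dup };
    // chunks 2 and 3 overlap; chunk 4 may duplicate within itself
    let groups = vec![vec![c(1, false)], vec![c(2, false), c(3, false)], vec![c(4, true)]];
    let (overlapped, self_dup, plain) = split(groups);
    assert_eq!(overlapped.len(), 1);
    assert_eq!(self_dup, vec![c(4, true)]);
    assert_eq!(plain, vec![c(1, false)]);
}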
@@ -488,184 +534,56 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
         schema: ArrowSchemaRef,
         chunk: Arc<C>, // This chunk is identified as having duplicates
         predicate: Predicate,
-    ) -> Arc<dyn ExecutionPlan> {
-
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         // Create the bottom node IOxReadFilterNode for this chunk
-        let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
+        let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
             Arc::clone(&table_name),
             schema,
             vec![Arc::clone(&chunk)],
             predicate,
         ));
 
-        // Add the sort operator, SortExec, if needed
-        if !chunk.is_sorted() {
+        //let plan = Self::build_sort_plan(chunk, input);
+        Self::build_sort_plan(chunk, input)
 
-            let key_summaries = chunk.primary_key_columns();
+        // Create DeduplicateExec
+        // TODO: Add DeduplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
+        //plan = add_deduplicate_exec(plan);
+
+        //plan
+    }
+
+    /// Add SortExec operator on top of the input plan of the given chunk
+    /// The plan will be sorted on the chunk's primary key
+    fn build_sort_plan(
+        chunk: Arc<C>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !chunk.is_sorted() {
+            let key_summaries = chunk.summary().primary_key_columns();
 
             // build sort expression
             let mut sort_exprs = vec![];
             for key in key_summaries {
-                sort_exprs.push(
-                    PhysicalSortExpr{
-                        expr: col(key.name.as_str()),
-                        options: SortOptions {
-                            descending: false,
-                            nulls_first: false,
-                        }
-                    }
-                );
+                sort_exprs.push(PhysicalSortExpr {
+                    expr: col(key.name.as_str()),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    },
+                });
             }
 
-            // Create SortExec operator on top of the IOxReadFilterNode
-            let sort_exec = SortExec::try_new(sort_exprs, input);
-            let sort_exec = match sort_exec {
-                Ok(plan) => plan,
-                Err(e) => panic!("Internal error while adding SortExec: {}", e) // This should never happens
-            };
-            input = Arc::new(sort_exec);
+            // Create SortExec operator
+            Ok(Arc::new(
+                SortExec::try_new(sort_exprs, input).context(InternalSort)?,
+            ))
+        } else {
+            Ok(input)
         }
-
-        // Create DeduplicateExc
-        // TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
-
-        input
     }
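// [Sketch, not part of the diff] How the sort expressions built by
// `build_sort_plan` drive a `SortExec`, shown against an in-memory input.
// This assumes the datafusion APIs of this era (a `col` helper with no schema
// argument, two-argument `SortExec::try_new`), matching the usage above:

use std::sync::Arc;

use arrow::{
    array::{ArrayRef, Int64Array, StringArray},
    compute::SortOptions,
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use datafusion::physical_plan::{
    collect,
    expressions::{col, PhysicalSortExpr},
    memory::MemoryExec,
    sort::SortExec,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("tag1", DataType::Utf8, true),
        Field::new("field_int", DataType::Int64, true),
    ]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(StringArray::from(vec!["MT", "AL", "CT"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![5, 100, 70])) as ArrayRef,
        ],
    )?;

    // one ascending, nulls-last sort expression per primary key column,
    // mirroring what build_sort_plan produces
    let sort_exprs = vec![PhysicalSortExpr {
        expr: col("tag1"),
        options: SortOptions {
            descending: false,
            nulls_first: false,
        },
    }];

    let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?);
    let sort_plan = Arc::new(SortExec::try_new(sort_exprs, input)?);
    let sorted = collect(sort_plan).await?; // rows now ordered by tag1: AL, CT, MT
    println!("{:#?}", sorted);
    Ok(())
}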
-
-    //////////
-
-    // let sort_expr = expr
-    //     .iter()
-    //     .map(|e| match e {
-    //         Expr::Sort {
-    //             expr,
-    //             asc,
-    //             nulls_first,
-    //         } => self.create_physical_sort_expr(
-    //             expr,
-    //             &input_schema,
-    //             SortOptions {
-    //                 descending: !*asc,
-    //                 nulls_first: *nulls_first,
-    //             },
-    //             ctx_state,
-    //         ),
-    //         _ => Err(DataFusionError::Plan(
-    //             "Sort only accepts sort expressions".to_string(),
-    //         )),
-    //     })
-    //     .collect::<Result<Vec<_>>>()?;
-
-    // Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
-
-    ///////////
-    /////// TEST: create SortExec plan
-    // #[tokio::test]
-    // async fn test_lex_sort_by_float() -> Result<()> {
-    //     let schema = Arc::new(Schema::new(vec![
-    //         Field::new("a", DataType::Float32, true),
-    //         Field::new("b", DataType::Float64, true),
-    //     ]));
-
-    //     // define data.
-    //     let batch = RecordBatch::try_new(
-    //         schema.clone(),
-    //         vec![
-    //             Arc::new(Float32Array::from(vec![
-    //                 Some(f32::NAN),
-    //                 None,
-    //                 None,
-    //                 Some(f32::NAN),
-    //                 Some(1.0_f32),
-    //                 Some(1.0_f32),
-    //                 Some(2.0_f32),
-    //                 Some(3.0_f32),
-    //             ])),
-    //             Arc::new(Float64Array::from(vec![
-    //                 Some(200.0_f64),
-    //                 Some(20.0_f64),
-    //                 Some(10.0_f64),
-    //                 Some(100.0_f64),
-    //                 Some(f64::NAN),
-    //                 None,
-    //                 None,
-    //                 Some(f64::NAN),
-    //             ])),
-    //         ],
-    //     )?;
-
-    //     let sort_exec = Arc::new(SortExec::try_new(
-    //         vec![
-    //             PhysicalSortExpr {
-    //                 expr: col("a"),
-    //                 options: SortOptions {
-    //                     descending: true,
-    //                     nulls_first: true,
-    //                 },
-    //             },
-    //             PhysicalSortExpr {
-    //                 expr: col("b"),
-    //                 options: SortOptions {
-    //                     descending: false,
-    //                     nulls_first: false,
-    //                 },
-    //             },
-    //         ],
-    //         Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
-    //     )?);
-
-    //     assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
-    //     assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
-
-    //     let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
-    //     assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
-    //     assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
-    //     assert_eq!(result.len(), 1);
-
-    //     let columns = result[0].columns();
-
-    //     assert_eq!(DataType::Float32, *columns[0].data_type());
-    //     assert_eq!(DataType::Float64, *columns[1].data_type());
-
-    //     let a = as_primitive_array::<Float32Type>(&columns[0]);
-    //     let b = as_primitive_array::<Float64Type>(&columns[1]);
-
-    //     // convert result to strings to allow comparing to expected result containing NaN
-    //     let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
-    //         .map(|i| {
-    //             let aval = if a.is_valid(i) {
-    //                 Some(a.value(i).to_string())
-    //             } else {
-    //                 None
-    //             };
-    //             let bval = if b.is_valid(i) {
-    //                 Some(b.value(i).to_string())
-    //             } else {
-    //                 None
-    //             };
-    //             (aval, bval)
-    //         })
-    //         .collect();
-
-    //     let expected: Vec<(Option<String>, Option<String>)> = vec![
-    //         (None, Some("10".to_owned())),
-    //         (None, Some("20".to_owned())),
-    //         (Some("NaN".to_owned()), Some("100".to_owned())),
-    //         (Some("NaN".to_owned()), Some("200".to_owned())),
-    //         (Some("3".to_owned()), Some("NaN".to_owned())),
-    //         (Some("2".to_owned()), None),
-    //         (Some("1".to_owned()), Some("NaN".to_owned())),
-    //         (Some("1".to_owned()), None),
-    //     ];
-
-    //     assert_eq!(expected, result);
-
-    //     Ok(())
-    // }
-    /////////////
-
     /// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
     /// ```text
     /// ┌─────────────────┐
@@ -718,8 +636,134 @@ impl ChunkPruner for NoOpPruner {
     }
 }
 
+#[cfg(test)]
+mod test {
+    use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+    use arrow_util::assert_batches_eq;
+    use datafusion::physical_plan::collect;
+    use internal_types::schema::TIME_COLUMN_NAME;
-#[tokio::test]
-async fn test_sort() {
-
-}
\ No newline at end of file
+
+    use crate::test::TestChunk;
+
+    use super::*;
+
+    #[test]
+    fn chunk_grouping() {
+        // This test just ensures that all the plumbing is connected
+        // for chunk grouping. The logic of the grouping is tested
+        // in the duplicate module
+
+        // c1: no overlaps
+        let c1 = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b"));
+
+        // c2: overlaps with c3
+        let c2 = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d"));
+
+        // c3: overlaps with c2
+        let c3 = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d"));
+
+        // c4: may contain duplicates within itself
+        let c4 = Arc::new(
+            TestChunk::new(4)
+                .with_tag_column_with_stats("t", "tag1", "e", "f")
+                .with_may_contain_pk_duplicates(true),
+        );
+
+        let mut deduplicator = Deduplicater::new();
+        deduplicator
+            .split_overlapped_chunks(vec![c1, c2, c3, c4])
+            .expect("split chunks");
+
+        assert_eq!(
+            chunk_group_ids(&deduplicator.overlapped_chunks_set),
+            vec!["Group 0: 2, 3"]
+        );
+        assert_eq!(chunk_ids(&deduplicator.in_chunk_duplicates_chunks), "4");
+        assert_eq!(chunk_ids(&deduplicator.no_duplicates_chunks), "1");
+    }
+
+    #[tokio::test]
+    async fn sort_planning() {
+        // Chunk 1 with 5 rows of data
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column("t")
+                .with_int_field_column("t", "field_int")
+                .with_tag_column("t", "tag2")
+                .with_tag_column("t", "tag1")
+                .with_five_row_of_null_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "tag1",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                true,
+            ),
+            Field::new(
+                "tag2",
+                DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                true,
+            ),
+            Field::new("field_int", DataType::Int64, true),
+            Field::new(
+                TIME_COLUMN_NAME,
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                true,
+            ),
+        ]));
+
+        // IOx scan operator
+        let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+        ));
+        let batch = collect(Arc::clone(&input)).await.unwrap();
+        // data in its original, unsorted form
+        let expected = vec![
+            "+------+------+-----------+-------------------------------+",
+            "| tag1 | tag2 | field_int | time                          |",
+            "+------+------+-----------+-------------------------------+",
+            "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
+            "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+            "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
+            "+------+------+-----------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+
+        // Add Sort operator on top of IOx scan
+        let sort_plan = Deduplicater::build_sort_plan(chunk, input);
+        let batch = collect(sort_plan.unwrap()).await.unwrap();
+        // data is now sorted on the primary key (tag1, tag2, time)
+        let expected = vec![
+            "+------+------+-----------+-------------------------------+",
+            "| tag1 | tag2 | field_int | time                          |",
+            "+------+------+-----------+-------------------------------+",
+            "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+            "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+            "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
+            "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
+            "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
+            "+------+------+-----------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
+        let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
+        ids.join(", ")
+    }
+
+    fn chunk_group_ids(groups: &[Vec<Arc<TestChunk>>]) -> Vec<String> {
+        groups
+            .iter()
+            .enumerate()
+            .map(|(idx, group)| format!("Group {}: {}", idx, chunk_ids(group)))
+            .collect()
+    }
+}
diff --git a/query/src/pruning.rs b/query/src/pruning.rs
index 850ae65700..4e49afa957 100644
--- a/query/src/pruning.rs
+++ b/query/src/pruning.rs
@@ -1,4 +1,6 @@
 //! Implementation of statistics based pruning
+use std::sync::Arc;
+
 use arrow::{array::ArrayRef, datatypes::SchemaRef};
 use data_types::partition_metadata::{ColumnSummary, Statistics, TableSummary};
 use datafusion::{
@@ -20,6 +22,19 @@ pub trait Prunable: Sized {
     fn schema(&self) -> SchemaRef;
 }
 
+impl<P> Prunable for Arc<P>
+where
+    P: Prunable,
+{
+    fn summary(&self) -> &TableSummary {
+        self.as_ref().summary()
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.as_ref().schema()
+    }
+}
+
 /// Something that cares to be notified when pruning of chunks occurs
 pub trait PruningObserver {
     type Observed;
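// [Sketch, not part of the diff] What the blanket impl above enables: code
// that is generic over the trait accepts `Arc`-wrapped values directly, with
// no `as_ref()` noise at call sites. `Summarize` is a toy stand-in for
// `Prunable`:

use std::sync::Arc;

trait Summarize {
    fn column_count(&self) -> usize;
}

// The same forwarding shape as `impl<P> Prunable for Arc<P>` above
impl<P: Summarize> Summarize for Arc<P> {
    fn column_count(&self) -> usize {
        self.as_ref().column_count()
    }
}

struct Chunk {
    columns: Vec<String>,
}

impl Summarize for Chunk {
    fn column_count(&self) -> usize {
        self.columns.len()
    }
}

// Generic code never needs to know whether it holds a Chunk or an Arc<Chunk>
fn total_columns<S: Summarize>(items: &[S]) -> usize {
    items.iter().map(|s| s.column_count()).sum()
}

fn main() {
    let shared = vec![
        Arc::new(Chunk { columns: vec!["tag1".into(), "time".into()] }),
        Arc::new(Chunk { columns: vec!["tag2".into()] }),
    ];
    assert_eq!(total_columns(&shared), 3);
}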
diff --git a/query/src/test.rs b/query/src/test.rs
index 1f55770dd8..c40a519a3f 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -8,7 +8,10 @@ use arrow::{
     datatypes::{DataType, Int32Type, TimeUnit},
     record_batch::RecordBatch,
 };
-use data_types::{chunk_metadata::ChunkSummary, partition_metadata::{ColumnSummary, TableSummary}};
+use data_types::{
+    chunk_metadata::ChunkSummary,
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+};
 use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
 
 use crate::{
@@ -20,7 +23,7 @@ use crate::{exec::Executor, pruning::Prunable};
 use internal_types::{
     schema::{
         builder::{SchemaBuilder, SchemaMerger},
-        Schema,
+        InfluxColumnType, Schema,
     },
     selection::Selection,
 };
@@ -119,6 +122,9 @@ pub struct TestChunk {
     id: u32,
 
+    /// Set the flag if this chunk might contain duplicates
+    may_contain_pk_duplicates: bool,
+
     /// A copy of the captured predicates passed
     predicates: Mutex<Vec<Predicate>>,
 
@@ -136,6 +142,9 @@ pub struct TestChunk {
 
     /// Return value for apply_predicate, if desired
     predicate_match: Option<PredicateMatch>,
+
+    /// Return value for summary(), if desired
+    table_summary: Option<TableSummary>,
 }
 
 impl TestChunk {
@@ -173,6 +182,12 @@ impl TestChunk {
         self.with_tag_column(table_name, "dummy_col")
     }
 
+    /// Set the `may_contain_pk_duplicates` flag
+    pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self {
+        self.may_contain_pk_duplicates = v;
+        self
+    }
+
     /// Register a tag column with the test chunk
     pub fn with_tag_column(
         self,
@@ -189,6 +204,38 @@ impl TestChunk {
         self.add_schema_to_table(table_name, new_column_schema)
     }
 
+    /// Register a tag column, with stats, with the test chunk
+    pub fn with_tag_column_with_stats(
+        self,
+        table_name: impl Into<String>,
+        column_name: impl Into<String>,
+        min: &str,
+        max: &str,
+    ) -> Self {
+        let table_name = table_name.into();
+        let column_name = column_name.into();
+
+        let mut new_self = self.with_tag_column(&table_name, &column_name);
+
+        // Now, find the appropriate column summary and update the stats
+        let column_summary: &mut ColumnSummary = new_self
+            .table_summary
+            .as_mut()
+            .expect("had table summary")
+            .columns
+            .iter_mut()
+            .find(|c| c.name == column_name)
+            .expect("had column");
+
+        column_summary.stats = Statistics::String(StatValues {
+            min: Some(min.to_string()),
+            max: Some(max.to_string()),
+            ..Default::default()
+        });
+
+        new_self
+    }
+
     /// Register a timestamp column with the test chunk
     pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
         let table_name = table_name.into();
@@ -226,7 +273,37 @@ impl TestChunk {
         if let Some(existing_name) = &self.table_name {
             assert_eq!(&table_name, existing_name);
         }
-        self.table_name = Some(table_name);
+        self.table_name = Some(table_name.clone());
+
+        // assume the new schema has exactly a single column
+        assert_eq!(new_column_schema.len(), 1);
+        let (col_type, new_field) = new_column_schema.field(0);
+
+        let influxdb_type = col_type.map(|t| match t {
+            InfluxColumnType::Tag => InfluxDbType::Tag,
+            InfluxColumnType::Field(_) => InfluxDbType::Field,
+            InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
+        });
+
+        let stats = match new_field.data_type() {
+            DataType::Boolean => Statistics::Bool(StatValues::default()),
+            DataType::Int64 => Statistics::I64(StatValues::default()),
+            DataType::UInt64 => Statistics::U64(StatValues::default()),
+            DataType::Utf8 => Statistics::String(StatValues::default()),
+            DataType::Dictionary(_, value_type) => {
+                assert!(matches!(**value_type, DataType::Utf8));
+                Statistics::String(StatValues::default())
+            }
+            DataType::Float64 => Statistics::F64(StatValues::default()),
+            DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
+            _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
+        };
+
+        let column_summary = ColumnSummary {
+            name: new_field.name().clone(),
+            influxdb_type,
+            stats,
+        };
 
         let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap();
 
@@ -238,6 +315,14 @@ impl TestChunk {
         let new_schema = merger.build().unwrap();
 
         self.table_schema = Some(new_schema);
+
+        let mut table_summary = self
+            .table_summary
+            .take()
+            .unwrap_or_else(|| TableSummary::new(table_name));
+        table_summary.columns.push(column_summary);
+        self.table_summary = Some(table_summary);
+
         self
     }
 
@@ -278,6 +363,59 @@ impl TestChunk {
             .collect::<Vec<_>>();
 
         let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        println!("TestChunk batch data: {:#?}", batch);
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
+    /// Prepares this chunk to return a specific record batch with five
+    /// rows of non null data that look like
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
+    /// "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+    /// "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
+    /// "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
+    /// "+------+------+-----------+-------------------------------+",
+    pub fn with_five_row_of_null_data(mut self, _table_name: impl Into<String>) -> Self {
+        //let table_name = table_name.into();
+        let schema = self
+            .table_schema
+            .as_ref()
+            .expect("table must exist in TestChunk");
+
+        // create arrays
+        let columns = schema
+            .iter()
+            .map(|(_influxdb_column_type, field)| match field.data_type() {
+                DataType::Int64 => {
+                    Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef
+                }
+                DataType::Utf8 => {
+                    Arc::new(StringArray::from(vec!["MA", "MT", "CT", "AL", "MT"])) as ArrayRef
+                }
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
+                    TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None),
+                ) as ArrayRef,
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    let dict: DictionaryArray<Int32Type> =
+                        vec!["MA", "MT", "CT", "AL", "MT"].into_iter().collect();
+                    Arc::new(dict) as ArrayRef
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        println!("TestChunk batch data: {:#?}", batch);
 
         self.table_data.push(Arc::new(batch));
         self
@@ -320,6 +458,10 @@ impl PartitionChunk for TestChunk {
         self.table_name.as_deref().unwrap()
     }
 
+    fn may_contain_pk_duplicates(&self) -> bool {
+        self.may_contain_pk_duplicates
+    }
+
     fn read_filter(
         &self,
         predicate: &Predicate,
@@ -334,17 +476,11 @@ impl PartitionChunk for TestChunk {
         let stream = SizedRecordBatchStream::new(batches[0].schema(), batches);
         Ok(Box::pin(stream))
     }
 
-    fn has_duplicates(&self) -> bool {
-        false
-    }
 
     /// Returns true if data of this chunk is sorted
     fn is_sorted(&self) -> bool {
         false
     }
-    fn primary_key_columns(&self) -> Vec<&ColumnSummary> {
-        vec![]
-    }
 
     fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
         self.check_error()?;
@@ -414,7 +550,9 @@ impl PartitionChunk for TestChunk {
 
 impl Prunable for TestChunk {
     fn summary(&self) -> &TableSummary {
-        unimplemented!();
+        self.table_summary
+            .as_ref()
+            .expect("Table summary not configured for TestChunk")
     }
 
     fn schema(&self) -> arrow::datatypes::SchemaRef {
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index d55e5c9944..6188061d41 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -4,7 +4,7 @@ use std::{
 };
 
 use arrow::datatypes::SchemaRef;
-use data_types::partition_metadata::{self, ColumnSummary};
+use data_types::partition_metadata;
 use partition_metadata::TableSummary;
 use snafu::{ResultExt, Snafu};
 
@@ -217,6 +217,13 @@ impl PartitionChunk for DbChunk {
         self.table_name.as_ref()
     }
 
+    fn may_contain_pk_duplicates(&self) -> bool {
+        // Assume that the MUB can contain duplicates as it has the
+        // raw incoming stream of writes, but that all other types of
+        // chunks are deduplicated as part of creation
+        matches!(self.state, State::MutableBuffer { .. })
+    }
+
     fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
         if !predicate.should_include_table(self.table_name().as_ref()) {
             return Ok(PredicateMatch::Zero);
@@ -443,25 +450,14 @@ impl PartitionChunk for DbChunk {
         }
     }
 
-    fn has_duplicates(&self) -> bool {
-        match &self.state {
-            State::MutableBuffer { .. } => true,
-            State::ReadBuffer { .. } => true, // TODO: should be false after compaction
-            State::ParquetFile { .. } => true, // TODO: should be false after compaction
-        }
-    }
-
     // TODOs: return the right value. For now the chunk is assumed to be not sorted
     fn is_sorted(&self) -> bool {
         match &self.state {
             State::MutableBuffer { .. } => false,
-            State::ReadBuffer { .. } => false, 
+            State::ReadBuffer { .. } => false,
             State::ParquetFile { .. } => false,
         }
     }
-
-    fn primary_key_columns(&self) -> Vec<&ColumnSummary> {
-        self.meta.table_summary.primary_key_columns()
-    }
 }
 
 impl Prunable for DbChunk {
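// [Sketch, not part of the diff] The state-to-flag mapping that
// `DbChunk::may_contain_pk_duplicates` encodes, per the comment in the hunk
// above: only the mutable buffer holds the raw incoming write stream, so only
// it may still contain duplicate primary keys. `ChunkState` is a simplified,
// hypothetical stand-in for the `State` enum:

#[derive(Debug)]
enum ChunkState {
    MutableBuffer,
    ReadBuffer,
    ParquetFile,
}

fn may_contain_pk_duplicates(state: &ChunkState) -> bool {
    // read buffer and parquet chunks are deduplicated when they are created
    matches!(state, ChunkState::MutableBuffer)
}

fn main() {
    assert!(may_contain_pk_duplicates(&ChunkState::MutableBuffer));
    assert!(!may_contain_pk_duplicates(&ChunkState::ReadBuffer));
    assert!(!may_contain_pk_duplicates(&ChunkState::ParquetFile));
}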