test: add tests for the sort plan
commit
3e10351538
|
@ -7,7 +7,7 @@
|
|||
)]
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{chunk_metadata::ChunkSummary, partition_metadata::ColumnSummary};
|
||||
use data_types::chunk_metadata::ChunkSummary;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use exec::{stringset::StringSet, Executor};
|
||||
use internal_types::{schema::Schema, selection::Selection};
|
||||
|
@ -66,6 +66,10 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
|
|||
/// Returns the name of the table stored in this chunk
|
||||
fn table_name(&self) -> &str;
|
||||
|
||||
/// Returns true if the chunk may contain a duplicate "primary
|
||||
/// key" within itself
|
||||
fn may_contain_pk_duplicates(&self) -> bool;
|
||||
|
||||
/// Returns the result of applying the `predicate` to the chunk
|
||||
/// using an efficient, but inexact method, based on metadata.
|
||||
///
|
||||
|
@ -119,12 +123,9 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
|
|||
selection: Selection<'_>,
|
||||
) -> Result<SendableRecordBatchStream, Self::Error>;
|
||||
|
||||
/// Returns true if this chunk has duplicates
|
||||
fn has_duplicates(&self) -> bool;
|
||||
|
||||
/// Returns true if data of this chunk is sorted
|
||||
fn is_sorted(&self) -> bool;
|
||||
fn primary_key_columns(&self) -> Vec<&ColumnSummary>;
|
||||
//fn primary_key_columns(&self) -> Vec<&ColumnSummary>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
|
@ -2,16 +2,26 @@
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef};
|
||||
use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
|
||||
// use data_types::partition_metadata::ColumnSummary;
|
||||
use datafusion::{datasource::{
|
||||
use datafusion::{
|
||||
datasource::{
|
||||
datasource::{Statistics, TableProviderFilterPushDown},
|
||||
TableProvider,
|
||||
}, error::{DataFusionError, Result as DataFusionResult}, logical_plan::Expr, physical_plan::{ExecutionPlan, expressions::{PhysicalSortExpr, col}, sort::SortExec}};
|
||||
},
|
||||
error::{DataFusionError, Result as DataFusionResult},
|
||||
logical_plan::Expr,
|
||||
physical_plan::{
|
||||
expressions::{col, PhysicalSortExpr},
|
||||
sort::SortExec,
|
||||
ExecutionPlan,
|
||||
},
|
||||
};
|
||||
use internal_types::schema::{builder::SchemaMerger, Schema};
|
||||
use observability_deps::tracing::debug;
|
||||
|
||||
use crate::{
|
||||
duplicate::group_potential_duplicates,
|
||||
predicate::{Predicate, PredicateBuilder},
|
||||
util::project_schema,
|
||||
PartitionChunk,
|
||||
|
@ -59,9 +69,26 @@ pub enum Error {
|
|||
InternalSort {
|
||||
source: datafusion::error::DataFusionError,
|
||||
},
|
||||
|
||||
#[snafu(display("Internal error: Can not group chunks '{}'", source,))]
|
||||
InternalChunkGrouping { source: crate::duplicate::Error },
|
||||
}
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
impl From<Error> for ArrowError {
|
||||
// Wrap an error into an arrow error
|
||||
fn from(e: Error) -> Self {
|
||||
Self::ExternalError(Box::new(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for DataFusionError {
|
||||
// Wrap an error into a datafusion error
|
||||
fn from(e: Error) -> Self {
|
||||
Self::ArrowError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Something that can prune chunks based on their metadata
|
||||
pub trait ChunkPruner<C: PartitionChunk>: Sync + Send + std::fmt::Debug {
|
||||
/// prune `chunks`, if possible, based on predicate.
|
||||
|
@ -250,7 +277,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
|
|||
scan_schema,
|
||||
chunks,
|
||||
predicate,
|
||||
);
|
||||
)?;
|
||||
|
||||
Ok(plan)
|
||||
}
|
||||
|
@ -344,9 +371,18 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
|||
schema: ArrowSchemaRef,
|
||||
chunks: Vec<Arc<C>>,
|
||||
predicate: Predicate,
|
||||
) -> Arc<dyn ExecutionPlan> {
|
||||
//finding overlapped chunks and put them into the right group
|
||||
self.split_overlapped_chunks(chunks.to_vec());
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
// find overlapped chunks and put them into the right group
|
||||
self.split_overlapped_chunks(chunks.to_vec())?;
|
||||
|
||||
// TEMP until the rest of this module's code is complete:
|
||||
// merge all plans into the same
|
||||
self.no_duplicates_chunks
|
||||
.append(&mut self.in_chunk_duplicates_chunks);
|
||||
for mut group in &mut self.overlapped_chunks_set {
|
||||
self.no_duplicates_chunks.append(&mut group);
|
||||
}
|
||||
self.overlapped_chunks_set.clear();
|
||||
|
||||
// Building plans
|
||||
let mut plans = vec![];
|
||||
|
@ -377,7 +413,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
|||
Arc::clone(&schema),
|
||||
chunk_with_duplicates.to_owned(),
|
||||
predicate.clone(),
|
||||
));
|
||||
)?);
|
||||
}
|
||||
|
||||
// Go over non_duplicates_chunks, build a plan for it
|
||||
|
@ -397,22 +433,32 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
|||
// There are still plan, add UnionExec
|
||||
if !plans.is_empty() {
|
||||
// final_plan = union_plan
|
||||
panic!("Unexpected error: There should be only one output for scan plan");
|
||||
panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
|
||||
}
|
||||
|
||||
final_plan
|
||||
Ok(final_plan)
|
||||
}
|
||||
|
||||
/// discover overlaps and split them into three groups:
|
||||
/// 1. vector of vector of overlapped chunks
|
||||
/// 2. vector of non-overlapped chunks, each have duplicates in itself
|
||||
/// 3. vectors of non-overlapped chunks without duplicates
|
||||
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
|
||||
// TODO: need to discover overlaps and split them
|
||||
// The current behavior is just like neither overlaps nor having duplicates in its own chunk
|
||||
//self.overlapped_chunks_set = vec![];
|
||||
//self.in_chunk_duplicates_chunks = vec![];
|
||||
self.no_duplicates_chunks.append(&mut chunks.to_vec());
|
||||
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) -> Result<()> {
|
||||
// Find all groups based on statistics
|
||||
let groups = group_potential_duplicates(chunks).context(InternalChunkGrouping)?;
|
||||
|
||||
for mut group in groups {
|
||||
if group.len() == 1 {
|
||||
if group[0].may_contain_pk_duplicates() {
|
||||
self.in_chunk_duplicates_chunks.append(&mut group);
|
||||
} else {
|
||||
self.no_duplicates_chunks.append(&mut group);
|
||||
}
|
||||
} else {
|
||||
self.overlapped_chunks_set.push(group)
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return true if no chunk overlaps another and no chunk has duplicates within itself
|
||||
|
@ -488,184 +534,56 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
|
|||
schema: ArrowSchemaRef,
|
||||
chunk: Arc<C>, // This chunk is identified having duplicates
|
||||
predicate: Predicate,
|
||||
) -> Arc<dyn ExecutionPlan> {
|
||||
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
// Create the bottom node IOxRedFilterNode for this chunk
|
||||
let mut input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
|
||||
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
|
||||
Arc::clone(&table_name),
|
||||
schema,
|
||||
vec![Arc::clone(&chunk)],
|
||||
predicate,
|
||||
));
|
||||
|
||||
|
||||
// Add the sort operator, SortExec, if needed
|
||||
if !chunk.is_sorted() {
|
||||
//let plan = Self::build_sort_plan(chunk, input);
|
||||
Self::build_sort_plan(chunk, input)
|
||||
|
||||
let key_summaries = chunk.primary_key_columns();
|
||||
// Create DeduplicateExc
|
||||
// TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
|
||||
//plan = add_deduplicate_exec(plan);
|
||||
|
||||
//plan
|
||||
}
|
||||
|
||||
/// Add SortExec operator on top of the input plan of the given chunk
|
||||
/// The plan will be sorted on the chunk's primary key
|
||||
fn build_sort_plan(
|
||||
chunk: Arc<C>,
|
||||
input: Arc<dyn ExecutionPlan>,
|
||||
) -> Result<Arc<dyn ExecutionPlan>> {
|
||||
if !chunk.is_sorted() {
|
||||
let key_summaries = chunk.summary().primary_key_columns();
|
||||
|
||||
// build sort expression
|
||||
let mut sort_exprs = vec![];
|
||||
for key in key_summaries {
|
||||
sort_exprs.push(
|
||||
PhysicalSortExpr{
|
||||
expr: col(key.name.as_str()),
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
}
|
||||
}
|
||||
);
|
||||
sort_exprs.push(PhysicalSortExpr {
|
||||
expr: col(key.name.as_str()),
|
||||
options: SortOptions {
|
||||
descending: false,
|
||||
nulls_first: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Create SortExec operator on top of the IOxReadFilterNode
|
||||
let sort_exec = SortExec::try_new(sort_exprs, input);
|
||||
let sort_exec = match sort_exec {
|
||||
Ok(plan) => plan,
|
||||
Err(e) => panic!("Internal error while adding SortExec: {}", e) // This should never happens
|
||||
};
|
||||
input = Arc::new(sort_exec);
|
||||
// Create SortExec operator
|
||||
Ok(Arc::new(
|
||||
SortExec::try_new(sort_exprs, input).context(InternalSort)?,
|
||||
))
|
||||
} else {
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
// Create DeduplicateExc
|
||||
// TODO: Add DeuplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
|
||||
|
||||
input
|
||||
}
|
||||
|
||||
|
||||
//////////
|
||||
|
||||
// let sort_expr = expr
|
||||
// .iter()
|
||||
// .map(|e| match e {
|
||||
// Expr::Sort {
|
||||
// expr,
|
||||
// asc,
|
||||
// nulls_first,
|
||||
// } => self.create_physical_sort_expr(
|
||||
// expr,
|
||||
// &input_schema,
|
||||
// SortOptions {
|
||||
// descending: !*asc,
|
||||
// nulls_first: *nulls_first,
|
||||
// },
|
||||
// ctx_state,
|
||||
// ),
|
||||
// _ => Err(DataFusionError::Plan(
|
||||
// "Sort only accepts sort expressions".to_string(),
|
||||
// )),
|
||||
// })
|
||||
// .collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// Ok(Arc::new(SortExec::try_new(sort_expr, input)?))
|
||||
|
||||
///////////
|
||||
/////// TEST: create SortExec plan
|
||||
// #[tokio::test]
|
||||
// async fn test_lex_sort_by_float() -> Result<()> {
|
||||
// let schema = Arc::new(Schema::new(vec![
|
||||
// Field::new("a", DataType::Float32, true),
|
||||
// Field::new("b", DataType::Float64, true),
|
||||
// ]));
|
||||
|
||||
// // define data.
|
||||
// let batch = RecordBatch::try_new(
|
||||
// schema.clone(),
|
||||
// vec![
|
||||
// Arc::new(Float32Array::from(vec![
|
||||
// Some(f32::NAN),
|
||||
// None,
|
||||
// None,
|
||||
// Some(f32::NAN),
|
||||
// Some(1.0_f32),
|
||||
// Some(1.0_f32),
|
||||
// Some(2.0_f32),
|
||||
// Some(3.0_f32),
|
||||
// ])),
|
||||
// Arc::new(Float64Array::from(vec![
|
||||
// Some(200.0_f64),
|
||||
// Some(20.0_f64),
|
||||
// Some(10.0_f64),
|
||||
// Some(100.0_f64),
|
||||
// Some(f64::NAN),
|
||||
// None,
|
||||
// None,
|
||||
// Some(f64::NAN),
|
||||
// ])),
|
||||
// ],
|
||||
// )?;
|
||||
|
||||
// let sort_exec = Arc::new(SortExec::try_new(
|
||||
// vec![
|
||||
// PhysicalSortExpr {
|
||||
// expr: col("a"),
|
||||
// options: SortOptions {
|
||||
// descending: true,
|
||||
// nulls_first: true,
|
||||
// },
|
||||
// },
|
||||
// PhysicalSortExpr {
|
||||
// expr: col("b"),
|
||||
// options: SortOptions {
|
||||
// descending: false,
|
||||
// nulls_first: false,
|
||||
// },
|
||||
// },
|
||||
// ],
|
||||
// Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?),
|
||||
// )?);
|
||||
|
||||
// assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type());
|
||||
// assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type());
|
||||
|
||||
// let result: Vec<RecordBatch> = collect(sort_exec.clone()).await?;
|
||||
// assert!(sort_exec.metrics().get("sortTime").unwrap().value() > 0);
|
||||
// assert_eq!(sort_exec.metrics().get("outputRows").unwrap().value(), 8);
|
||||
// assert_eq!(result.len(), 1);
|
||||
|
||||
// let columns = result[0].columns();
|
||||
|
||||
// assert_eq!(DataType::Float32, *columns[0].data_type());
|
||||
// assert_eq!(DataType::Float64, *columns[1].data_type());
|
||||
|
||||
// let a = as_primitive_array::<Float32Type>(&columns[0]);
|
||||
// let b = as_primitive_array::<Float64Type>(&columns[1]);
|
||||
|
||||
// // convert result to strings to allow comparing to expected result containing NaN
|
||||
// let result: Vec<(Option<String>, Option<String>)> = (0..result[0].num_rows())
|
||||
// .map(|i| {
|
||||
// let aval = if a.is_valid(i) {
|
||||
// Some(a.value(i).to_string())
|
||||
// } else {
|
||||
// None
|
||||
// };
|
||||
// let bval = if b.is_valid(i) {
|
||||
// Some(b.value(i).to_string())
|
||||
// } else {
|
||||
// None
|
||||
// };
|
||||
// (aval, bval)
|
||||
// })
|
||||
// .collect();
|
||||
|
||||
// let expected: Vec<(Option<String>, Option<String>)> = vec![
|
||||
// (None, Some("10".to_owned())),
|
||||
// (None, Some("20".to_owned())),
|
||||
// (Some("NaN".to_owned()), Some("100".to_owned())),
|
||||
// (Some("NaN".to_owned()), Some("200".to_owned())),
|
||||
// (Some("3".to_owned()), Some("NaN".to_owned())),
|
||||
// (Some("2".to_owned()), None),
|
||||
// (Some("1".to_owned()), Some("NaN".to_owned())),
|
||||
// (Some("1".to_owned()), None),
|
||||
// ];
|
||||
|
||||
// assert_eq!(expected, result);
|
||||
|
||||
// Ok(())
|
||||
// }
|
||||
/////////////
|
||||
|
||||
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
|
||||
/// ```text
|
||||
/// ┌─────────────────┐
|
||||
|
@ -718,8 +636,134 @@ impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
|
||||
use arrow_util::assert_batches_eq;
|
||||
use datafusion::physical_plan::collect;
|
||||
use internal_types::schema::TIME_COLUMN_NAME;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sort() {
|
||||
|
||||
}
|
||||
use crate::test::TestChunk;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
fn chunk_grouping() {
    // Plumbing check only: make sure the output of chunk grouping lands
    // in the right buckets of the deduplicator. The grouping logic
    // itself is tested in the duplicate module.

    // chunk 1: overlaps with no other chunk
    let standalone = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b"));

    // chunks 2 and 3: same tag range, so they overlap with each other
    let overlap_a = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d"));
    let overlap_b = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d"));

    // chunk 4: overlaps with no other chunk, but is flagged as possibly
    // containing primary key duplicates within itself
    let self_duplicated = Arc::new(
        TestChunk::new(4)
            .with_tag_column_with_stats("t", "tag1", "e", "f")
            .with_may_contain_pk_duplicates(true),
    );

    let chunks = vec![standalone, overlap_a, overlap_b, self_duplicated];

    let mut deduplicator = Deduplicater::new();
    deduplicator
        .split_overlapped_chunks(chunks)
        .expect("split chunks");

    let overlapped = chunk_group_ids(&deduplicator.overlapped_chunks_set);
    assert_eq!(overlapped, vec!["Group 0: 2, 3"]);

    let in_chunk_duplicates = chunk_ids(&deduplicator.in_chunk_duplicates_chunks);
    assert_eq!(in_chunk_duplicates, "4");

    let no_duplicates = chunk_ids(&deduplicator.no_duplicates_chunks);
    assert_eq!(no_duplicates, "1");
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sort_planning() {
|
||||
// Chunk 1 with 5 rows of data
|
||||
let chunk = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column("t")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_tag_column("t", "tag2")
|
||||
.with_tag_column("t", "tag1")
|
||||
.with_five_row_of_null_data("t"),
|
||||
);
|
||||
|
||||
// Datafusion schema of the chunk
|
||||
let schema = Arc::new(Schema::new(vec![
|
||||
Field::new(
|
||||
"tag1",
|
||||
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
|
||||
true,
|
||||
),
|
||||
Field::new(
|
||||
"tag2",
|
||||
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
|
||||
true,
|
||||
),
|
||||
Field::new("field_int", DataType::Int64, true),
|
||||
Field::new(
|
||||
TIME_COLUMN_NAME,
|
||||
DataType::Timestamp(TimeUnit::Nanosecond, None),
|
||||
true,
|
||||
),
|
||||
]));
|
||||
|
||||
// IOx scan operator
|
||||
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
|
||||
Arc::from("t"),
|
||||
schema,
|
||||
vec![Arc::clone(&chunk)],
|
||||
Predicate::default(),
|
||||
));
|
||||
let batch = collect(Arc::clone(&input)).await.unwrap();
|
||||
// data in its original non-sorted form
|
||||
let expected = vec![
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
"| tag1 | tag2 | field_int | time |",
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
"| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
|
||||
"| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
|
||||
"| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
||||
"| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
|
||||
"| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &batch);
|
||||
|
||||
// Add Sort operator on top of IOx scan
|
||||
let sort_plan = Deduplicater::build_sort_plan(chunk, input);
|
||||
let batch = collect(sort_plan.unwrap()).await.unwrap();
|
||||
// data is not sorted on primary key(tag1, tag2, time)
|
||||
let expected = vec![
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
"| tag1 | tag2 | field_int | time |",
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
"| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
|
||||
"| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
||||
"| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
|
||||
"| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
|
||||
"| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
|
||||
"+------+------+-----------+-------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &batch);
|
||||
}
|
||||
|
||||
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
|
||||
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
|
||||
ids.join(", ")
|
||||
}
|
||||
|
||||
fn chunk_group_ids(groups: &[Vec<Arc<TestChunk>>]) -> Vec<String> {
|
||||
groups
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, group)| format!("Group {}: {}", idx, chunk_ids(group)))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
//! Implementation of statistics based pruning
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::{array::ArrayRef, datatypes::SchemaRef};
|
||||
use data_types::partition_metadata::{ColumnSummary, Statistics, TableSummary};
|
||||
use datafusion::{
|
||||
|
@ -20,6 +22,19 @@ pub trait Prunable: Sized {
|
|||
fn schema(&self) -> SchemaRef;
|
||||
}
|
||||
|
||||
impl<P> Prunable for Arc<P>
|
||||
where
|
||||
P: Prunable,
|
||||
{
|
||||
fn summary(&self) -> &TableSummary {
|
||||
self.as_ref().summary()
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.as_ref().schema()
|
||||
}
|
||||
}
|
||||
|
||||
/// Something that cares to be notified when pruning of chunks occurs
|
||||
pub trait PruningObserver {
|
||||
type Observed;
|
||||
|
|
|
@ -8,7 +8,10 @@ use arrow::{
|
|||
datatypes::{DataType, Int32Type, TimeUnit},
|
||||
record_batch::RecordBatch,
|
||||
};
|
||||
use data_types::{chunk_metadata::ChunkSummary, partition_metadata::{ColumnSummary, TableSummary}};
|
||||
use data_types::{
|
||||
chunk_metadata::ChunkSummary,
|
||||
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
|
||||
};
|
||||
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
|
||||
|
||||
use crate::{
|
||||
|
@ -20,7 +23,7 @@ use crate::{exec::Executor, pruning::Prunable};
|
|||
use internal_types::{
|
||||
schema::{
|
||||
builder::{SchemaBuilder, SchemaMerger},
|
||||
Schema,
|
||||
InfluxColumnType, Schema,
|
||||
},
|
||||
selection::Selection,
|
||||
};
|
||||
|
@ -119,6 +122,9 @@ impl Database for TestDatabase {
|
|||
pub struct TestChunk {
|
||||
id: u32,
|
||||
|
||||
/// Set the flag if this chunk might contain duplicates
|
||||
may_contain_pk_duplicates: bool,
|
||||
|
||||
/// A copy of the captured predicates passed
|
||||
predicates: Mutex<Vec<Predicate>>,
|
||||
|
||||
|
@ -136,6 +142,9 @@ pub struct TestChunk {
|
|||
|
||||
/// Return value for apply_predicate, if desired
|
||||
predicate_match: Option<PredicateMatch>,
|
||||
|
||||
/// Return value for summary(), if desired
|
||||
table_summary: Option<TableSummary>,
|
||||
}
|
||||
|
||||
impl TestChunk {
|
||||
|
@ -173,6 +182,12 @@ impl TestChunk {
|
|||
self.with_tag_column(table_name, "dummy_col")
|
||||
}
|
||||
|
||||
/// Set the `may_contain_pk_duplicates` flag
|
||||
pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self {
|
||||
self.may_contain_pk_duplicates = v;
|
||||
self
|
||||
}
|
||||
|
||||
/// Register an tag column with the test chunk
|
||||
pub fn with_tag_column(
|
||||
self,
|
||||
|
@ -189,6 +204,38 @@ impl TestChunk {
|
|||
self.add_schema_to_table(table_name, new_column_schema)
|
||||
}
|
||||
|
||||
/// Register an tag column with the test chunk
|
||||
pub fn with_tag_column_with_stats(
|
||||
self,
|
||||
table_name: impl Into<String>,
|
||||
column_name: impl Into<String>,
|
||||
min: &str,
|
||||
max: &str,
|
||||
) -> Self {
|
||||
let table_name = table_name.into();
|
||||
let column_name = column_name.into();
|
||||
|
||||
let mut new_self = self.with_tag_column(&table_name, &column_name);
|
||||
|
||||
// Now, find the appropriate column summary and update the stats
|
||||
let column_summary: &mut ColumnSummary = new_self
|
||||
.table_summary
|
||||
.as_mut()
|
||||
.expect("had table summary")
|
||||
.columns
|
||||
.iter_mut()
|
||||
.find(|c| c.name == column_name)
|
||||
.expect("had column");
|
||||
|
||||
column_summary.stats = Statistics::String(StatValues {
|
||||
min: Some(min.to_string()),
|
||||
max: Some(max.to_string()),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
new_self
|
||||
}
|
||||
|
||||
/// Register a timestamp column with the test chunk
|
||||
pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
|
||||
let table_name = table_name.into();
|
||||
|
@ -226,7 +273,37 @@ impl TestChunk {
|
|||
if let Some(existing_name) = &self.table_name {
|
||||
assert_eq!(&table_name, existing_name);
|
||||
}
|
||||
self.table_name = Some(table_name);
|
||||
self.table_name = Some(table_name.clone());
|
||||
|
||||
// assume the new schema has exactly a single table
|
||||
assert_eq!(new_column_schema.len(), 1);
|
||||
let (col_type, new_field) = new_column_schema.field(0);
|
||||
|
||||
let influxdb_type = col_type.map(|t| match t {
|
||||
InfluxColumnType::Tag => InfluxDbType::Tag,
|
||||
InfluxColumnType::Field(_) => InfluxDbType::Field,
|
||||
InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
|
||||
});
|
||||
|
||||
let stats = match new_field.data_type() {
|
||||
DataType::Boolean => Statistics::Bool(StatValues::default()),
|
||||
DataType::Int64 => Statistics::I64(StatValues::default()),
|
||||
DataType::UInt64 => Statistics::U64(StatValues::default()),
|
||||
DataType::Utf8 => Statistics::String(StatValues::default()),
|
||||
DataType::Dictionary(_, value_type) => {
|
||||
assert!(matches!(**value_type, DataType::Utf8));
|
||||
Statistics::String(StatValues::default())
|
||||
}
|
||||
DataType::Float64 => Statistics::String(StatValues::default()),
|
||||
DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
|
||||
_ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
|
||||
};
|
||||
|
||||
let column_summary = ColumnSummary {
|
||||
name: new_field.name().clone(),
|
||||
influxdb_type,
|
||||
stats,
|
||||
};
|
||||
|
||||
let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap();
|
||||
|
||||
|
@ -238,6 +315,14 @@ impl TestChunk {
|
|||
let new_schema = merger.build().unwrap();
|
||||
|
||||
self.table_schema = Some(new_schema);
|
||||
|
||||
let mut table_summary = self
|
||||
.table_summary
|
||||
.take()
|
||||
.unwrap_or_else(|| TableSummary::new(table_name));
|
||||
table_summary.columns.push(column_summary);
|
||||
self.table_summary = Some(table_summary);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -278,6 +363,59 @@ impl TestChunk {
|
|||
.collect::<Vec<_>>();
|
||||
|
||||
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
|
||||
println!("TestChunk batch data: {:#?}", batch);
|
||||
|
||||
self.table_data.push(Arc::new(batch));
|
||||
self
|
||||
}
|
||||
|
||||
/// Prepares this chunk to return a specific record batch with five
|
||||
/// rows of non null data that look like
|
||||
/// "+------+------+-----------+-------------------------------+",
|
||||
/// "| tag1 | tag2 | field_int | time |",
|
||||
/// "+------+------+-----------+-------------------------------+",
|
||||
/// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
|
||||
/// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
|
||||
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
|
||||
/// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
|
||||
/// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
|
||||
/// "+------+------+-----------+-------------------------------+",
|
||||
pub fn with_five_row_of_null_data(mut self, _table_name: impl Into<String>) -> Self {
|
||||
//let table_name = table_name.into();
|
||||
let schema = self
|
||||
.table_schema
|
||||
.as_ref()
|
||||
.expect("table must exist in TestChunk");
|
||||
|
||||
// create arrays
|
||||
let columns = schema
|
||||
.iter()
|
||||
.map(|(_influxdb_column_type, field)| match field.data_type() {
|
||||
DataType::Int64 => {
|
||||
Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef
|
||||
}
|
||||
DataType::Utf8 => {
|
||||
Arc::new(StringArray::from(vec!["MA", "MT", "CT", "AL", "MT"])) as ArrayRef
|
||||
}
|
||||
DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
|
||||
TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None),
|
||||
) as ArrayRef,
|
||||
DataType::Dictionary(key, value)
|
||||
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
|
||||
{
|
||||
let dict: DictionaryArray<Int32Type> =
|
||||
vec!["MA", "MT", "CT", "AL", "MT"].into_iter().collect();
|
||||
Arc::new(dict) as ArrayRef
|
||||
}
|
||||
_ => unimplemented!(
|
||||
"Unimplemented data type for test database: {:?}",
|
||||
field.data_type()
|
||||
),
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
|
||||
println!("TestChunk batch data: {:#?}", batch);
|
||||
|
||||
self.table_data.push(Arc::new(batch));
|
||||
self
|
||||
|
@ -320,6 +458,10 @@ impl PartitionChunk for TestChunk {
|
|||
self.table_name.as_deref().unwrap()
|
||||
}
|
||||
|
||||
fn may_contain_pk_duplicates(&self) -> bool {
|
||||
self.may_contain_pk_duplicates
|
||||
}
|
||||
|
||||
fn read_filter(
|
||||
&self,
|
||||
predicate: &Predicate,
|
||||
|
@ -334,17 +476,11 @@ impl PartitionChunk for TestChunk {
|
|||
let stream = SizedRecordBatchStream::new(batches[0].schema(), batches);
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
fn has_duplicates(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns true if data of this chunk is sorted
|
||||
fn is_sorted(&self) -> bool {
|
||||
false
|
||||
}
|
||||
fn primary_key_columns(&self) -> Vec<&ColumnSummary> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
|
||||
self.check_error()?;
|
||||
|
@ -414,7 +550,9 @@ impl PartitionChunk for TestChunk {
|
|||
|
||||
impl Prunable for TestChunk {
|
||||
fn summary(&self) -> &TableSummary {
|
||||
unimplemented!();
|
||||
self.table_summary
|
||||
.as_ref()
|
||||
.expect("Table summary not configured for TestChunk")
|
||||
}
|
||||
|
||||
fn schema(&self) -> arrow::datatypes::SchemaRef {
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
|||
};
|
||||
|
||||
use arrow::datatypes::SchemaRef;
|
||||
use data_types::partition_metadata::{self, ColumnSummary};
|
||||
use data_types::partition_metadata;
|
||||
use partition_metadata::TableSummary;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
|
@ -217,6 +217,13 @@ impl PartitionChunk for DbChunk {
|
|||
self.table_name.as_ref()
|
||||
}
|
||||
|
||||
fn may_contain_pk_duplicates(&self) -> bool {
|
||||
// Assume that the MUB can contain duplicates as it has the
|
||||
// raw incoming stream of writes, but that all other types of
|
||||
// chunks are deduplicated as part of creation
|
||||
matches!(self.state, State::ReadBuffer { .. })
|
||||
}
|
||||
|
||||
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
|
||||
if !predicate.should_include_table(self.table_name().as_ref()) {
|
||||
return Ok(PredicateMatch::Zero);
|
||||
|
@ -443,25 +450,14 @@ impl PartitionChunk for DbChunk {
|
|||
}
|
||||
}
|
||||
|
||||
fn has_duplicates(&self) -> bool {
|
||||
match &self.state {
|
||||
State::MutableBuffer { .. } => true,
|
||||
State::ReadBuffer { .. } => true, // TODO: should be false after compaction
|
||||
State::ParquetFile { .. } => true, // TODO: should be false after compaction
|
||||
}
|
||||
}
|
||||
|
||||
// TODOs: return the right value. For now the chunk is assumed to be not sorted
|
||||
fn is_sorted(&self) -> bool {
|
||||
match &self.state {
|
||||
State::MutableBuffer { .. } => false,
|
||||
State::ReadBuffer { .. } => false,
|
||||
State::ReadBuffer { .. } => false,
|
||||
State::ParquetFile { .. } => false,
|
||||
}
|
||||
}
|
||||
fn primary_key_columns(&self) -> Vec<&ColumnSummary> {
|
||||
self.meta.table_summary.primary_key_columns()
|
||||
}
|
||||
}
|
||||
|
||||
impl Prunable for DbChunk {
|
||||
|
|
Loading…
Reference in New Issue