Merge pull request #1662 from influxdata/alamb/hook_up_grouping

feat: Hook up chunk grouping into provider
pull/24376/head
kodiakhq[bot] 2021-06-08 19:01:58 +00:00 committed by GitHub
commit afcaa5d7a0
5 changed files with 226 additions and 17 deletions

View File

@@ -66,6 +66,10 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
/// Returns the name of the table stored in this chunk
fn table_name(&self) -> &str;
/// Returns true if the chunk may contain a duplicate "primary
/// key" within itself
fn may_contain_pk_duplicates(&self) -> bool;
/// Returns the result of applying the `predicate` to the chunk
/// using an efficient, but inexact method, based on metadata.
///
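
For illustration, a minimal sketch of how an implementor might satisfy the new method (hypothetical BufferedChunk type, not part of this diff; the real DbChunk implementation appears in the last file below):

// Hypothetical chunk that buffers raw writes, for illustration only.
struct BufferedChunk {
    table_name: String,
    /// Set once the chunk has been deduplicated (e.g. during compaction).
    deduplicated: bool,
}

impl BufferedChunk {
    fn table_name(&self) -> &str {
        &self.table_name
    }

    /// A raw write buffer may repeat a "primary key" until it is
    /// deduplicated, so report true until that happens.
    fn may_contain_pk_duplicates(&self) -> bool {
        !self.deduplicated
    }
}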

View File

@@ -2,7 +2,7 @@
use std::sync::Arc;
use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use datafusion::{
datasource::{
datasource::{Statistics, TableProviderFilterPushDown},
@@ -16,6 +16,7 @@ use internal_types::schema::{builder::SchemaMerger, Schema};
use observability_deps::tracing::debug;
use crate::{
duplicate::group_potential_duplicates,
predicate::{Predicate, PredicateBuilder},
util::project_schema,
PartitionChunk,
@@ -58,9 +59,26 @@ pub enum Error {
InternalPushdownPredicate {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error: Cannot group chunks '{}'", source))]
InternalChunkGrouping { source: crate::duplicate::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for ArrowError {
// Wrap an error into an arrow error
fn from(e: Error) -> Self {
Self::ExternalError(Box::new(e))
}
}
impl From<Error> for DataFusionError {
// Wrap an error into a datafusion error
fn from(e: Error) -> Self {
Self::ArrowError(e.into())
}
}
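
Together, these conversions let module code use `?` to lift its own errors into DataFusion's error type (Error -> ArrowError -> DataFusionError); a minimal sketch, assuming the Error enum above (the fail_into_datafusion helper is hypothetical, for illustration only):

// Hypothetical helper: callers in DataFusion's TableProvider machinery
// expect DataFusionError, not this module's Error.
fn fail_into_datafusion() -> Result<(), DataFusionError> {
    let local: Result<()> = Err(Error::InternalPushdownPredicate {
        source: DataFusionError::Internal("example".to_string()),
    });
    // `?` applies the two From impls above in sequence.
    Ok(local?)
}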
/// Something that can prune chunks based on their metadata
pub trait ChunkPruner<C: PartitionChunk>: Sync + Send + std::fmt::Debug {
/// prune `chunks`, if possible, based on predicate.
@@ -249,7 +267,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
scan_schema,
chunks,
predicate,
);
)?;
Ok(plan)
}
@@ -343,9 +361,18 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
schema: ArrowSchemaRef,
chunks: Vec<Arc<C>>,
predicate: Predicate,
) -> Arc<dyn ExecutionPlan> {
//finding overlapped chunks and put them into the right group
self.split_overlapped_chunks(chunks.to_vec());
) -> Result<Arc<dyn ExecutionPlan>> {
// Find overlapped chunks and put them into the right group
self.split_overlapped_chunks(chunks.to_vec())?;
// TEMP until the rest of this module's code is complete:
// merge all chunks into the same (no-duplicates) group
self.no_duplicates_chunks
.append(&mut self.in_chunk_duplicates_chunks);
for mut group in &mut self.overlapped_chunks_set {
self.no_duplicates_chunks.append(&mut group);
}
self.overlapped_chunks_set.clear();
// Building plans
let mut plans = vec![];
@@ -396,22 +423,32 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
// There are still plans, add UnionExec
if !plans.is_empty() {
// final_plan = union_plan
panic!("Unexpected error: There should be only one output for scan plan");
panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
}
final_plan
Ok(final_plan)
}
/// Discover overlaps and split the chunks into three groups:
/// 1. a vector of vectors of overlapped chunks
/// 2. a vector of non-overlapped chunks, each of which may have duplicates within itself
/// 3. a vector of non-overlapped chunks without duplicates
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) {
// TODO: need to discover overlaps and split them
// The current behavior is just like neither overlaps nor having duplicates in its own chunk
//self.overlapped_chunks_set = vec![];
//self.in_chunk_duplicates_chunks = vec![];
self.no_duplicates_chunks.append(&mut chunks.to_vec());
fn split_overlapped_chunks(&mut self, chunks: Vec<Arc<C>>) -> Result<()> {
// Find all groups based on statistics
let groups = group_potential_duplicates(chunks).context(InternalChunkGrouping)?;
for mut group in groups {
if group.len() == 1 {
if group[0].may_contain_pk_duplicates() {
self.in_chunk_duplicates_chunks.append(&mut group);
} else {
self.no_duplicates_chunks.append(&mut group);
}
} else {
self.overlapped_chunks_set.push(group)
}
}
Ok(())
}
/// Return true if no chunks overlap with one another and none has duplicates within itself
@@ -549,3 +586,58 @@ impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner {
chunks
}
}
#[cfg(test)]
mod test {
use crate::test::TestChunk;
use super::*;
#[test]
fn chunk_grouping() {
// This test just ensures that all the plumbing is connected
// for chunk grouping. The logic of the grouping is tested
// in the duplicate module
// c1: no overlaps
let c1 = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b"));
// c2: overlaps with c3
let c2 = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d"));
// c3: overlaps with c2
let c3 = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d"));
// c4: no overlaps, but may contain duplicates within itself
let c4 = Arc::new(
TestChunk::new(4)
.with_tag_column_with_stats("t", "tag1", "e", "f")
.with_may_contain_pk_duplicates(true),
);
let mut deduplicator = Deduplicater::new();
deduplicator
.split_overlapped_chunks(vec![c1, c2, c3, c4])
.expect("split chunks");
assert_eq!(
chunk_group_ids(&deduplicator.overlapped_chunks_set),
vec!["Group 0: 2, 3"]
);
assert_eq!(chunk_ids(&deduplicator.in_chunk_duplicates_chunks), "4");
assert_eq!(chunk_ids(&deduplicator.no_duplicates_chunks), "1");
}
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
ids.join(", ")
}
fn chunk_group_ids(groups: &[Vec<Arc<TestChunk>>]) -> Vec<String> {
groups
.iter()
.enumerate()
.map(|(idx, group)| format!("Group {}: {}", idx, chunk_ids(group)))
.collect()
}
}

View File

@@ -1,4 +1,6 @@
//! Implementation of statistics based pruning
use std::sync::Arc;
use arrow::{array::ArrayRef, datatypes::SchemaRef};
use data_types::partition_metadata::{ColumnSummary, Statistics, TableSummary};
use datafusion::{
@@ -20,6 +22,19 @@ pub trait Prunable: Sized {
fn schema(&self) -> SchemaRef;
}
impl<P> Prunable for Arc<P>
where
P: Prunable,
{
fn summary(&self) -> &TableSummary {
self.as_ref().summary()
}
fn schema(&self) -> SchemaRef {
self.as_ref().schema()
}
}
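
The blanket impl means code that is generic over P: Prunable also accepts Arc-wrapped chunks without unwrapping them; a minimal, self-contained sketch of the same delegation pattern (hypothetical MiniPrunable trait, for illustration only):

use std::sync::Arc;

// Stand-in for Prunable, for illustration.
trait MiniPrunable {
    fn row_count(&self) -> usize;
}

// Same pattern as above: delegate through the Arc via as_ref().
impl<P: MiniPrunable> MiniPrunable for Arc<P> {
    fn row_count(&self) -> usize {
        self.as_ref().row_count()
    }
}

struct Chunk(usize);
impl MiniPrunable for Chunk {
    fn row_count(&self) -> usize {
        self.0
    }
}

// Generic code now works for both Chunk and Arc<Chunk>.
fn total_rows<P: MiniPrunable>(chunks: &[P]) -> usize {
    chunks.iter().map(|c| c.row_count()).sum()
}

fn main() {
    assert_eq!(total_rows(&[Chunk(10), Chunk(5)]), 15);
    assert_eq!(total_rows(&[Arc::new(Chunk(10)), Arc::new(Chunk(5))]), 15);
}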
/// Something that cares to be notified when pruning of chunks occurs
pub trait PruningObserver {
type Observed;

View File

@@ -8,7 +8,10 @@ use arrow::{
datatypes::{DataType, Int32Type, TimeUnit},
record_batch::RecordBatch,
};
use data_types::{chunk_metadata::ChunkSummary, partition_metadata::TableSummary};
use data_types::{
chunk_metadata::ChunkSummary,
partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
};
use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
use crate::{
@@ -20,7 +23,7 @@ use crate::{exec::Executor, pruning::Prunable};
use internal_types::{
schema::{
builder::{SchemaBuilder, SchemaMerger},
Schema,
InfluxColumnType, Schema,
},
selection::Selection,
};
@@ -119,6 +122,9 @@ impl Database for TestDatabase {
pub struct TestChunk {
id: u32,
/// Set the flag if this chunk might contain duplicates
may_contain_pk_duplicates: bool,
/// A copy of the captured predicates passed
predicates: Mutex<Vec<Predicate>>,
@@ -136,6 +142,9 @@ pub struct TestChunk {
/// Return value for apply_predicate, if desired
predicate_match: Option<PredicateMatch>,
/// Return value for summary(), if desired
table_summary: Option<TableSummary>,
}
impl TestChunk {
@@ -173,6 +182,12 @@ impl TestChunk {
self.with_tag_column(table_name, "dummy_col")
}
/// Set the `may_contain_pk_duplicates` flag
pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self {
self.may_contain_pk_duplicates = v;
self
}
/// Register a tag column with the test chunk
pub fn with_tag_column(
self,
@@ -189,6 +204,38 @@ impl TestChunk {
self.add_schema_to_table(table_name, new_column_schema)
}
/// Register a tag column, with the given min/max statistics, with the test chunk
pub fn with_tag_column_with_stats(
self,
table_name: impl Into<String>,
column_name: impl Into<String>,
min: &str,
max: &str,
) -> Self {
let table_name = table_name.into();
let column_name = column_name.into();
let mut new_self = self.with_tag_column(&table_name, &column_name);
// Now, find the appropriate column summary and update the stats
let column_summary: &mut ColumnSummary = new_self
.table_summary
.as_mut()
.expect("had table summary")
.columns
.iter_mut()
.find(|c| c.name == column_name)
.expect("had column");
column_summary.stats = Statistics::String(StatValues {
min: Some(min.to_string()),
max: Some(max.to_string()),
..Default::default()
});
new_self
}
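
Usage mirrors the provider test earlier in this diff: the call below builds a chunk whose tag1 column statistics report a min of "a" and a max of "b":

let chunk = TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b");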
/// Register a timestamp column with the test chunk
pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
let table_name = table_name.into();
@@ -226,7 +273,37 @@ impl TestChunk {
if let Some(existing_name) = &self.table_name {
assert_eq!(&table_name, existing_name);
}
self.table_name = Some(table_name);
self.table_name = Some(table_name.clone());
// assume the new schema has exactly a single column
assert_eq!(new_column_schema.len(), 1);
let (col_type, new_field) = new_column_schema.field(0);
let influxdb_type = col_type.map(|t| match t {
InfluxColumnType::Tag => InfluxDbType::Tag,
InfluxColumnType::Field(_) => InfluxDbType::Field,
InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
});
let stats = match new_field.data_type() {
DataType::Boolean => Statistics::Bool(StatValues::default()),
DataType::Int64 => Statistics::I64(StatValues::default()),
DataType::UInt64 => Statistics::U64(StatValues::default()),
DataType::Utf8 => Statistics::String(StatValues::default()),
DataType::Dictionary(_, value_type) => {
assert!(matches!(**value_type, DataType::Utf8));
Statistics::String(StatValues::default())
}
DataType::Float64 => Statistics::F64(StatValues::default()),
DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
_ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
};
let column_summary = ColumnSummary {
name: new_field.name().clone(),
influxdb_type,
stats,
};
let mut merger = SchemaMerger::new().merge(new_column_schema).unwrap();
@@ -238,6 +315,14 @@ impl TestChunk {
let new_schema = merger.build().unwrap();
self.table_schema = Some(new_schema);
let mut table_summary = self
.table_summary
.take()
.unwrap_or_else(|| TableSummary::new(table_name));
table_summary.columns.push(column_summary);
self.table_summary = Some(table_summary);
self
}
@@ -320,6 +405,10 @@ impl PartitionChunk for TestChunk {
self.table_name.as_deref().unwrap()
}
fn may_contain_pk_duplicates(&self) -> bool {
self.may_contain_pk_duplicates
}
fn read_filter(
&self,
predicate: &Predicate,
@@ -403,7 +492,9 @@ impl PartitionChunk for TestChunk {
impl Prunable for TestChunk {
fn summary(&self) -> &TableSummary {
unimplemented!();
self.table_summary
.as_ref()
.expect("Table summary not configured for TestChunk")
}
fn schema(&self) -> arrow::datatypes::SchemaRef {

View File

@@ -217,6 +217,13 @@ impl PartitionChunk for DbChunk {
self.table_name.as_ref()
}
fn may_contain_pk_duplicates(&self) -> bool {
// Assume that the MUB can contain duplicates as it has the
// raw incoming stream of writes, but that all other types of
// chunks are deduplicated as part of creation
matches!(self.state, State::MutableBuffer { .. })
}
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
if !predicate.should_include_table(self.table_name().as_ref()) {
return Ok(PredicateMatch::Zero);