Merge branch 'main' into crepererum/extract_server_init

pull/24376/head
kodiakhq[bot] 2021-06-10 07:14:57 +00:00 committed by GitHub
commit 5f863a59fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 374 additions and 35 deletions

View File

@@ -138,30 +138,95 @@ where
}
}
-/// Returns true if the chunk has a potential primary key overlap with the other chunk
/// Returns true if the chunk has a potential primary key overlap
/// with the other chunk.
///
/// This algorithm is O(2^N) in the worst case. However, the
/// pathological case is where two chunks each have a large
/// number of tags that have no overlap, which seems unlikely in
/// the real world.
///
/// Note this algorithm is quite conservative (in that it will
/// assume that any column can contain nulls) and thus can match
/// with chunks that do not have that column. For example:
///
/// Chunk 1: tag_a
/// Chunk 2: tag_a, tag_b
///
/// In this case Chunk 2 has values for tag_b but Chunk 1
/// doesn't have any values in tag_b (its values are implicitly
/// null)
///
/// If Chunk 2 has any null values in the tag_b column, it could
/// overlap with Chunk 1 (as logically there can be rows with
/// (tag_a = NULL, tag_b = NULL) in both chunks).
///
/// We could make this algorithm significantly less conservative
/// if we stored the null count in the ColumnSummary: a column
/// missing from one chunk could then be ruled out whenever the
/// other chunk's column contains no NULLs.
fn potential_overlap(&self, other: &Self) -> Result<bool> {
-// in order to have overlap, *all* the columns in the sort order
-// need to be the same. Note gaps in the sort order mean they
-// are for different parts of the keyspace
-if self.key_summaries.len() != other.key_summaries.len() {
-// Short circuit on different lengths
-return Ok(false);
-}
// This algorithm assumes that the keys are sorted by name (so
// they can't appear in different orders on the two sides)
debug_assert!(self
.key_summaries
.windows(2)
.all(|s| s[0].name <= s[1].name));
debug_assert!(other
.key_summaries
.windows(2)
.all(|s| s[0].name <= s[1].name));
self.potential_overlap_impl(0, other, 0)
}
-let iter = self.key_summaries.iter().zip(other.key_summaries.iter());
-for (s1, s2) in iter {
-if s1.name != s2.name || !Self::columns_might_overlap(s1, s2)? {
-return Ok(false);
// Checks whether the remainder of self.columns[self_idx..] and
// other.columns[other_idx..] are compatible
fn potential_overlap_impl(
&self,
mut self_idx: usize,
other: &Self,
mut other_idx: usize,
) -> Result<bool> {
loop {
let s1 = self.key_summaries.get(self_idx);
let s2 = other.key_summaries.get(other_idx);
if let (Some(s1), Some(s2)) = (s1, s2) {
if s1.name == s2.name {
// pk matched in this position, so check values. If we
// find no overlap we know the chunks don't overlap;
// otherwise we need to keep checking
if Self::columns_might_overlap(s1, s2)? {
self_idx += 1;
other_idx += 1;
} else {
return Ok(false);
}
} else {
// names didn't match, so try to find the next
// place they do. Since there may be missing keys
// on either side, try skipping each in turn
//
// Note this will result in O(num_tags) stack
// frames in the worst case, but we expect the
// number of tags to be relatively small (~20 at
// the time of this writing)
return Ok(self.potential_overlap_impl(self_idx + 1, other, other_idx)?
|| self.potential_overlap_impl(self_idx, other, other_idx + 1)?);
}
} else {
// ran out of columns to check on one side; assume the
// other side could have nulls all the way down (per the
// null assumption described above)
return Ok(true);
}
}
-Ok(true)
}
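To make the recursion above concrete, here is a self-contained sketch (illustrative only, not code from this PR) of the same conservative search over two name-sorted column lists, with plain (min, max) ranges standing in for ColumnSummary statistics:

fn ranges_overlap(a: (i64, i64), b: (i64, i64)) -> bool {
    a.0 <= b.1 && b.0 <= a.1
}

// Conservative overlap check over name-sorted (name, (min, max)) columns,
// mirroring potential_overlap_impl above.
fn sketch_potential_overlap(a: &[(&str, (i64, i64))], b: &[(&str, (i64, i64))]) -> bool {
    match (a.first(), b.first()) {
        // one side ran out of columns: the other side could be all NULL
        // from here on, so conservatively report a potential overlap
        (None, _) | (_, None) => true,
        (Some((n1, r1)), Some((n2, r2))) if n1 == n2 => {
            // same column on both sides: the values must overlap and so
            // must the remaining columns
            ranges_overlap(*r1, *r2) && sketch_potential_overlap(&a[1..], &b[1..])
        }
        // names differ: one side may simply lack this column (implicitly
        // all NULL), so try skipping a column on either side; this
        // branching is the source of the O(2^N) worst case
        _ => sketch_potential_overlap(&a[1..], b) || sketch_potential_overlap(a, &b[1..]),
    }
}

fn main() {
    // the doc comment's example: Chunk 1 has tag_a; Chunk 2 has tag_a, tag_b
    let c1 = [("tag_a", (1, 5))];
    let c2 = [("tag_a", (3, 9)), ("tag_b", (0, 2))];
    assert!(sketch_potential_overlap(&c1, &c2));
}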
/// Returns true if the two columns MAY overlap each other, based
/// on statistics
pub fn columns_might_overlap(s1: &ColumnSummary, s2: &ColumnSummary) -> Result<bool> {
use Statistics::*;
let overlap = match (&s1.stats, &s2.stats) {
(I64(s1), I64(s2)) => s1.overlaps(s2),
(U64(s1), U64(s2)) => s1.overlaps(s2),
@@ -176,14 +241,15 @@ where
}
};
-// If either column has no min/max, treat the column as
-// being entirely null
// If either column has no min/max, treat the column as being
// entirely null, meaning that it could overlap the other
// column if that column contains any nulls.
let is_none = s1.stats.is_none() || s2.stats.is_none();
match overlap {
StatOverlap::NonZero => Ok(true),
StatOverlap::Zero => Ok(false),
-StatOverlap::Unknown if is_none => Ok(false), // no stats means no values
StatOverlap::Unknown if is_none => Ok(true),
// This case means there are some stats, but not all.
// Unclear how this could happen, so throw an error for now
StatOverlap::Unknown => InternalPartialStatistics {
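The rule changed in this hunk (no min/max statistics now means the column may be entirely NULL, and therefore may overlap) can be modeled in a few lines. A simplified sketch under that assumption, collapsing the StatOverlap cases into a bool and omitting the partial-statistics error path of the real code:

// Simplified model: full stats on both sides compare min/max ranges; a
// column with no stats is treated as entirely NULL, which could pair with
// NULL rows on the other side. Illustrative only.
#[derive(Clone, Copy)]
struct ColStats {
    min: Option<i64>,
    max: Option<i64>,
}

fn sketch_columns_might_overlap(s1: ColStats, s2: ColStats) -> bool {
    match ((s1.min, s1.max), (s2.min, s2.max)) {
        ((Some(min1), Some(max1)), (Some(min2), Some(max2))) => {
            min1 <= max2 && min2 <= max1
        }
        // no min/max on at least one side: all-NULL column, may overlap
        _ => true,
    }
}

fn main() {
    let with_stats = ColStats { min: Some(1), max: Some(5) };
    let all_null = ColStats { min: None, max: None };
    assert!(sketch_columns_might_overlap(with_stats, all_null));

    let disjoint = ColStats { min: Some(10), max: Some(20) };
    assert!(!sketch_columns_might_overlap(with_stats, disjoint));
}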
@@ -313,7 +379,27 @@ mod test {
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
// the overlap could come when (tag1 = NULL, tag2=NULL) which
// could exist in either chunk
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
#[test]
fn different_tag_names_multi_tags() {
// check chunks that overlap but have some different tag names
let c1 = TestChunk::new("chunk1")
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", Some("aaa"), Some("bbb"));
let c2 = TestChunk::new("chunk2")
.with_tag("tag2", Some("aaa"), Some("bbb"))
.with_tag("tag3", Some("aaa"), Some("bbb"));
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
// the overlap could come when (tag1 = NULL, tag2 overlaps, tag3 = NULL)
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
@@ -370,12 +456,13 @@ mod test {
let c2 = TestChunk::new("chunk2")
// tag1 and timestamp overlap, but no tag2 (aka it is all null)
// so it could overlap if there was a null tag2 value in chunk1
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
@@ -387,15 +474,15 @@ mod test {
.with_timestamp(0, 1000);
let c2 = TestChunk::new("chunk2")
-// tag1 and timestamp overlap, tag2 has no stats (null)
-// so we say they can't overlap
// tag1 and timestamp overlap, tag2 has no stats (is all null)
// so they might overlap if chunk1 had a null in tag2
.with_tag("tag1", Some("aaa"), Some("bbb"))
.with_tag("tag2", None, None)
.with_timestamp(500, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
@@ -445,7 +532,7 @@ mod test {
#[test]
fn mismatched_types() {
-// Test if same column has different types in different
// When the same column has different types in different
// chunks; this will likely cause errors elsewhere in practice
// as the schemas are incompatible (and can't be merged)
let c1 = TestChunk::new("chunk1")
@@ -454,14 +541,14 @@ mod test {
let c2 = TestChunk::new("chunk2")
// tag1 column is actually a field in chunk
-// 2, so even though the timestamps overlap these chunks
-// don't have duplicates
// 2, so since the timestamps overlap these chunks
// might also have duplicates (if tag1 was null in c1)
.with_int_field("tag1", Some(100), Some(200))
.with_timestamp(0, 1000);
let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
let expected = vec!["Group 0: [chunk1]", "Group 1: [chunk2]"];
let expected = vec!["Group 0: [chunk1, chunk2]"];
assert_groups_eq!(expected, groups);
}
@@ -477,7 +564,7 @@ mod test {
}
/// Mocked out prunable provider to use for testing overlaps
-#[derive(Debug)]
#[derive(Debug, Clone)]
struct TestChunk {
// The name of this chunk
name: String,

View File

@@ -122,6 +122,9 @@ pub trait PartitionChunk: Prunable + Debug + Send + Sync {
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, Self::Error>;
/// Returns true if the data of this chunk is sorted on its primary key
fn is_sorted_on_pk(&self) -> bool;
}
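A minimal sketch of what an implementation of this contract could look like for a chunk that records an optional sort key; SketchChunk, primary_key, and sort_key are hypothetical names used for illustration only, not part of this trait:

// Report sorted only when the recorded sort key matches the primary key
// columns exactly; a chunk with no sort key is conservatively unsorted.
struct SketchChunk {
    primary_key: Vec<String>,
    sort_key: Option<Vec<String>>,
}

impl SketchChunk {
    fn is_sorted_on_pk(&self) -> bool {
        self.sort_key.as_deref() == Some(self.primary_key.as_slice())
    }
}

fn main() {
    let unsorted = SketchChunk {
        primary_key: vec!["tag1".into(), "time".into()],
        sort_key: None,
    };
    assert!(!unsorted.is_sorted_on_pk());
}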
#[async_trait]

View File

@@ -2,7 +2,7 @@
use std::sync::Arc;
-use arrow::{datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use arrow::{compute::SortOptions, datatypes::SchemaRef as ArrowSchemaRef, error::ArrowError};
use datafusion::{
datasource::{
datasource::{Statistics, TableProviderFilterPushDown},
@@ -10,7 +10,11 @@ use datafusion::{
},
error::{DataFusionError, Result as DataFusionResult},
logical_plan::Expr,
-physical_plan::ExecutionPlan,
physical_plan::{
expressions::{col, PhysicalSortExpr},
sort::SortExec,
ExecutionPlan,
},
};
use internal_types::schema::{merge::SchemaMerger, Schema};
use observability_deps::tracing::debug;
@@ -50,6 +54,11 @@ pub enum Error {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error adding sort operator '{}'", source,))]
InternalSort {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error: Can not group chunks '{}'", source,))]
InternalChunkGrouping { source: crate::duplicate::Error },
}
@@ -374,7 +383,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
Arc::clone(&schema),
chunk_with_duplicates.to_owned(),
predicate.clone(),
-));
)?);
}
// Go over non_duplicates_chunks, build a plan for it
@@ -495,14 +504,54 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
schema: ArrowSchemaRef,
chunk: Arc<C>, // This chunk is identified having duplicates
predicate: Predicate,
-) -> Arc<dyn ExecutionPlan> {
-// // TODO
-// // Currently return just like there are no overlaps, no duplicates
-Arc::new(IOxReadFilterNode::new(
) -> Result<Arc<dyn ExecutionPlan>> {
// Create the bottom node IOxReadFilterNode for this chunk
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
Arc::clone(&table_name),
schema,
-vec![chunk],
vec![Arc::clone(&chunk)],
predicate,
));
// Add the sort operator, SortExec, if needed
//let plan = Self::build_sort_plan(chunk, input);
Self::build_sort_plan(chunk, input)
// Create DeduplicateExec
// TODO: Add DeduplicateExec here when it is implemented in https://github.com/influxdata/influxdb_iox/issues/1646
//plan = add_deduplicate_exec(plan);
//plan
}
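Until DeduplicateExec lands (see the issue referenced above), its intended effect can be sketched on rows that are already sorted on the primary key: consecutive rows with equal keys collapse, with the later row taking precedence. A standalone illustration under that assumption; the real operator will merge field-by-field, so the single-field row here is a simplification:

// Deduplicate rows sorted on a string primary key; the later of two
// duplicate rows wins. Illustrative only, not the DeduplicateExec design.
fn sketch_deduplicate(rows: Vec<(String, i64)>) -> Vec<(String, i64)> {
    let mut out: Vec<(String, i64)> = Vec::new();
    for row in rows {
        // same primary key as the previous row: overwrite it (later wins)
        if out.last().map(|last| last.0 == row.0).unwrap_or(false) {
            *out.last_mut().unwrap() = row;
        } else {
            out.push(row);
        }
    }
    out
}

fn main() {
    let sorted = vec![
        ("AL".to_string(), 100),
        ("MT".to_string(), 5),
        ("MT".to_string(), 10), // duplicate key; this row wins
    ];
    let deduped = sketch_deduplicate(sorted);
    assert_eq!(deduped, vec![("AL".to_string(), 100), ("MT".to_string(), 10)]);
}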
/// Add the SortExec operator on top of the input plan of the given chunk.
/// The plan will be sorted on the chunk's primary key.
fn build_sort_plan(
chunk: Arc<C>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
if chunk.is_sorted_on_pk() {
return Ok(input);
}
// Sort the chunk on pk
let key_summaries = chunk.summary().primary_key_columns();
// build sort expression
let mut sort_exprs = vec![];
for key in key_summaries {
sort_exprs.push(PhysicalSortExpr {
expr: col(key.name.as_str()),
options: SortOptions {
descending: false,
nulls_first: false,
},
});
}
// Create SortExec operator
Ok(Arc::new(
SortExec::try_new(sort_exprs, input).context(InternalSort)?,
))
}
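For reference, the sort key built above is purely positional: one ascending, NULLs-last expression per primary key column. A standalone sketch of that derivation, with SortSpec standing in for DataFusion's PhysicalSortExpr/SortOptions (illustrative names only):

// One ascending, NULLs-last sort spec per primary key column, mirroring
// the loop in build_sort_plan above.
#[derive(Debug, PartialEq)]
struct SortSpec {
    column: String,
    descending: bool,
    nulls_first: bool,
}

fn pk_sort_specs(primary_key: &[&str]) -> Vec<SortSpec> {
    primary_key
        .iter()
        .map(|name| SortSpec {
            column: name.to_string(),
            descending: false,
            nulls_first: false,
        })
        .collect()
}

fn main() {
    // matches the tests below, where the primary key is (tag1, tag2, time)
    let specs = pk_sort_specs(&["tag1", "tag2", "time"]);
    assert_eq!(specs.len(), 3);
    assert!(specs.iter().all(|s| !s.descending && !s.nulls_first));
}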
@@ -560,6 +609,10 @@ impl<C: PartitionChunk> ChunkPruner<C> for NoOpPruner {
#[cfg(test)]
mod test {
use arrow_util::assert_batches_eq;
use datafusion::physical_plan::collect;
use internal_types::selection::Selection;
use crate::test::TestChunk;
use super::*;
@@ -599,6 +652,115 @@ mod test {
assert_eq!(chunk_ids(&deduplicator.no_duplicates_chunks), "1");
}
#[tokio::test]
async fn sort_planning_one_tag_with_time() {
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new(1)
.with_time_column("t")
.with_tag_column("t", "tag1")
.with_int_field_column("t", "field_int")
.with_five_rows_of_data("t"),
);
// Datafusion schema of the chunk
let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
// IOx scan operator
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
Arc::from("t"),
schema,
vec![Arc::clone(&chunk)],
Predicate::default(),
));
let batch = collect(Arc::clone(&input)).await.unwrap();
// data in its original non-sorted form
let expected = vec![
"+-----------+------+-------------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-------------------------------+",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
"+-----------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
// Add Sort operator on top of IOx scan
let sort_plan = Deduplicater::build_sort_plan(chunk, input);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is now sorted on the primary key (tag1, time)
let expected = vec![
"+-----------+------+-------------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+-------------------------------+",
"| 100 | AL | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | 1970-01-01 00:00:00.000000100 |",
"| 1000 | MT | 1970-01-01 00:00:00.000001 |",
"| 5 | MT | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | 1970-01-01 00:00:00.000007 |",
"+-----------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
}
#[tokio::test]
async fn sort_planning_two_tags_with_time() {
// Chunk 1 with 5 rows of data
let chunk = Arc::new(
TestChunk::new(1)
.with_time_column("t")
.with_tag_column("t", "tag1")
.with_tag_column("t", "tag2")
.with_int_field_column("t", "field_int")
.with_five_rows_of_data("t"),
);
// Datafusion schema of the chunk
let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
// IOx scan operator
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
Arc::from("t"),
schema,
vec![Arc::clone(&chunk)],
Predicate::default(),
));
let batch = collect(Arc::clone(&input)).await.unwrap();
// data in its original non-sorted form
let expected = vec![
"+-----------+------+------+-------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+-------------------------------+",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"+-----------+------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
// Add Sort operator on top of IOx scan
let sort_plan = Deduplicater::build_sort_plan(chunk, input);
let batch = collect(sort_plan.unwrap()).await.unwrap();
// data is now sorted on the primary key (tag1, tag2, time)
let expected = vec![
"+-----------+------+------+-------------------------------+",
"| field_int | tag1 | tag2 | time |",
"+-----------+------+------+-------------------------------+",
"| 100 | AL | MA | 1970-01-01 00:00:00.000000050 |",
"| 70 | CT | CT | 1970-01-01 00:00:00.000000100 |",
"| 5 | MT | AL | 1970-01-01 00:00:00.000005 |",
"| 10 | MT | AL | 1970-01-01 00:00:00.000007 |",
"| 1000 | MT | CT | 1970-01-01 00:00:00.000001 |",
"+-----------+------+------+-------------------------------+",
];
assert_batches_eq!(&expected, &batch);
}
fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
ids.join(", ")

View File

@@ -360,6 +360,79 @@ impl TestChunk {
})
.collect::<Vec<_>>();
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
println!("TestChunk batch data: {:#?}", batch);
self.table_data.push(Arc::new(batch));
self
}
/// Prepares this chunk to return a specific record batch with five
/// rows of non null data that look like
/// "+------+------+-----------+-------------------------------+",
/// "| tag1 | tag2 | field_int | time |",
/// "+------+------+-----------+-------------------------------+",
/// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |",
/// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |",
/// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |",
/// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |",
/// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |",
/// "+------+------+-----------+-------------------------------+",
pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
//let table_name = table_name.into();
let schema = self
.table_schema
.as_ref()
.expect("table must exist in TestChunk");
// create arrays
let columns = schema
.iter()
.map(|(_influxdb_column_type, field)| match field.data_type() {
DataType::Int64 => {
Arc::new(Int64Array::from(vec![1000, 10, 70, 100, 5])) as ArrayRef
}
DataType::Utf8 => {
match field.name().as_str() {
"tag1" => Arc::new(StringArray::from(vec!["MT", "MT", "CT", "AL", "MT"]))
as ArrayRef,
"tag2" => Arc::new(StringArray::from(vec!["CT", "AL", "CT", "MA", "AL"]))
as ArrayRef,
_ => Arc::new(StringArray::from(vec!["CT", "MT", "AL", "AL", "MT"]))
as ArrayRef,
}
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
TimestampNanosecondArray::from_vec(vec![1000, 7000, 100, 50, 5000], None),
) as ArrayRef,
DataType::Dictionary(key, value)
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
{
match field.name().as_str() {
"tag1" => Arc::new(
vec!["MT", "MT", "CT", "AL", "MT"]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
) as ArrayRef,
"tag2" => Arc::new(
vec!["CT", "AL", "CT", "MA", "AL"]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
) as ArrayRef,
_ => Arc::new(
vec!["CT", "MT", "AL", "AL", "MT"]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
) as ArrayRef,
}
}
_ => unimplemented!(
"Unimplemented data type for test database: {:?}",
field.data_type()
),
})
.collect::<Vec<_>>();
let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
self.table_data.push(Arc::new(batch));
@@ -422,6 +495,11 @@ impl PartitionChunk for TestChunk {
Ok(Box::pin(stream))
}
/// Returns true if the data of this chunk is sorted on its primary key
fn is_sorted_on_pk(&self) -> bool {
false
}
fn apply_predicate(&self, predicate: &Predicate) -> Result<PredicateMatch> {
self.check_error()?;

View File

@@ -449,6 +449,15 @@ impl PartitionChunk for DbChunk {
}
}
}
// TODO: return the right value. For now the chunk is assumed to be unsorted
fn is_sorted_on_pk(&self) -> bool {
match &self.state {
State::MutableBuffer { .. } => false,
State::ReadBuffer { .. } => false,
State::ParquetFile { .. } => false,
}
}
}
impl Prunable for DbChunk {