Merge pull request #1950 from influxdata/cn/refactor-mock-chunks

refactor: Go from 3 mock chunks to 1

commit 466416016b
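Note: this PR collapses three near-duplicate mock chunk types -- the shared TestChunk in the query crate's mock module, a private TestChunk in the duplicate-detection tests, and TestChunkMeta in the pruning tests -- into the single shared crate::test::TestChunk. The sketch below contrasts the two builder styles; it is assembled from calls visible in this diff and is illustrative only: the table name moves into the constructor, the per-call table-name argument disappears, min/max stats become Option values, and the chunk id becomes an explicit with_id step.

    // Before: numeric id in the constructor, table name repeated on every call
    let chunk = TestChunk::new(1)
        .with_time_column_with_stats("t", 5, 7000)
        .with_tag_column_with_stats("t", "tag1", "AL", "MT")
        .with_int_field_column("t", "field_int");

    // After: table name in the constructor, Option-wrapped stats, explicit id
    let chunk = TestChunk::new("t")
        .with_id(1)
        .with_time_column_with_stats(Some(5), Some(7000))
        .with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
        .with_i64_field_column("field_int");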
@@ -1,12 +1,13 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.
 
-use std::{borrow::Cow, mem};
-
 use serde::{Deserialize, Serialize};
-use std::borrow::Borrow;
-use std::iter::FromIterator;
-use std::num::NonZeroU64;
+use std::{
+    borrow::{Borrow, Cow},
+    iter::FromIterator,
+    mem,
+    num::NonZeroU64,
+};
 
 /// Describes the aggregated (across all chunks) summary
 /// statistics for each column in a partition
@@ -44,11 +45,7 @@ pub struct PartitionChunkSummary {
 impl FromIterator<Self> for TableSummary {
     fn from_iter<T: IntoIterator<Item = Self>>(iter: T) -> Self {
         let mut iter = iter.into_iter();
-        let first = iter.next().expect("must contain at least one element");
-        let mut s = Self {
-            name: first.name,
-            columns: first.columns,
-        };
+        let mut s = iter.next().expect("must contain at least one element");
 
         for other in iter {
            s.update_from(&other)
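The simplification works because TableSummary::update_from already merges per-column statistics, so seeding the fold with the first summary makes the hand-built Self { name, columns } intermediate redundant. A minimal usage sketch, assuming t1 and t2 are TableSummary values for the same table:

    // Collecting summaries folds them together through update_from().
    let combined: TableSummary = vec![t1, t2].into_iter().collect();
    // `combined` starts as t1 and merges in t2's column statistics.
    // As before, an empty iterator still panics: "must contain at least one element".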
@@ -270,21 +270,21 @@ mod test {
     async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
         // Chunk 1 with 5 rows of data on 2 tags
         let chunk1 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column_with_stats("t", 5, 7000)
-                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column_with_stats(Some(5), Some(7000))
+                .with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 2 has an extra field, and only 4 rows of data
         let chunk2 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column_with_stats("t", 5, 7000)
-                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
-                .with_int_field_column("t", "field_int")
-                .with_int_field_column("t", "field_int2")
-                .with_four_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column_with_stats(Some(5), Some(7000))
+                .with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
+                .with_i64_field_column("field_int")
+                .with_i64_field_column("field_int2")
+                .with_four_rows_of_data(),
         );
 
         let expected = vec![
@@ -765,18 +765,31 @@ mod test {
         // in the duplicate module
 
         // c1: no overlaps
-        let c1 = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b"));
+        let c1 = Arc::new(TestChunk::new("t").with_id(1).with_tag_column_with_stats(
+            "tag1",
+            Some("a"),
+            Some("b"),
+        ));
 
         // c2: overlap with c3
-        let c2 = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d"));
+        let c2 = Arc::new(TestChunk::new("t").with_id(2).with_tag_column_with_stats(
+            "tag1",
+            Some("c"),
+            Some("d"),
+        ));
 
         // c3: overlap with c2
-        let c3 = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d"));
+        let c3 = Arc::new(TestChunk::new("t").with_id(3).with_tag_column_with_stats(
+            "tag1",
+            Some("c"),
+            Some("d"),
+        ));
 
         // c4: self overlap
         let c4 = Arc::new(
-            TestChunk::new(4)
-                .with_tag_column_with_stats("t", "tag1", "e", "f")
+            TestChunk::new("t")
+                .with_id(4)
+                .with_tag_column_with_stats("tag1", Some("e"), Some("f"))
                 .with_may_contain_pk_duplicates(true),
         );
 
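A condensed sketch of the overlap semantics these fixtures exercise, written in the style of the grouping tests later in this diff (the actual assertion falls outside the displayed hunks, so the expected shape is inferred, not copied):

    let c1 = TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("a"), Some("b"));
    let c2 = TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("c"), Some("d"));
    let c3 = TestChunk::new("chunk3").with_tag_column_with_stats("tag1", Some("c"), Some("d"));

    let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded");
    // Expected shape: c2 and c3 land in one group (their tag1 ranges coincide),
    // while c1 stays alone -- its range "a"..="b" intersects neither.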
@@ -797,11 +810,11 @@ mod test {
     async fn sort_planning_one_tag_with_time() {
         // Chunk 1 with 5 rows of data
         let chunk = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Datafusion schema of the chunk
@@ -851,12 +864,12 @@ mod test {
     async fn sort_planning_two_tags_with_time() {
         // Chunk 1 with 5 rows of data
         let chunk = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Datafusion schema of the chunk
@@ -906,12 +919,12 @@ mod test {
     async fn sort_read_filter_plan_for_two_tags_with_time() {
         // Chunk 1 with 5 rows of data
         let chunk = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Datafusion schema of the chunk
@@ -943,22 +956,24 @@ mod test {
     async fn deduplicate_plan_for_overlapped_chunks() {
         // Chunk 1 with 5 rows of data on 2 tags
         let chunk1 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 2 exactly the same as Chunk 1
         let chunk2 = Arc::new(
-            TestChunk::new(2)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(2)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
         // Datafusion schema of the chunk
         // the same for 2 chunks
@@ -1011,22 +1026,24 @@ mod test {
         // Same two chunks but only select the field and timestamp, not the tag values
         // Chunk 1 with 5 rows of data on 2 tags
         let chunk1 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 2 exactly the same as Chunk 1
         let chunk2 = Arc::new(
-            TestChunk::new(2)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(2)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
         let chunks = vec![chunk1, chunk2];
 
@@ -1083,30 +1100,33 @@ mod test {
         // Chunks with different fields / tags, and select a subset
        // Chunk 1 with 5 rows of data on 2 tags
         let chunk1 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 2 same tags, but different fields
         let chunk2 = Arc::new(
-            TestChunk::new(2)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_int_field_column("t", "other_field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(2)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_i64_field_column("other_field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 3 exactly the same as Chunk 2
         let chunk3 = Arc::new(
-            TestChunk::new(3)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_int_field_column("t", "other_field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(3)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_i64_field_column("other_field_int")
+                .with_five_rows_of_data(),
         );
 
         let chunks = vec![chunk1, chunk2, chunk3];
@@ -1172,32 +1192,35 @@ mod test {
     async fn deduplicate_plan_for_overlapped_chunks_with_different_schemas() {
         // Chunk 1 with 5 rows of data on 2 tags
         let chunk1 = Arc::new(
-            TestChunk::new(1)
-                .with_time_column("t")
-                .with_tag_column("t", "tag1")
-                .with_tag_column("t", "tag2")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(1)
+                .with_time_column()
+                .with_tag_column("tag1")
+                .with_tag_column("tag2")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 2 has two different tags
         let chunk2 = Arc::new(
-            TestChunk::new(2)
-                .with_time_column("t")
-                .with_tag_column("t", "tag3")
-                .with_tag_column("t", "tag1")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(2)
+                .with_time_column()
+                .with_tag_column("tag3")
+                .with_tag_column("tag1")
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Chunk 3 has just tag3
         let chunk3 = Arc::new(
-            TestChunk::new(3)
-                .with_time_column("t")
-                .with_tag_column("t", "tag3")
-                .with_int_field_column("t", "field_int")
-                .with_int_field_column("t", "field_int2")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_id(3)
+                .with_time_column()
+                .with_tag_column("tag3")
+                .with_i64_field_column("field_int")
+                .with_i64_field_column("field_int2")
+                .with_five_rows_of_data(),
         );
 
         // Requested output schema == the schema for all three
@@ -1271,11 +1294,11 @@ mod test {
     async fn scan_plan_with_one_chunk_no_duplicates() {
         // Test no duplicate at all
         let chunk = Arc::new(
-            TestChunk::new(1)
-                .with_time_column_with_stats("t", 5, 7000)
-                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
-                .with_int_field_column("t", "field_int")
-                .with_five_rows_of_data("t"),
+            TestChunk::new("t")
+                .with_time_column_with_stats(Some(5), Some(7000))
+                .with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
+                .with_i64_field_column("field_int")
+                .with_five_rows_of_data(),
         );
 
         // Datafusion schema of the chunk
|
@ -1319,12 +1342,12 @@ mod test {
|
|||
async fn scan_plan_with_one_chunk_with_duplicates() {
|
||||
// Test one chunk with duplicate within
|
||||
let chunk = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
TestChunk::new("t")
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_may_contain_pk_duplicates(true)
|
||||
.with_ten_rows_of_data_some_duplicates("t"),
|
||||
.with_ten_rows_of_data_some_duplicates(),
|
||||
);
|
||||
|
||||
// Datafusion schema of the chunk
|
||||
|
@ -1375,12 +1398,12 @@ mod test {
|
|||
async fn scan_plan_with_one_chunk_with_duplicates_subset() {
|
||||
// Test one chunk with duplicate within
|
||||
let chunk = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
TestChunk::new("t")
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_may_contain_pk_duplicates(true)
|
||||
.with_ten_rows_of_data_some_duplicates("t"),
|
||||
.with_ten_rows_of_data_some_duplicates(),
|
||||
);
|
||||
|
||||
let chunks = vec![chunk];
|
||||
|
@ -1440,19 +1463,19 @@ mod test {
|
|||
async fn scan_plan_with_two_overlapped_chunks_with_duplicates() {
|
||||
// test overlapped chunks
|
||||
let chunk1 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_ten_rows_of_data_some_duplicates("t"),
|
||||
TestChunk::new("t")
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_ten_rows_of_data_some_duplicates(),
|
||||
);
|
||||
|
||||
let chunk2 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_five_rows_of_data("t"),
|
||||
TestChunk::new("t")
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_five_rows_of_data(),
|
||||
);
|
||||
|
||||
// Datafusion schema of the chunk
|
||||
|
@ -1509,39 +1532,43 @@ mod test {
|
|||
async fn scan_plan_with_four_chunks() {
|
||||
// This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
|
||||
let chunk1 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_ten_rows_of_data_some_duplicates("t"),
|
||||
TestChunk::new("t")
|
||||
.with_id(1)
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_ten_rows_of_data_some_duplicates(),
|
||||
);
|
||||
|
||||
// chunk2 overlaps with chunk 1
|
||||
let chunk2 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 5, 7000)
|
||||
.with_tag_column_with_stats("t", "tag1", "AL", "MT")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_five_rows_of_data("t"),
|
||||
TestChunk::new("t")
|
||||
.with_id(2)
|
||||
.with_time_column_with_stats(Some(5), Some(7000))
|
||||
.with_tag_column_with_stats("tag1", Some("AL"), Some("MT"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_five_rows_of_data(),
|
||||
);
|
||||
|
||||
// chunk3 no overlap, no duplicates within
|
||||
let chunk3 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 8000, 20000)
|
||||
.with_tag_column_with_stats("t", "tag1", "UT", "WA")
|
||||
.with_int_field_column("t", "field_int")
|
||||
.with_three_rows_of_data("t"),
|
||||
TestChunk::new("t")
|
||||
.with_id(3)
|
||||
.with_time_column_with_stats(Some(8000), Some(20000))
|
||||
.with_tag_column_with_stats("tag1", Some("UT"), Some("WA"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_three_rows_of_data(),
|
||||
);
|
||||
|
||||
// chunk3 no overlap, duplicates within
|
||||
let chunk4 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
.with_time_column_with_stats("t", 28000, 220000)
|
||||
.with_tag_column_with_stats("t", "tag1", "UT", "WA")
|
||||
.with_int_field_column("t", "field_int")
|
||||
TestChunk::new("t")
|
||||
.with_id(4)
|
||||
.with_time_column_with_stats(Some(28000), Some(220000))
|
||||
.with_tag_column_with_stats("tag1", Some("UT"), Some("WA"))
|
||||
.with_i64_field_column("field_int")
|
||||
.with_may_contain_pk_duplicates(true)
|
||||
.with_four_rows_of_data("t"),
|
||||
.with_four_rows_of_data(),
|
||||
);
|
||||
|
||||
// Datafusion schema of the chunk
|
||||
|
|
|
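One detail worth noticing in the hunks above: the old code constructed every chunk as TestChunk::new(1), so all four chunks in scan_plan_with_four_chunks shared id 1. The new builder takes the table name instead and assigns distinct ids with with_id. A minimal sketch using only names from the diff:

    // Distinct ids let plans and assertions tell otherwise-identical chunks apart.
    let chunk_a = TestChunk::new("t").with_id(1).with_time_column();
    let chunk_b = TestChunk::new("t").with_id(2).with_time_column();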
@@ -276,13 +276,8 @@ where
 
 #[cfg(test)]
 mod test {
-    use data_types::partition_metadata::{
-        ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary,
-    };
-    use internal_types::schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
-    use std::sync::Arc;
-
     use super::*;
+    use crate::{test::TestChunk, QueryChunk};
 
     #[macro_export]
     macro_rules! assert_groups_eq {
@@ -304,9 +299,17 @@ mod test {
 
     #[test]
     fn one_column_no_overlap() {
-        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("mumbai"));
+        let c1 = TestChunk::new("chunk1").with_tag_column_with_stats(
+            "tag1",
+            Some("boston"),
+            Some("mumbai"),
+        );
 
-        let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("new york"), Some("zoo york"));
+        let c2 = TestChunk::new("chunk2").with_tag_column_with_stats(
+            "tag1",
+            Some("new york"),
+            Some("zoo york"),
+        );
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -316,9 +319,17 @@ mod test {
 
     #[test]
     fn one_column_overlap() {
-        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("new york"));
+        let c1 = TestChunk::new("chunk1").with_tag_column_with_stats(
+            "tag1",
+            Some("boston"),
+            Some("new york"),
+        );
 
-        let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("denver"), Some("zoo york"));
+        let c2 = TestChunk::new("chunk2").with_tag_column_with_stats(
+            "tag1",
+            Some("denver"),
+            Some("zoo york"),
+        );
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -328,26 +339,24 @@ mod test {
 
     #[test]
     fn multi_columns() {
-        let c1 = TestChunk::new("chunk1").with_timestamp(0, 1000).with_tag(
-            "tag1",
-            Some("boston"),
-            Some("new york"),
-        );
+        let c1 = TestChunk::new("chunk1")
+            .with_time_column_with_stats(Some(0), Some(1000))
+            .with_tag_column_with_stats("tag1", Some("boston"), Some("new york"));
 
         // Overlaps in tag1, but not in time
         let c2 = TestChunk::new("chunk2")
-            .with_tag("tag1", Some("denver"), Some("zoo york"))
-            .with_timestamp(2000, 3000);
+            .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york"))
+            .with_time_column_with_stats(Some(2000), Some(3000));
 
         // Overlaps in time, but not in tag1
         let c3 = TestChunk::new("chunk3")
-            .with_tag("tag1", Some("zzx"), Some("zzy"))
-            .with_timestamp(500, 1500);
+            .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy"))
+            .with_time_column_with_stats(Some(500), Some(1500));
 
         // Overlaps in time, and in tag1
         let c4 = TestChunk::new("chunk4")
-            .with_tag("tag1", Some("aaa"), Some("zzz"))
-            .with_timestamp(500, 1500);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz"))
+            .with_time_column_with_stats(Some(500), Some(1500));
 
         let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
 
@@ -362,8 +371,10 @@ mod test {
     #[test]
     fn boundary() {
         // check that overlap calculations include the bound
-        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
-        let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("bbb"), Some("ccc"));
+        let c1 =
+            TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
+        let c2 =
+            TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("bbb"), Some("ccc"));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -374,8 +385,10 @@ mod test {
     #[test]
     fn same() {
         // check that if chunks overlap exactly on the boundaries they are still grouped
-        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
-        let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("aaa"), Some("bbb"));
+        let c1 =
+            TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
+        let c2 =
+            TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -386,8 +399,10 @@ mod test {
     #[test]
     fn different_tag_names() {
        // check that if chunks overlap but in different tag names
-        let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb"));
-        let c2 = TestChunk::new("chunk2").with_tag("tag2", Some("aaa"), Some("bbb"));
+        let c1 =
+            TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"));
+        let c2 =
+            TestChunk::new("chunk2").with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -401,12 +416,12 @@ mod test {
     fn different_tag_names_multi_tags() {
         // check that if chunks overlap but in different tag names
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("aaa"), Some("bbb"));
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"));
 
         let c2 = TestChunk::new("chunk2")
-            .with_tag("tag2", Some("aaa"), Some("bbb"))
-            .with_tag("tag3", Some("aaa"), Some("bbb"));
+            .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb"));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -418,21 +433,21 @@ mod test {
     #[test]
     fn three_column() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("xxx"), Some("yyy"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("xxx"), Some("yyy"))
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
             // Timestamp doesn't overlap, but the two tags do
-            .with_timestamp(2001, 3000);
+            .with_time_column_with_stats(Some(2001), Some(3000));
 
         let c3 = TestChunk::new("chunk3")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("aaa"), Some("zzz"))
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
             // all three overlap
-            .with_timestamp(1000, 2000);
+            .with_time_column_with_stats(Some(1000), Some(2000));
 
         let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded");
 
@@ -443,15 +458,15 @@ mod test {
     #[test]
     fn tag_order() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("xxx"), Some("yyy"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
-            .with_tag("tag2", Some("aaa"), Some("zzz"))
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz"))
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
             // all three overlap, but tags in different order
-            .with_timestamp(500, 1000);
+            .with_time_column_with_stats(Some(500), Some(1000));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -462,15 +477,15 @@ mod test {
     #[test]
     fn tag_order_no_tags() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("xxx"), Some("yyy"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
             // tag1 and timestamp overlap, but no tag2 (aka it is all null)
             // so it could overlap if there was a null tag2 value in chunk1
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_timestamp(500, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_time_column_with_stats(Some(500), Some(1000));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -481,16 +496,16 @@ mod test {
     #[test]
     fn tag_order_null_stats() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", Some("xxx"), Some("yyy"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
             // tag1 and timestamp overlap, tag2 has no stats (is all null)
             // so they might overlap if chunk1 had a null in tag 2
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_tag("tag2", None, None)
-            .with_timestamp(500, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_tag_column_with_stats("tag2", None, None)
+            .with_time_column_with_stats(Some(500), Some(1000));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -501,13 +516,13 @@ mod test {
     #[test]
     fn tag_order_partial_stats() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
             // tag1 has a min but not a max. Should result in error
-            .with_tag("tag1", Some("aaa"), None)
-            .with_timestamp(500, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), None)
+            .with_time_column_with_stats(Some(500), Some(1000));
 
         let result = group_potential_duplicates(vec![c1, c2]).unwrap_err();
 
@@ -525,16 +540,16 @@ mod test {
     #[test]
     fn tag_fields_not_counted() {
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_int_field("field", Some(0), Some(2))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_i64_field_column_with_stats("field", Some(0), Some(2))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
             // tag1 and timestamp overlap, but field value does not
             // should still overlap
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_int_field("field", Some(100), Some(200))
-            .with_timestamp(500, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_i64_field_column_with_stats("field", Some(100), Some(200))
+            .with_time_column_with_stats(Some(500), Some(1000));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -548,15 +563,15 @@ mod test {
         // chunks; this will likely cause errors elsewhere in practice
         // as the schemas are incompatible (and can't be merged)
         let c1 = TestChunk::new("chunk1")
-            .with_tag("tag1", Some("aaa"), Some("bbb"))
-            .with_timestamp(0, 1000);
+            .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb"))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let c2 = TestChunk::new("chunk2")
            // tag1 is actually a field in chunk
            // 2, so since the timestamps overlap these chunks
            // might also have duplicates (if tag1 was null in c1)
-            .with_int_field("tag1", Some(100), Some(200))
-            .with_timestamp(0, 1000);
+            .with_i64_field_column_with_stats("tag1", Some(100), Some(200))
+            .with_time_column_with_stats(Some(0), Some(1000));
 
         let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded");
 
@@ -569,115 +584,9 @@ mod test {
     fn to_string(groups: Vec<Vec<TestChunk>>) -> Vec<String> {
         let mut s = vec![];
         for (idx, group) in groups.iter().enumerate() {
-            let names = group.iter().map(|c| c.name.as_str()).collect::<Vec<_>>();
+            let names = group.iter().map(|c| c.table_name()).collect::<Vec<_>>();
             s.push(format!("Group {}: [{}]", idx, names.join(", ")));
         }
         s
     }
-
-    /// Mocked out prunable provider to use testing overlaps
-    #[derive(Debug, Clone)]
-    struct TestChunk {
-        // The name of this chunk
-        name: String,
-        summary: TableSummary,
-        builder: SchemaBuilder,
-    }
-
-    /// Implementation of creating a new column with statitics for TestChunkMeta
-    macro_rules! make_stats {
-        ($MIN:expr, $MAX:expr, $STAT_TYPE:ident) => {{
-            Statistics::$STAT_TYPE(StatValues {
-                distinct_count: None,
-                min: $MIN,
-                max: $MAX,
-                count: 42,
-            })
-        }};
-    }
-
-    impl TestChunk {
-        /// Create a new TestChunk with a specified name
-        fn new(name: impl Into<String>) -> Self {
-            let name = name.into();
-            let summary = TableSummary::new(name.clone());
-            let builder = SchemaBuilder::new();
-            Self {
-                name,
-                summary,
-                builder,
-            }
-        }
-
-        /// Adds a tag column with the specified min/max values
-        fn with_tag(
-            mut self,
-            name: impl Into<String>,
-            min: Option<&str>,
-            max: Option<&str>,
-        ) -> Self {
-            let min = min.map(|v| v.to_string());
-            let max = max.map(|v| v.to_string());
-
-            let tag_name = name.into();
-            self.builder.tag(&tag_name);
-
-            self.summary.columns.push(ColumnSummary {
-                name: tag_name,
-                influxdb_type: Some(InfluxDbType::Tag),
-                stats: make_stats!(min, max, String),
-            });
-            self
-        }
-
-        /// Adds a timestamp column with the specified min/max values
-        fn with_timestamp(mut self, min: i64, max: i64) -> Self {
-            self.builder.timestamp();
-
-            let min = Some(min);
-            let max = Some(max);
-
-            self.summary.columns.push(ColumnSummary {
-                name: TIME_COLUMN_NAME.into(),
-                influxdb_type: Some(InfluxDbType::Timestamp),
-                stats: make_stats!(min, max, I64),
-            });
-            self
-        }
-
-        /// Adds an I64 field column with the specified min/max values
-        fn with_int_field(
-            mut self,
-            name: impl Into<String>,
-            min: Option<i64>,
-            max: Option<i64>,
-        ) -> Self {
-            let field_name = name.into();
-            self.builder
-                .field(&field_name, arrow::datatypes::DataType::Int64);
-
-            self.summary.columns.push(ColumnSummary {
-                name: field_name,
-                influxdb_type: Some(InfluxDbType::Field),
-                stats: make_stats!(min, max, I64),
-            });
-            self
-        }
-    }
-
-    impl QueryChunkMeta for TestChunk {
-        fn summary(&self) -> &TableSummary {
-            &self.summary
-        }
-
-        fn schema(&self) -> Arc<Schema> {
-            let schema = self
-                .builder
-                // need to clone because `build` resets builder state
-                .clone()
-                .build()
-                .expect("created schema");
-            Arc::new(schema)
-        }
-    }
 }
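The pruning tests that follow all share one shape: build a TestChunk with column statistics, build a predicate, and check whether prune_chunks keeps or drops the chunk. A condensed sketch of that pattern, using calls that appear in this diff (the trailing .build() on the predicate is assumed; the hunks truncate before it):

    let observer = TestObserver::new();
    let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
        "column1",
        Some(0),
        Some(10),
    ));

    // column1 > 100 can never match when max(column1) == 10, so c1 is pruned.
    let predicate = PredicateBuilder::new()
        .add_expr(col("column1").gt(lit(100)))
        .build();
    let pruned = prune_chunks(&observer, vec![c1], &predicate);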
@@ -165,20 +165,15 @@ impl<'a> PruningStatistics for ChunkMetaStats<'a> {
 #[cfg(test)]
 mod test {
     use super::*;
-    use std::{cell::RefCell, fmt, sync::Arc};
-
-    use arrow::datatypes::DataType;
-    use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
+    use crate::{predicate::PredicateBuilder, test::TestChunk, QueryChunk};
     use datafusion::logical_plan::{col, lit};
-    use internal_types::schema::{builder::SchemaBuilder, merge::SchemaMerger, Schema};
-
-    use crate::predicate::PredicateBuilder;
+    use std::{cell::RefCell, sync::Arc};
 
     #[test]
     fn test_empty() {
         test_helpers::maybe_start_logging();
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1"));
+        let c1 = Arc::new(TestChunk::new("chunk1"));
 
         let predicate = PredicateBuilder::new().build();
         let pruned = prune_chunks(&observer, vec![c1], &predicate);
@@ -196,7 +191,7 @@ mod test {
         // column1 > 100.0 where
         //   c1: [0.0, 10.0] --> pruned
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_f64_column(
+        let c1 = Arc::new(TestChunk::new("chunk1").with_f64_field_column_with_stats(
             "column1",
             Some(0.0),
             Some(10.0),
@@ -218,8 +213,11 @@ mod test {
         //   c1: [0, 10] --> pruned
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(10),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit(100)))
@@ -238,8 +236,11 @@ mod test {
         //   c1: [0, 10] --> pruned
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_u64_column("column1", Some(0), Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_u64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(10),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit(100)))
@@ -258,7 +259,7 @@ mod test {
         //   c1: [false, false] --> pruned
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_bool_column(
+        let c1 = Arc::new(TestChunk::new("chunk1").with_bool_field_column_with_stats(
             "column1",
             Some(false),
             Some(false),
@@ -279,11 +280,13 @@ mod test {
         //   c1: ["a", "q"] --> pruned
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column(
-            "column1",
-            Some("a"),
-            Some("q"),
-        ));
+        let c1 = Arc::new(
+            TestChunk::new("chunk1").with_string_field_column_with_stats(
+                "column1",
+                Some("a"),
+                Some("q"),
+            ),
+        );
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit("z")))
@@ -301,7 +304,7 @@ mod test {
         // column1 < 100.0 where
         //   c1: [0.0, 10.0] --> not pruned
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_f64_column(
+        let c1 = Arc::new(TestChunk::new("chunk1").with_f64_field_column_with_stats(
             "column1",
             Some(0.0),
             Some(10.0),
@@ -323,8 +326,11 @@ mod test {
         //   c1: [0, 10] --> not pruned
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(10),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").lt(lit(100)))
@@ -343,8 +349,11 @@ mod test {
         //   c1: [0, 10] --> not pruned
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_u64_column("column1", Some(0), Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_u64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(10),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").lt(lit(100)))
@@ -363,7 +372,7 @@ mod test {
         //   c1: [false, true] --> not pruned
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_bool_column(
+        let c1 = Arc::new(TestChunk::new("chunk1").with_bool_field_column_with_stats(
             "column1",
             Some(false),
             Some(true),
@@ -384,11 +393,13 @@ mod test {
         //   c1: ["a", "q"] --> not pruned
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column(
-            "column1",
-            Some("a"),
-            Some("q"),
-        ));
+        let c1 = Arc::new(
+            TestChunk::new("chunk1").with_string_field_column_with_stats(
+                "column1",
+                Some("a"),
+                Some("q"),
+            ),
+        );
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").lt(lit("z")))
@@ -410,13 +421,23 @@ mod test {
         //   c4: Null --> not pruned (no statistics at all)
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", None, Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
+            "column1",
+            None,
+            Some(10),
+        ));
 
-        let c2 = Arc::new(TestChunkMeta::new("chunk2").with_i64_column("column1", Some(0), None));
+        let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            None,
+        ));
 
-        let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column1", None, None));
+        let c3 = Arc::new(
+            TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None),
+        );
 
-        let c4 = Arc::new(TestChunkMeta::new("chunk4").with_i64_column_no_stats("column1"));
+        let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1"));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit(100)))
@@ -440,20 +461,39 @@ mod test {
         //   c6: [None, 10] --> pruned
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(10),
+        ));
 
-        let c2 =
-            Arc::new(TestChunkMeta::new("chunk2").with_i64_column("column1", Some(0), Some(1000)));
+        let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(1000),
+        ));
 
-        let c3 =
-            Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column1", Some(10), Some(20)));
+        let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
+            "column1",
+            Some(10),
+            Some(20),
+        ));
 
-        let c4 = Arc::new(TestChunkMeta::new("chunk4").with_i64_column("column1", None, None));
+        let c4 = Arc::new(
+            TestChunk::new("chunk4").with_i64_field_column_with_stats("column1", None, None),
+        );
 
-        let c5 = Arc::new(TestChunkMeta::new("chunk5").with_i64_column("column1", Some(10), None));
+        let c5 = Arc::new(TestChunk::new("chunk5").with_i64_field_column_with_stats(
+            "column1",
+            Some(10),
+            None,
+        ));
 
-        let c6 = Arc::new(TestChunkMeta::new("chunk6").with_i64_column("column1", None, Some(20)));
+        let c6 = Arc::new(TestChunk::new("chunk6").with_i64_field_column_with_stats(
+            "column1",
+            None,
+            Some(20),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit(100)))
@@ -477,19 +517,22 @@ mod test {
         //   c3: None, column2 [0, 4] --> not pruned (no stats for column1)
         let observer = TestObserver::new();
         let c1 = Arc::new(
-            TestChunkMeta::new("chunk1")
-                .with_i64_column("column1", Some(0), Some(100))
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk1")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(100))
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
         let c2 = Arc::new(
-            TestChunkMeta::new("chunk2")
-                .with_i64_column("column1", Some(0), Some(1000))
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk2")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(1000))
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
-        let c3 =
-            Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column2", Some(0), Some(4)));
+        let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
+            "column2",
+            Some(0),
+            Some(4),
+        ));
 
         let predicate = PredicateBuilder::new()
             .add_expr(col("column1").gt(lit(100)))
@@ -520,39 +563,39 @@ mod test {
 
         let observer = TestObserver::new();
         let c1 = Arc::new(
-            TestChunkMeta::new("chunk1")
-                .with_i64_column("column1", Some(0), Some(1000))
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk1")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(1000))
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
         let c2 = Arc::new(
-            TestChunkMeta::new("chunk2")
-                .with_i64_column("column1", Some(0), Some(10))
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk2")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(10))
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
         let c3 = Arc::new(
-            TestChunkMeta::new("chunk3")
-                .with_i64_column("column1", Some(0), Some(10))
-                .with_i64_column("column2", Some(5), Some(10)),
+            TestChunk::new("chunk3")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(10))
+                .with_i64_field_column_with_stats("column2", Some(5), Some(10)),
         );
 
         let c4 = Arc::new(
-            TestChunkMeta::new("chunk4")
-                .with_i64_column("column1", Some(1000), Some(2000))
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk4")
+                .with_i64_field_column_with_stats("column1", Some(1000), Some(2000))
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
         let c5 = Arc::new(
-            TestChunkMeta::new("chunk5")
-                .with_i64_column("column1", Some(0), Some(10))
-                .with_i64_column_no_stats("column2"),
+            TestChunk::new("chunk5")
+                .with_i64_field_column_with_stats("column1", Some(0), Some(10))
+                .with_i64_field_column_no_stats("column2"),
         );
 
         let c6 = Arc::new(
-            TestChunkMeta::new("chunk6")
-                .with_i64_column_no_stats("column1")
-                .with_i64_column("column2", Some(0), Some(4)),
+            TestChunk::new("chunk6")
+                .with_i64_field_column_no_stats("column1")
+                .with_i64_field_column_with_stats("column2", Some(0), Some(4)),
         );
 
         let predicate = PredicateBuilder::new()
@@ -580,19 +623,23 @@ mod test {
         //   c3: column1 [1000, 2000] --> pruned (types are correct)
 
         let observer = TestObserver::new();
-        let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column(
-            "column1",
-            Some("0"),
-            Some("9"),
-        ));
+        let c1 = Arc::new(
+            TestChunk::new("chunk1").with_string_field_column_with_stats(
+                "column1",
+                Some("0"),
+                Some("9"),
+            ),
+        );
 
-        let c2 = Arc::new(TestChunkMeta::new("chunk2").with_string_column(
-            "column1",
-            Some("1000"),
-            Some("2000"),
-        ));
+        let c2 = Arc::new(
+            TestChunk::new("chunk2").with_string_field_column_with_stats(
+                "column1",
+                Some("1000"),
+                Some("2000"),
+            ),
+        );
 
-        let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column(
+        let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
             "column1",
             Some(1000),
             Some(2000),
@@ -628,19 +675,25 @@ mod test {
         //   c4: column1 [1000u64, 2000u64] --> pruned (types are different)
 
         let observer = TestObserver::new();
-        let c1 =
-            Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(1000)));
+        let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(1000),
+        ));
 
-        let c2 =
-            Arc::new(TestChunkMeta::new("chunk2").with_u64_column("column1", Some(0), Some(1000)));
+        let c2 = Arc::new(TestChunk::new("chunk2").with_u64_field_column_with_stats(
+            "column1",
+            Some(0),
+            Some(1000),
+        ));
 
-        let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column(
+        let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats(
             "column1",
             Some(1000),
             Some(2000),
         ));
 
-        let c4 = Arc::new(TestChunkMeta::new("chunk4").with_u64_column(
+        let c4 = Arc::new(TestChunk::new("chunk4").with_u64_field_column_with_stats(
             "column1",
             Some(1000),
             Some(2000),
@@ -656,8 +709,8 @@ mod test {
         assert_eq!(names(&pruned), vec!["chunk1", "chunk2"]);
     }
 
-    fn names(pruned: &[Arc<TestChunkMeta>]) -> Vec<&str> {
-        pruned.iter().map(|p| p.name.as_str()).collect()
+    fn names(pruned: &[Arc<TestChunk>]) -> Vec<&str> {
+        pruned.iter().map(|p| p.table_name()).collect()
     }
 
     #[derive(Debug, Default)]
@@ -676,7 +729,7 @@ mod test {
     }
 
     impl PruningObserver for TestObserver {
-        type Observed = TestChunkMeta;
+        type Observed = TestChunk;
 
         fn was_pruned(&self, chunk: &Self::Observed) {
             self.events.borrow_mut().push(format!("{}: Pruned", chunk))
@@ -694,168 +747,4 @@ mod test {
                 .push(format!("{}: Could not prune chunk: {}", chunk, reason))
         }
     }
-
-    #[derive(Debug, Clone)]
-    struct TestChunkMeta {
-        name: String,
-        summary: TableSummary,
-        schema: Arc<Schema>,
-    }
-
-    /// Implementation of creating a new column with statitics for TestChunkMeta
-    macro_rules! impl_with_column {
-        ($SELF:expr, $COLUMN_NAME:expr, $MIN:expr, $MAX:expr, $DATA_TYPE:ident, $STAT_TYPE:ident) => {{
-            let Self {
-                name,
-                summary,
-                schema,
-            } = $SELF;
-            let column_name = $COLUMN_NAME.into();
-            let new_self = Self {
-                name,
-                schema: Self::add_field_to_schema(&column_name, schema, DataType::$DATA_TYPE),
-                summary: Self::add_column_to_summary(
-                    summary,
-                    column_name,
-                    Statistics::$STAT_TYPE(StatValues {
-                        distinct_count: None,
-                        min: $MIN,
-                        max: $MAX,
-                        count: 42,
-                    }),
-                ),
-            };
-            new_self
-        }};
-    }
-
-    impl TestChunkMeta {
-        fn new(name: impl Into<String>) -> Self {
-            let name = name.into();
-            let summary = TableSummary::new(&name);
-            let schema = Arc::new(SchemaBuilder::new().build().unwrap());
-            Self {
-                name,
-                summary,
-                schema,
-            }
-        }
-
-        /// Adds an f64 column named into the schema
-        fn with_f64_column(
-            self,
-            column_name: impl Into<String>,
-            min: Option<f64>,
-            max: Option<f64>,
-        ) -> Self {
-            impl_with_column!(self, column_name, min, max, Float64, F64)
-        }
-
-        /// Adds an i64 column named into the schema
-        fn with_i64_column(
-            self,
-            column_name: impl Into<String>,
-            min: Option<i64>,
-            max: Option<i64>,
-        ) -> Self {
-            impl_with_column!(self, column_name, min, max, Int64, I64)
-        }
-
-        /// Adds an i64 column named into the schema, but with no stats
-        fn with_i64_column_no_stats(self, column_name: impl AsRef<str>) -> Self {
-            let Self {
-                name,
-                summary,
-                schema,
-            } = self;
-            Self {
-                name,
-                schema: Self::add_field_to_schema(column_name.as_ref(), schema, DataType::Int64),
-                // Note we don't add any stats
-                summary,
-            }
-        }
-
-        /// Adds an u64 column named into the schema
-        fn with_u64_column(
-            self,
-            column_name: impl Into<String>,
-            min: Option<u64>,
-            max: Option<u64>,
-        ) -> Self {
-            impl_with_column!(self, column_name, min, max, UInt64, U64)
-        }
-
-        /// Adds bool column named into the schema
-        fn with_bool_column(
-            self,
-            column_name: impl Into<String>,
-            min: Option<bool>,
-            max: Option<bool>,
-        ) -> Self {
-            impl_with_column!(self, column_name, min, max, Boolean, Bool)
-        }
-
-        /// Adds a string column named into the schema
-        fn with_string_column(
-            self,
-            column_name: impl Into<String>,
-            min: Option<&str>,
-            max: Option<&str>,
-        ) -> Self {
-            let min = min.map(|v| v.to_string());
-            let max = max.map(|v| v.to_string());
-            impl_with_column!(self, column_name, min, max, Utf8, String)
-        }
-
-        fn add_field_to_schema(
-            column_name: &str,
-            schema: Arc<Schema>,
-            data_type: DataType,
-        ) -> Arc<Schema> {
-            let new_schema = SchemaBuilder::new()
-                .field(column_name, data_type)
-                .build()
-                .expect("built new field schema");
-
-            let new_schema = SchemaMerger::new()
-                .merge(schema.as_ref())
-                .expect("merged existing schema")
-                .merge(&new_schema)
-                .expect("merged new schema")
-                .build();
-
-            Arc::new(new_schema)
-        }
-
-        fn add_column_to_summary(
-            mut summary: TableSummary,
-            column_name: impl Into<String>,
-            stats: Statistics,
-        ) -> TableSummary {
-            summary.columns.push(ColumnSummary {
-                name: column_name.into(),
-                influxdb_type: None,
-                stats,
-            });
-
-            summary
-        }
-    }
-
-    impl fmt::Display for TestChunkMeta {
-        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-            write!(f, "{}", self.name)
-        }
-    }
-
-    impl QueryChunkMeta for TestChunkMeta {
-        fn summary(&self) -> &TableSummary {
-            &self.summary
-        }
-
-        fn schema(&self) -> Arc<Schema> {
-            Arc::clone(&self.schema)
-        }
-    }
 }
@@ -3,35 +3,30 @@
 //!
 //! AKA it is a Mock
 
-use arrow::{
-    array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray},
-    datatypes::{DataType, Int32Type, TimeUnit},
-    record_batch::RecordBatch,
-};
-use data_types::{
-    chunk_metadata::ChunkSummary,
-    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
-};
-use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
-use futures::StreamExt;
-
 use crate::exec::Executor;
 use crate::{
     exec::stringset::{StringSet, StringSetRef},
     DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
 };
-
+use arrow::{
+    array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray},
+    datatypes::{DataType, Int32Type, TimeUnit},
+    record_batch::RecordBatch,
+};
+use async_trait::async_trait;
+use data_types::{
+    chunk_metadata::ChunkSummary,
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
+};
+use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream};
+use futures::StreamExt;
 use internal_types::{
-    schema::{
-        builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME,
-    },
+    schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema},
     selection::Selection,
 };
-
-use async_trait::async_trait;
 use parking_lot::Mutex;
 use snafu::Snafu;
-use std::{collections::BTreeMap, sync::Arc};
+use std::{collections::BTreeMap, fmt, sync::Arc};
 
 #[derive(Debug, Default)]
 pub struct TestDatabase {
@@ -135,8 +130,17 @@ impl QueryDatabase for TestDatabase {
     }
 }
 
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct TestChunk {
+    /// Table name
+    table_name: String,
+
+    /// Schema of the table
+    schema: Arc<Schema>,
+
+    /// Return value for summary()
+    table_summary: TableSummary,
+
     id: u32,
 
     /// Set the flag if this chunk might contain duplicates
@@ -145,12 +149,6 @@ pub struct TestChunk {
     /// A copy of the captured predicates passed
     predicates: Mutex<Vec<Predicate>>,
 
-    /// Table name
-    table_name: Option<String>,
-
-    /// Schema of the table
-    table_schema: Option<Schema>,
-
     /// RecordBatches that are returned on each request
     table_data: Vec<Arc<RecordBatch>>,
 
@ -159,19 +157,87 @@ pub struct TestChunk {
|
|||
|
||||
/// Return value for apply_predicate, if desired
|
||||
predicate_match: Option<PredicateMatch>,
|
||||
}
|
||||
|
||||
/// Return value for summary(), if desired
|
||||
table_summary: Option<TableSummary>,

/// Implements a method for adding a column with default stats
macro_rules! impl_with_column {
    ($NAME:ident, $DATA_TYPE:ident) => {
        pub fn $NAME(self, column_name: impl Into<String>) -> Self {
            let column_name = column_name.into();

            let new_column_schema = SchemaBuilder::new()
                .field(&column_name, DataType::$DATA_TYPE)
                .build()
                .unwrap();
            self.add_schema_to_table(new_column_schema, true, None)
        }
    };
}

/// Implements a method for adding a column without any stats
macro_rules! impl_with_column_no_stats {
    ($NAME:ident, $DATA_TYPE:ident) => {
        pub fn $NAME(self, column_name: impl Into<String>) -> Self {
            let column_name = column_name.into();

            let new_column_schema = SchemaBuilder::new()
                .field(&column_name, DataType::$DATA_TYPE)
                .build()
                .unwrap();

            self.add_schema_to_table(new_column_schema, false, None)
        }
    };
}

/// Implements a method for adding a column with stats that have the specified min and max
macro_rules! impl_with_column_with_stats {
    ($NAME:ident, $DATA_TYPE:ident, $RUST_TYPE:ty, $STAT_TYPE:ident) => {
        pub fn $NAME(
            self,
            column_name: impl Into<String>,
            min: Option<$RUST_TYPE>,
            max: Option<$RUST_TYPE>,
        ) -> Self {
            let column_name = column_name.into();

            let new_column_schema = SchemaBuilder::new()
                .field(&column_name, DataType::$DATA_TYPE)
                .build()
                .unwrap();

            let stats = Statistics::$STAT_TYPE(StatValues {
                min,
                max,
                ..Default::default()
            });

            self.add_schema_to_table(new_column_schema, true, Some(stats))
        }
    };
}
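
A sketch of what these macros produce (not part of the commit): an invocation such as
`impl_with_column!(with_i64_field_column, Int64);` expands to an ordinary builder
method on `TestChunk`, roughly equivalent to:

    // Sketch of the expansion for (with_i64_field_column, Int64)
    pub fn with_i64_field_column(self, column_name: impl Into<String>) -> Self {
        let column_name = column_name.into();
        let new_column_schema = SchemaBuilder::new()
            .field(&column_name, DataType::Int64)
            .build()
            .unwrap();
        // `true` asks for a column summary; `None` means default stats
        self.add_schema_to_table(new_column_schema, true, None)
    }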

impl TestChunk {
    pub fn new(id: u32) -> Self {
    pub fn new(table_name: impl Into<String>) -> Self {
        let table_name = table_name.into();
        Self {
            id,
            ..Default::default()
            table_name: table_name.clone(),
            schema: Arc::new(SchemaBuilder::new().build().unwrap()),
            table_summary: TableSummary::new(table_name),
            id: Default::default(),
            may_contain_pk_duplicates: Default::default(),
            predicates: Default::default(),
            table_data: Default::default(),
            saved_error: Default::default(),
            predicate_match: Default::default(),
        }
    }

    pub fn with_id(mut self, id: u32) -> Self {
        self.id = id;
        self
    }
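
A minimal sketch of the construction style this enables (the `table_name()` accessor
is defined further down in this diff):

    let chunk = TestChunk::new("h2o").with_id(42);
    assert_eq!(chunk.table_name(), "h2o");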

    /// specify that any call should result in an error with the message
    /// specified
    pub fn with_error(mut self, error_message: impl Into<String>) -> Self {

@ -194,183 +260,163 @@ impl TestChunk {
        }
    }

    /// Register a table with the test chunk and a "dummy" column
    pub fn with_table(self, table_name: impl Into<String>) -> Self {
        self.with_tag_column(table_name, "dummy_col")
    }

    /// Set the `may_contain_pk_duplicates` flag
    pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self {
        self.may_contain_pk_duplicates = v;
        self
    }

    /// Register an tag column with the test chunk
    pub fn with_tag_column(
        self,
        table_name: impl Into<String>,
        column_name: impl Into<String>,
    ) -> Self {
        let table_name = table_name.into();
    /// Register a tag column with the test chunk with default stats
    pub fn with_tag_column(self, column_name: impl Into<String>) -> Self {
        let column_name = column_name.into();

        // make a new schema with the specified column and
        // merge it in to any existing schema
        let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap();

        self.add_schema_to_table(table_name, new_column_schema)
        self.add_schema_to_table(new_column_schema, true, None)
    }

    /// Register an tag column with the test chunk
    /// Register a tag column with the test chunk
    pub fn with_tag_column_with_stats(
        self,
        table_name: impl Into<String>,
        column_name: impl Into<String>,
        min: &str,
        max: &str,
        min: Option<&str>,
        max: Option<&str>,
    ) -> Self {
        let table_name = table_name.into();
        let column_name = column_name.into();

        let mut new_self = self.with_tag_column(&table_name, &column_name);
        // make a new schema with the specified column and
        // merge it in to any existing schema
        let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap();

        // Now, find the appropriate column summary and update the stats
        let column_summary: &mut ColumnSummary = new_self
            .table_summary
            .as_mut()
            .expect("had table summary")
            .columns
            .iter_mut()
            .find(|c| c.name == column_name)
            .expect("had column");

        column_summary.stats = Statistics::String(StatValues {
            min: Some(min.to_string()),
            max: Some(max.to_string()),
        // Construct stats
        let stats = Statistics::String(StatValues {
            min: min.map(ToString::to_string),
            max: max.map(ToString::to_string),
            ..Default::default()
        });

        new_self
        self.add_schema_to_table(new_column_schema, true, Some(stats))
    }
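
A minimal usage sketch: because the bounds are now `Option`s, a tag column can carry
partial statistics:

    let chunk = TestChunk::new("t")
        .with_tag_column("tag1")
        .with_tag_column_with_stats("tag2", Some("AL"), None); // max unknown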

    /// Register a timestamp column with the test chunk
    pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
        let table_name = table_name.into();

    /// Register a timestamp column with the test chunk with default stats
    pub fn with_time_column(self) -> Self {
        // make a new schema with the specified column and
        // merge it in to any existing schema
        let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap();

        self.add_schema_to_table(table_name, new_column_schema)
        self.add_schema_to_table(new_column_schema, true, None)
    }

    /// Register a timestamp column with the test chunk
    pub fn with_time_column_with_stats(
        self,
        table_name: impl Into<String>,
        min: i64,
        max: i64,
    ) -> Self {
        let table_name = table_name.into();
    pub fn with_time_column_with_stats(self, min: Option<i64>, max: Option<i64>) -> Self {
        // make a new schema with the specified column and
        // merge it in to any existing schema
        let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap();

        let mut new_self = self.with_time_column(&table_name);

        // Now, find the appropriate column summary and update the stats
        let column_summary: &mut ColumnSummary = new_self
            .table_summary
            .as_mut()
            .expect("had table summary")
            .columns
            .iter_mut()
            .find(|c| c.name == TIME_COLUMN_NAME)
            .expect("had column");

        column_summary.stats = Statistics::I64(StatValues {
            min: Some(min),
            max: Some(max),
        // Construct stats
        let stats = Statistics::I64(StatValues {
            min,
            max,
            ..Default::default()
        });

        new_self
        self.add_schema_to_table(new_column_schema, true, Some(stats))
    }
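
The timestamp variant follows the same shape; a minimal sketch, with the bounds given
as raw i64 timestamps as elsewhere in this file:

    let chunk = TestChunk::new("t").with_time_column_with_stats(Some(5), Some(7000));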

    /// Register an int field column with the test chunk
    pub fn with_int_field_column(
    impl_with_column!(with_i64_field_column, Int64);
    impl_with_column_no_stats!(with_i64_field_column_no_stats, Int64);
    impl_with_column_with_stats!(with_i64_field_column_with_stats, Int64, i64, I64);

    impl_with_column!(with_u64_column, UInt64);
    impl_with_column_no_stats!(with_u64_field_column_no_stats, UInt64);
    impl_with_column_with_stats!(with_u64_field_column_with_stats, UInt64, u64, U64);

    impl_with_column!(with_f64_field_column, Float64);
    impl_with_column_no_stats!(with_f64_field_column_no_stats, Float64);
    impl_with_column_with_stats!(with_f64_field_column_with_stats, Float64, f64, F64);

    impl_with_column!(with_bool_field_column, Boolean);
    impl_with_column_no_stats!(with_bool_field_column_no_stats, Boolean);
    impl_with_column_with_stats!(with_bool_field_column_with_stats, Boolean, bool, Bool);
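
Each invocation stamps out one builder method per data type and stats flavor, so the
per-type boilerplate collapses to a table of names. A hedged sketch of the generated
surface in use:

    let chunk = TestChunk::new("t")
        .with_i64_field_column("field_int")
        .with_u64_field_column_with_stats("field_u64", Some(1), Some(100))
        .with_f64_field_column_no_stats("field_f64");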

    /// Register a string field column with the test chunk
    pub fn with_string_field_column_with_stats(
        self,
        table_name: impl Into<String>,
        column_name: impl Into<String>,
        min: Option<&str>,
        max: Option<&str>,
    ) -> Self {
        let column_name = column_name.into();

        // make a new schema with the specified column and
        // merge it in to any existing schema
        let new_column_schema = SchemaBuilder::new()
            .field(&column_name, DataType::Int64)
            .field(&column_name, DataType::Utf8)
            .build()
            .unwrap();
        self.add_schema_to_table(table_name, new_column_schema)

        // Construct stats
        let stats = Statistics::String(StatValues {
            min: min.map(ToString::to_string),
            max: max.map(ToString::to_string),
            ..Default::default()
        });

        self.add_schema_to_table(new_column_schema, true, Some(stats))
    }

    /// Adds the specified schema and optionally a column summary containing optional stats.
    /// If `add_column_summary` is false, `stats` is ignored. If `add_column_summary` is true but
    /// `stats` is `None`, default stats will be added to the column summary.
    fn add_schema_to_table(
        mut self,
        table_name: impl Into<String>,
        new_column_schema: Schema,
        add_column_summary: bool,
        stats: Option<Statistics>,
    ) -> Self {
        let table_name = table_name.into();
        if let Some(existing_name) = &self.table_name {
            assert_eq!(&table_name, existing_name);
        }
        self.table_name = Some(table_name.clone());

        // assume the new schema has exactly a single table
        assert_eq!(new_column_schema.len(), 1);
        let (col_type, new_field) = new_column_schema.field(0);

        let influxdb_type = col_type.map(|t| match t {
            InfluxColumnType::IOx(_) => todo!(),
            InfluxColumnType::Tag => InfluxDbType::Tag,
            InfluxColumnType::Field(_) => InfluxDbType::Field,
            InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
        });

        let stats = match new_field.data_type() {
            DataType::Boolean => Statistics::Bool(StatValues::default()),
            DataType::Int64 => Statistics::I64(StatValues::default()),
            DataType::UInt64 => Statistics::U64(StatValues::default()),
            DataType::Utf8 => Statistics::String(StatValues::default()),
            DataType::Dictionary(_, value_type) => {
                assert!(matches!(**value_type, DataType::Utf8));
                Statistics::String(StatValues::default())
            }
            DataType::Float64 => Statistics::String(StatValues::default()),
            DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
            _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
        };

        let column_summary = ColumnSummary {
            name: new_field.name().clone(),
            influxdb_type,
            stats,
        };

        let mut merger = SchemaMerger::new();
        merger = merger.merge(&new_column_schema).unwrap();
        merger = merger
            .merge(self.schema.as_ref())
            .expect("merging was successful");
        self.schema = Arc::new(merger.build());

        if let Some(existing_schema) = self.table_schema.as_ref() {
            merger = merger
                .merge(existing_schema)
                .expect("merging was successful");
        if add_column_summary {
            let influxdb_type = col_type.map(|t| match t {
                InfluxColumnType::IOx(_) => todo!(),
                InfluxColumnType::Tag => InfluxDbType::Tag,
                InfluxColumnType::Field(_) => InfluxDbType::Field,
                InfluxColumnType::Timestamp => InfluxDbType::Timestamp,
            });

            let stats = stats.unwrap_or_else(|| match new_field.data_type() {
                DataType::Boolean => Statistics::Bool(StatValues::default()),
                DataType::Int64 => Statistics::I64(StatValues::default()),
                DataType::UInt64 => Statistics::U64(StatValues::default()),
                DataType::Utf8 => Statistics::String(StatValues::default()),
                DataType::Dictionary(_, value_type) => {
                    assert!(matches!(**value_type, DataType::Utf8));
                    Statistics::String(StatValues::default())
                }
                DataType::Float64 => Statistics::F64(StatValues::default()),
                DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()),
                _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()),
            });

            let column_summary = ColumnSummary {
                name: new_field.name().clone(),
                influxdb_type,
                stats,
            };

            self.table_summary.columns.push(column_summary);
        }
        let new_schema = merger.build();

        self.table_schema = Some(new_schema);

        let mut table_summary = self
            .table_summary
            .take()
            .unwrap_or_else(|| TableSummary::new(table_name));
        table_summary.columns.push(column_summary);
        self.table_summary = Some(table_summary);

        self
    }
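
Taken together, the new signature encodes three call shapes, all of which appear
above (summarized here, not new API):

    self.add_schema_to_table(schema, true, None)         // column summary with default stats
    self.add_schema_to_table(schema, true, Some(stats))  // column summary with the given stats
    self.add_schema_to_table(schema, false, None)        // schema only, no column summary

The rewrite also fixes a latent bug in the old code: the `DataType::Float64` arm
previously produced `Statistics::String` defaults, where it now produces
`Statistics::F64`.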

@ -382,15 +428,10 @@ impl TestChunk {

    /// Prepares this chunk to return a specific record batch with one
    /// row of non null data.
    pub fn with_one_row_of_null_data(mut self, _table_name: impl Into<String>) -> Self {
        //let table_name = table_name.into();
        let schema = self
            .table_schema
            .as_ref()
            .expect("table must exist in TestChunk");

    pub fn with_one_row_of_data(mut self) -> Self {
        // create arrays
        let columns = schema
        let columns = self
            .schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => Arc::new(Int64Array::from(vec![1000])) as ArrayRef,

@ -411,7 +452,8 @@ impl TestChunk {
            })
            .collect::<Vec<_>>();

        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
        let batch =
            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
        println!("TestChunk batch data: {:#?}", batch);

        self.table_data.push(Arc::new(batch));
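
Every `with_*_rows_of_data` method follows this pattern: walk the merged schema and
build one Arrow array per column, so the batch always lines up with whatever columns
were registered. A condensed, hypothetical sketch (the sample values are placeholders):

    fn one_array_per_column(schema: &Schema) -> Vec<ArrayRef> {
        schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => Arc::new(Int64Array::from(vec![1000])) as ArrayRef,
                DataType::Utf8 => Arc::new(StringArray::from(vec!["MA"])) as ArrayRef,
                other => panic!("no sample data for {:?} in this sketch", other),
            })
            .collect()
    }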

@ -428,14 +470,10 @@ impl TestChunk {
    /// "| UT | RI | 70 | 1970-01-01 00:00:00.000020 |",
    /// "+------+------+-----------+-------------------------------+",
    /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000)
    pub fn with_three_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
        let schema = self
            .table_schema
            .as_ref()
            .expect("table must exist in TestChunk");

    pub fn with_three_rows_of_data(mut self) -> Self {
        // create arrays
        let columns = schema
        let columns = self
            .schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef,

@ -475,7 +513,8 @@ impl TestChunk {
            })
            .collect::<Vec<_>>();

        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
        let batch =
            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

        self.table_data.push(Arc::new(batch));
        self

@ -492,14 +531,10 @@ impl TestChunk {
    /// "| VT | NC | 50 | 1970-01-01 00:00:00.000210 |", // duplicate of (1)
    /// "+------+------+-----------+-------------------------------+",
    /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(28000, 220000)
    pub fn with_four_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
        let schema = self
            .table_schema
            .as_ref()
            .expect("table must exist in TestChunk");

    pub fn with_four_rows_of_data(mut self) -> Self {
        // create arrays
        let columns = schema
        let columns = self
            .schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef,

@ -539,7 +574,8 @@ impl TestChunk {
            })
            .collect::<Vec<_>>();

        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
        let batch =
            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

        self.table_data.push(Arc::new(batch));
        self

@ -557,14 +593,10 @@ impl TestChunk {
    /// "| MT | AL | 5 | 1970-01-01 00:00:00.000005 |",
    /// "+------+------+-----------+-------------------------------+",
    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
    pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
        let schema = self
            .table_schema
            .as_ref()
            .expect("table must exist in TestChunk");

    pub fn with_five_rows_of_data(mut self) -> Self {
        // create arrays
        let columns = schema
        let columns = self
            .schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => {

@ -611,7 +643,8 @@ impl TestChunk {
            })
            .collect::<Vec<_>>();

        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
        let batch =
            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

        self.table_data.push(Arc::new(batch));
        self

@ -634,15 +667,10 @@ impl TestChunk {
    /// "| MT | AL | 30 | 1970-01-01 00:00:00.000005 |", // Duplicate with (3)
    /// "+------+------+-----------+-------------------------------+",
    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
    pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into<String>) -> Self {
        //let table_name = table_name.into();
        let schema = self
            .table_schema
            .as_ref()
            .expect("table must exist in TestChunk");

    pub fn with_ten_rows_of_data_some_duplicates(mut self) -> Self {
        // create arrays
        let columns = schema
        let columns = self
            .schema
            .iter()
            .map(|(_influxdb_column_type, field)| match field.data_type() {
                DataType::Int64 => Arc::new(Int64Array::from(vec![

@ -693,35 +721,34 @@ impl TestChunk {
            })
            .collect::<Vec<_>>();

        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
        let batch =
            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

        self.table_data.push(Arc::new(batch));
        self
    }

    /// Returns all columns of the table
    pub fn all_column_names(&self) -> Option<StringSet> {
        let column_names = self.table_schema.as_ref().map(|schema| {
            schema
                .iter()
                .map(|(_, field)| field.name().to_string())
                .collect::<StringSet>()
        });

        column_names
    pub fn all_column_names(&self) -> StringSet {
        self.schema
            .iter()
            .map(|(_, field)| field.name().to_string())
            .collect()
    }

    /// Returns just the specified columns
    pub fn specific_column_names_selection(&self, columns: &[&str]) -> Option<StringSet> {
        let column_names = self.table_schema.as_ref().map(|schema| {
            schema
                .iter()
                .map(|(_, field)| field.name().to_string())
                .filter(|col| columns.contains(&col.as_str()))
                .collect::<StringSet>()
        });

        column_names
    pub fn specific_column_names_selection(&self, columns: &[&str]) -> StringSet {
        self.schema
            .iter()
            .map(|(_, field)| field.name().to_string())
            .filter(|col| columns.contains(&col.as_str()))
            .collect()
    }
}
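
With the schema always present, both helpers become infallible. A minimal sketch
(assuming the timestamp column is named "time" via `TIME_COLUMN_NAME`):

    let chunk = TestChunk::new("t").with_tag_column("tag1").with_time_column();
    let all = chunk.all_column_names();                          // {"tag1", "time"}
    let some = chunk.specific_column_names_selection(&["tag1"]); // {"tag1"}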

impl fmt::Display for TestChunk {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.table_name())
    }
}

@ -733,7 +760,7 @@ impl QueryChunk for TestChunk {
    }

    fn table_name(&self) -> &str {
        self.table_name.as_deref().unwrap()
        &self.table_name
    }

    fn may_contain_pk_duplicates(&self) -> bool {

@ -772,17 +799,11 @@ impl QueryChunk for TestChunk {
        }

        // otherwise fall back to basic filtering based on table name predicate.
        let predicate_match = self
            .table_name
            .as_ref()
            .map(|table_name| {
                if !predicate.should_include_table(&table_name) {
                    PredicateMatch::Zero
                } else {
                    PredicateMatch::Unknown
                }
            })
            .unwrap_or(PredicateMatch::Unknown);
        let predicate_match = if !predicate.should_include_table(&self.table_name) {
            PredicateMatch::Zero
        } else {
            PredicateMatch::Unknown
        };

        Ok(predicate_match)
    }
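
For context, a hedged gloss on how a caller might read the result, assuming the three
variants used in this file are the only ones:

    fn interpret(m: PredicateMatch) -> &'static str {
        match m {
            PredicateMatch::Zero => "prune: provably no matching rows",
            PredicateMatch::AtLeastOne => "keep: definitely has matching rows",
            PredicateMatch::Unknown => "keep: must scan to decide",
        }
    }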

@ -812,22 +833,17 @@ impl QueryChunk for TestChunk {
            Selection::Some(cols) => self.specific_column_names_selection(cols),
        };

        Ok(column_names)
        Ok(Some(column_names))
    }
}

impl QueryChunkMeta for TestChunk {
    fn summary(&self) -> &TableSummary {
        self.table_summary
            .as_ref()
            .expect("Table summary not configured for TestChunk")
        &self.table_summary
    }

    fn schema(&self) -> Arc<Schema> {
        self.table_schema
            .as_ref()
            .map(|s| Arc::new(s.clone()))
            .expect("schema was set")
        Arc::clone(&self.schema)
    }
}
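
With both fields now always populated, the metadata accessors cannot panic. A minimal
usage sketch:

    let chunk = TestChunk::new("t").with_tag_column("tag1");
    let summary: &TableSummary = chunk.summary(); // always present
    let schema: Arc<Schema> = chunk.schema();     // cheap Arc clone instead of the old deep copy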

@ -1252,13 +1252,13 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk0 = TestChunk::new(0)
        .with_predicate_match(PredicateMatch::AtLeastOne)
        .with_table("h2o");
    let chunk0 = TestChunk::new("h2o")
        .with_id(0)
        .with_predicate_match(PredicateMatch::AtLeastOne);

    let chunk1 = TestChunk::new(1)
        .with_predicate_match(PredicateMatch::AtLeastOne)
        .with_table("o2");
    let chunk1 = TestChunk::new("o2")
        .with_id(1)
        .with_predicate_match(PredicateMatch::AtLeastOne);

    fixture
        .test_storage

@ -1346,13 +1346,15 @@ mod tests {
    let partition_id = 1;

    // Note multiple tables / measurements:
    let chunk0 = TestChunk::new(0)
        .with_tag_column("m1", "k1")
        .with_tag_column("m1", "k2");
    let chunk0 = TestChunk::new("m1")
        .with_id(0)
        .with_tag_column("k1")
        .with_tag_column("k2");

    let chunk1 = TestChunk::new(1)
        .with_tag_column("m2", "k3")
        .with_tag_column("m2", "k4");
    let chunk1 = TestChunk::new("m2")
        .with_id(1)
        .with_tag_column("k3")
        .with_tag_column("k4");

    fixture
        .test_storage

@ -1414,9 +1416,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -1458,15 +1458,15 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk0 = TestChunk::new(0)
    let chunk0 = TestChunk::new("m1")
        // predicate specifies m4, so this is filtered out
        .with_tag_column("m1", "k0");
        .with_tag_column("k0");

    let chunk1 = TestChunk::new(1)
        .with_tag_column("m4", "k1")
        .with_tag_column("m4", "k2")
        .with_tag_column("m4", "k3")
        .with_tag_column("m4", "k4");
    let chunk1 = TestChunk::new("m4")
        .with_tag_column("k1")
        .with_tag_column("k2")
        .with_tag_column("k3")
        .with_tag_column("k4");

    fixture
        .test_storage

@ -1540,10 +1540,8 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        // predicate specifies m4, so this is filtered out
        .with_table("my_table")
        .with_error("This is an error");
    // predicate specifies m4, so this is filtered out
    let chunk = TestChunk::new("my_table").with_error("This is an error");

    fixture
        .test_storage

@ -1587,10 +1585,10 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -1648,9 +1646,7 @@ mod tests {
        tag_key: [0].into(),
    };

    let chunk = TestChunk::new(0)
        .with_predicate_match(PredicateMatch::AtLeastOne)
        .with_table("h2o");
    let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne);

    fixture
        .test_storage

@ -1679,11 +1675,11 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_int_field_column("TheMeasurement", "Field1")
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_i64_field_column("Field1")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -1727,9 +1723,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -1798,10 +1792,10 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -1847,9 +1841,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -1953,10 +1945,10 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -1999,9 +1991,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -2041,10 +2031,10 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -2093,9 +2083,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -2177,10 +2165,10 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -2237,10 +2225,10 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -2305,9 +2293,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0)
        .with_table("my_table")
        .with_error("Sugar we are going down");
    let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

    fixture
        .test_storage

@ -2363,11 +2349,11 @@ mod tests {
    let partition_id = 1;

    // Add a chunk with a field
    let chunk = TestChunk::new(0)
        .with_int_field_column("TheMeasurement", "Field1")
        .with_time_column("TheMeasurement")
        .with_tag_column("TheMeasurement", "state")
        .with_one_row_of_null_data("TheMeasurement");
    let chunk = TestChunk::new("TheMeasurement")
        .with_i64_field_column("Field1")
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();

    fixture
        .test_storage

@ -2413,7 +2399,7 @@ mod tests {
    let db_info = OrgAndBucket::new(123, 456);
    let partition_id = 1;

    let chunk = TestChunk::new(0).with_error("Sugar we are going down");
    let chunk = TestChunk::new("t").with_error("Sugar we are going down");

    fixture
        .test_storage