diff --git a/data_types/src/partition_metadata.rs b/data_types/src/partition_metadata.rs index 90f8021b48..62f8ed2979 100644 --- a/data_types/src/partition_metadata.rs +++ b/data_types/src/partition_metadata.rs @@ -1,12 +1,13 @@ //! This module contains structs that describe the metadata for a partition //! including schema, summary statistics, and file locations in storage. -use std::{borrow::Cow, mem}; - use serde::{Deserialize, Serialize}; -use std::borrow::Borrow; -use std::iter::FromIterator; -use std::num::NonZeroU64; +use std::{ + borrow::{Borrow, Cow}, + iter::FromIterator, + mem, + num::NonZeroU64, +}; /// Describes the aggregated (across all chunks) summary /// statistics for each column in a partition @@ -44,11 +45,7 @@ pub struct PartitionChunkSummary { impl FromIterator for TableSummary { fn from_iter>(iter: T) -> Self { let mut iter = iter.into_iter(); - let first = iter.next().expect("must contain at least one element"); - let mut s = Self { - name: first.name, - columns: first.columns, - }; + let mut s = iter.next().expect("must contain at least one element"); for other in iter { s.update_from(&other) diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 366e2fe852..5519c5eb3f 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -270,21 +270,21 @@ mod test { async fn get_test_chunks() -> (Arc, Vec>) { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 2 has an extra field, and only 4 fields let chunk2 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_int_field_column("t", "field_int2") - .with_four_rows_of_data("t"), + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_i64_field_column("field_int2") + .with_four_rows_of_data(), ); let expected = vec![ diff --git a/query/src/provider.rs b/query/src/provider.rs index a7dbeeac12..f533858128 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -765,18 +765,31 @@ mod test { // in the duplicate module // c1: no overlaps - let c1 = Arc::new(TestChunk::new(1).with_tag_column_with_stats("t", "tag1", "a", "b")); + let c1 = Arc::new(TestChunk::new("t").with_id(1).with_tag_column_with_stats( + "tag1", + Some("a"), + Some("b"), + )); // c2: over lap with c3 - let c2 = Arc::new(TestChunk::new(2).with_tag_column_with_stats("t", "tag1", "c", "d")); + let c2 = Arc::new(TestChunk::new("t").with_id(2).with_tag_column_with_stats( + "tag1", + Some("c"), + Some("d"), + )); // c3: overlap with c2 - let c3 = Arc::new(TestChunk::new(3).with_tag_column_with_stats("t", "tag1", "c", "d")); + let c3 = Arc::new(TestChunk::new("t").with_id(3).with_tag_column_with_stats( + "tag1", + Some("c"), + Some("d"), + )); // c4: self overlap let c4 = Arc::new( - TestChunk::new(4) - .with_tag_column_with_stats("t", "tag1", "e", "f") + TestChunk::new("t") + .with_id(4) + .with_tag_column_with_stats("tag1", Some("e"), Some("f")) 
.with_may_contain_pk_duplicates(true), ); @@ -797,11 +810,11 @@ mod test { async fn sort_planning_one_tag_with_time() { // Chunk 1 with 5 rows of data let chunk = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk @@ -851,12 +864,12 @@ mod test { async fn sort_planning_two_tags_with_time() { // Chunk 1 with 5 rows of data let chunk = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk @@ -906,12 +919,12 @@ mod test { async fn sort_read_filter_plan_for_two_tags_with_time() { // Chunk 1 with 5 rows of data let chunk = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk @@ -943,22 +956,24 @@ mod test { async fn deduplicate_plan_for_overlapped_chunks() { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 2 exactly the same with Chunk 1 let chunk2 = Arc::new( - TestChunk::new(2) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk // the same for 2 chunks @@ -1011,22 +1026,24 @@ mod test { // Same two chunks but only select the field and timestamp, not the tag values // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 2 exactly the same with Chunk 1 let chunk2 = Arc::new( - TestChunk::new(2) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); let chunks = vec![chunk1, 
chunk2]; @@ -1083,30 +1100,33 @@ mod test { // Chunks with different fields / tags, and select a subset // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 2 same tags, but different fields let chunk2 = Arc::new( - TestChunk::new(2) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_int_field_column("t", "other_field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("other_field_int") + .with_five_rows_of_data(), ); // Chunk 3 exactly the same with Chunk 2 let chunk3 = Arc::new( - TestChunk::new(3) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_int_field_column("t", "other_field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(3) + .with_time_column() + .with_tag_column("tag1") + .with_i64_field_column("other_field_int") + .with_five_rows_of_data(), ); let chunks = vec![chunk1, chunk2, chunk3]; @@ -1172,32 +1192,35 @@ mod test { async fn deduplicate_plan_for_overlapped_chunks_with_different_schemas() { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column("t") - .with_tag_column("t", "tag1") - .with_tag_column("t", "tag2") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(1) + .with_time_column() + .with_tag_column("tag1") + .with_tag_column("tag2") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 2 has two different tags let chunk2 = Arc::new( - TestChunk::new(2) - .with_time_column("t") - .with_tag_column("t", "tag3") - .with_tag_column("t", "tag1") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(2) + .with_time_column() + .with_tag_column("tag3") + .with_tag_column("tag1") + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Chunk 3 has just tag3 let chunk3 = Arc::new( - TestChunk::new(3) - .with_time_column("t") - .with_tag_column("t", "tag3") - .with_int_field_column("t", "field_int") - .with_int_field_column("t", "field_int2") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(3) + .with_time_column() + .with_tag_column("tag3") + .with_i64_field_column("field_int") + .with_i64_field_column("field_int2") + .with_five_rows_of_data(), ); // Requested output schema == the schema for all three @@ -1271,11 +1294,11 @@ mod test { async fn scan_plan_with_one_chunk_no_duplicates() { // Test no duplicate at all let chunk = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk @@ -1319,12 +1342,12 @@ mod test { async fn scan_plan_with_one_chunk_with_duplicates() { // Test one chunk with duplicate within let chunk = Arc::new( - TestChunk::new(1) - 
.with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) - .with_ten_rows_of_data_some_duplicates("t"), + .with_ten_rows_of_data_some_duplicates(), ); // Datafusion schema of the chunk @@ -1375,12 +1398,12 @@ mod test { async fn scan_plan_with_one_chunk_with_duplicates_subset() { // Test one chunk with duplicate within let chunk = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) - .with_ten_rows_of_data_some_duplicates("t"), + .with_ten_rows_of_data_some_duplicates(), ); let chunks = vec![chunk]; @@ -1440,19 +1463,19 @@ mod test { async fn scan_plan_with_two_overlapped_chunks_with_duplicates() { // test overlapped chunks let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_ten_rows_of_data_some_duplicates("t"), + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), ); let chunk2 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // Datafusion schema of the chunk @@ -1509,39 +1532,43 @@ mod test { async fn scan_plan_with_four_chunks() { // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within let chunk1 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_ten_rows_of_data_some_duplicates("t"), + TestChunk::new("t") + .with_id(1) + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_ten_rows_of_data_some_duplicates(), ); // chunk2 overlaps with chunk 1 let chunk2 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 5, 7000) - .with_tag_column_with_stats("t", "tag1", "AL", "MT") - .with_int_field_column("t", "field_int") - .with_five_rows_of_data("t"), + TestChunk::new("t") + .with_id(2) + .with_time_column_with_stats(Some(5), Some(7000)) + .with_tag_column_with_stats("tag1", Some("AL"), Some("MT")) + .with_i64_field_column("field_int") + .with_five_rows_of_data(), ); // chunk3 no overlap, no duplicates within let chunk3 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 8000, 20000) - .with_tag_column_with_stats("t", "tag1", "UT", "WA") - .with_int_field_column("t", "field_int") - 
.with_three_rows_of_data("t"), + TestChunk::new("t") + .with_id(3) + .with_time_column_with_stats(Some(8000), Some(20000)) + .with_tag_column_with_stats("tag1", Some("UT"), Some("WA")) + .with_i64_field_column("field_int") + .with_three_rows_of_data(), ); // chunk3 no overlap, duplicates within let chunk4 = Arc::new( - TestChunk::new(1) - .with_time_column_with_stats("t", 28000, 220000) - .with_tag_column_with_stats("t", "tag1", "UT", "WA") - .with_int_field_column("t", "field_int") + TestChunk::new("t") + .with_id(4) + .with_time_column_with_stats(Some(28000), Some(220000)) + .with_tag_column_with_stats("tag1", Some("UT"), Some("WA")) + .with_i64_field_column("field_int") .with_may_contain_pk_duplicates(true) - .with_four_rows_of_data("t"), + .with_four_rows_of_data(), ); // Datafusion schema of the chunk diff --git a/query/src/provider/overlap.rs b/query/src/provider/overlap.rs index f1e001a413..bf93f39074 100644 --- a/query/src/provider/overlap.rs +++ b/query/src/provider/overlap.rs @@ -276,13 +276,8 @@ where #[cfg(test)] mod test { - use data_types::partition_metadata::{ - ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary, - }; - use internal_types::schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}; - use std::sync::Arc; - use super::*; + use crate::{test::TestChunk, QueryChunk}; #[macro_export] macro_rules! assert_groups_eq { @@ -304,9 +299,17 @@ mod test { #[test] fn one_column_no_overlap() { - let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("mumbai")); + let c1 = TestChunk::new("chunk1").with_tag_column_with_stats( + "tag1", + Some("boston"), + Some("mumbai"), + ); - let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("new york"), Some("zoo york")); + let c2 = TestChunk::new("chunk2").with_tag_column_with_stats( + "tag1", + Some("new york"), + Some("zoo york"), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -316,9 +319,17 @@ mod test { #[test] fn one_column_overlap() { - let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("boston"), Some("new york")); + let c1 = TestChunk::new("chunk1").with_tag_column_with_stats( + "tag1", + Some("boston"), + Some("new york"), + ); - let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("denver"), Some("zoo york")); + let c2 = TestChunk::new("chunk2").with_tag_column_with_stats( + "tag1", + Some("denver"), + Some("zoo york"), + ); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -328,26 +339,24 @@ mod test { #[test] fn multi_columns() { - let c1 = TestChunk::new("chunk1").with_timestamp(0, 1000).with_tag( - "tag1", - Some("boston"), - Some("new york"), - ); + let c1 = TestChunk::new("chunk1") + .with_time_column_with_stats(Some(0), Some(1000)) + .with_tag_column_with_stats("tag1", Some("boston"), Some("new york")); // Overlaps in tag1, but not in time let c2 = TestChunk::new("chunk2") - .with_tag("tag1", Some("denver"), Some("zoo york")) - .with_timestamp(2000, 3000); + .with_tag_column_with_stats("tag1", Some("denver"), Some("zoo york")) + .with_time_column_with_stats(Some(2000), Some(3000)); // Overlaps in time, but not in tag1 let c3 = TestChunk::new("chunk3") - .with_tag("tag1", Some("zzx"), Some("zzy")) - .with_timestamp(500, 1500); + .with_tag_column_with_stats("tag1", Some("zzx"), Some("zzy")) + .with_time_column_with_stats(Some(500), Some(1500)); // Overlaps in time, and in tag1 let c4 = TestChunk::new("chunk4") - .with_tag("tag1", Some("aaa"), Some("zzz")) - .with_timestamp(500, 1500); + 
.with_tag_column_with_stats("tag1", Some("aaa"), Some("zzz")) + .with_time_column_with_stats(Some(500), Some(1500)); let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded"); @@ -362,8 +371,10 @@ mod test { #[test] fn boundary() { // check that overlap calculations include the bound - let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); - let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("bbb"), Some("ccc")); + let c1 = + TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); + let c2 = + TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("bbb"), Some("ccc")); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -374,8 +385,10 @@ mod test { #[test] fn same() { // check that if chunks overlap exactly on the boundaries they are still grouped - let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); - let c2 = TestChunk::new("chunk2").with_tag("tag1", Some("aaa"), Some("bbb")); + let c1 = + TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); + let c2 = + TestChunk::new("chunk2").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -386,8 +399,10 @@ mod test { #[test] fn different_tag_names() { // check that if chunks overlap but in different tag names - let c1 = TestChunk::new("chunk1").with_tag("tag1", Some("aaa"), Some("bbb")); - let c2 = TestChunk::new("chunk2").with_tag("tag2", Some("aaa"), Some("bbb")); + let c1 = + TestChunk::new("chunk1").with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")); + let c2 = + TestChunk::new("chunk2").with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -401,12 +416,12 @@ mod test { fn different_tag_names_multi_tags() { // check that if chunks overlap but in different tag names let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("aaa"), Some("bbb")); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")); let c2 = TestChunk::new("chunk2") - .with_tag("tag2", Some("aaa"), Some("bbb")) - .with_tag("tag3", Some("aaa"), Some("bbb")); + .with_tag_column_with_stats("tag2", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag3", Some("aaa"), Some("bbb")); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -418,21 +433,21 @@ mod test { #[test] fn three_column() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("xxx"), Some("yyy")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("xxx"), Some("yyy")) + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) // Timestamp doesn't overlap, but the two tags do - .with_timestamp(2001, 3000); + .with_time_column_with_stats(Some(2001), Some(3000)); let c3 = TestChunk::new("chunk3") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("aaa"), Some("zzz")) + 
.with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) // all three overlap - .with_timestamp(1000, 2000); + .with_time_column_with_stats(Some(1000), Some(2000)); let groups = group_potential_duplicates(vec![c1, c2, c3]).expect("grouping succeeded"); @@ -443,15 +458,15 @@ mod test { #[test] fn tag_order() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("xxx"), Some("yyy")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") - .with_tag("tag2", Some("aaa"), Some("zzz")) - .with_tag("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("aaa"), Some("zzz")) + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) // all three overlap, but tags in different order - .with_timestamp(500, 1000); + .with_time_column_with_stats(Some(500), Some(1000)); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -462,15 +477,15 @@ mod test { #[test] fn tag_order_no_tags() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("xxx"), Some("yyy")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") // tag1 and timestamp overlap, but no tag2 (aka it is all null) // so it could overlap if there was a null tag2 value in chunk1 - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_timestamp(500, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(500), Some(1000)); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -481,16 +496,16 @@ mod test { #[test] fn tag_order_null_stats() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", Some("xxx"), Some("yyy")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", Some("xxx"), Some("yyy")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") // tag1 and timestamp overlap, tag2 has no stats (is all null) // so they might overlap if chunk1 had a null in tag 2 - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_tag("tag2", None, None) - .with_timestamp(500, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_tag_column_with_stats("tag2", None, None) + .with_time_column_with_stats(Some(500), Some(1000)); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -501,13 +516,13 @@ mod test { #[test] fn tag_order_partial_stats() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") // tag1 has a min but not a max. 
Should result in error - .with_tag("tag1", Some("aaa"), None) - .with_timestamp(500, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), None) + .with_time_column_with_stats(Some(500), Some(1000)); let result = group_potential_duplicates(vec![c1, c2]).unwrap_err(); @@ -525,16 +540,16 @@ mod test { #[test] fn tag_fields_not_counted() { let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_int_field("field", Some(0), Some(2)) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_i64_field_column_with_stats("field", Some(0), Some(2)) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") // tag1 and timestamp overlap, but field value does not // should still overlap - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_int_field("field", Some(100), Some(200)) - .with_timestamp(500, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_i64_field_column_with_stats("field", Some(100), Some(200)) + .with_time_column_with_stats(Some(500), Some(1000)); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -548,15 +563,15 @@ mod test { // chunks; this will likely cause errors elsewhere in practice // as the schemas are incompatible (and can't be merged) let c1 = TestChunk::new("chunk1") - .with_tag("tag1", Some("aaa"), Some("bbb")) - .with_timestamp(0, 1000); + .with_tag_column_with_stats("tag1", Some("aaa"), Some("bbb")) + .with_time_column_with_stats(Some(0), Some(1000)); let c2 = TestChunk::new("chunk2") // tag1 column is actually a field is different in chunk // 2, so since the timestamps overlap these chunks // might also have duplicates (if tag1 was null in c1) - .with_int_field("tag1", Some(100), Some(200)) - .with_timestamp(0, 1000); + .with_i64_field_column_with_stats("tag1", Some(100), Some(200)) + .with_time_column_with_stats(Some(0), Some(1000)); let groups = group_potential_duplicates(vec![c1, c2]).expect("grouping succeeded"); @@ -569,115 +584,9 @@ mod test { fn to_string(groups: Vec>) -> Vec { let mut s = vec![]; for (idx, group) in groups.iter().enumerate() { - let names = group.iter().map(|c| c.name.as_str()).collect::>(); + let names = group.iter().map(|c| c.table_name()).collect::>(); s.push(format!("Group {}: [{}]", idx, names.join(", "))); } s } - - /// Mocked out prunable provider to use testing overlaps - #[derive(Debug, Clone)] - struct TestChunk { - // The name of this chunk - name: String, - summary: TableSummary, - builder: SchemaBuilder, - } - - /// Implementation of creating a new column with statitics for TestChunkMeta - macro_rules! 
make_stats { - ($MIN:expr, $MAX:expr, $STAT_TYPE:ident) => {{ - Statistics::$STAT_TYPE(StatValues { - distinct_count: None, - min: $MIN, - max: $MAX, - count: 42, - }) - }}; - } - - impl TestChunk { - /// Create a new TestChunk with a specified name - fn new(name: impl Into) -> Self { - let name = name.into(); - let summary = TableSummary::new(name.clone()); - let builder = SchemaBuilder::new(); - Self { - name, - summary, - builder, - } - } - - /// Adds a tag column with the specified min/max values - fn with_tag( - mut self, - name: impl Into, - min: Option<&str>, - max: Option<&str>, - ) -> Self { - let min = min.map(|v| v.to_string()); - let max = max.map(|v| v.to_string()); - - let tag_name = name.into(); - self.builder.tag(&tag_name); - - self.summary.columns.push(ColumnSummary { - name: tag_name, - influxdb_type: Some(InfluxDbType::Tag), - stats: make_stats!(min, max, String), - }); - self - } - - /// Adds a timestamp column with the specified min/max values - fn with_timestamp(mut self, min: i64, max: i64) -> Self { - self.builder.timestamp(); - - let min = Some(min); - let max = Some(max); - - self.summary.columns.push(ColumnSummary { - name: TIME_COLUMN_NAME.into(), - influxdb_type: Some(InfluxDbType::Timestamp), - stats: make_stats!(min, max, I64), - }); - self - } - - /// Adds an I64 field column with the specified min/max values - fn with_int_field( - mut self, - name: impl Into, - min: Option, - max: Option, - ) -> Self { - let field_name = name.into(); - self.builder - .field(&field_name, arrow::datatypes::DataType::Int64); - - self.summary.columns.push(ColumnSummary { - name: field_name, - influxdb_type: Some(InfluxDbType::Field), - stats: make_stats!(min, max, I64), - }); - self - } - } - - impl QueryChunkMeta for TestChunk { - fn summary(&self) -> &TableSummary { - &self.summary - } - - fn schema(&self) -> Arc { - let schema = self - .builder - // need to clone because `build` resets builder state - .clone() - .build() - .expect("created schema"); - Arc::new(schema) - } - } } diff --git a/query/src/pruning.rs b/query/src/pruning.rs index c56b29374d..2f8804383e 100644 --- a/query/src/pruning.rs +++ b/query/src/pruning.rs @@ -165,20 +165,15 @@ impl<'a> PruningStatistics for ChunkMetaStats<'a> { #[cfg(test)] mod test { use super::*; - use std::{cell::RefCell, fmt, sync::Arc}; - - use arrow::datatypes::DataType; - use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics}; + use crate::{predicate::PredicateBuilder, test::TestChunk, QueryChunk}; use datafusion::logical_plan::{col, lit}; - use internal_types::schema::{builder::SchemaBuilder, merge::SchemaMerger, Schema}; - - use crate::predicate::PredicateBuilder; + use std::{cell::RefCell, sync::Arc}; #[test] fn test_empty() { test_helpers::maybe_start_logging(); let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1")); + let c1 = Arc::new(TestChunk::new("chunk1")); let predicate = PredicateBuilder::new().build(); let pruned = prune_chunks(&observer, vec![c1], &predicate); @@ -196,7 +191,7 @@ mod test { // column1 > 100.0 where // c1: [0.0, 10.0] --> pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_f64_column( + let c1 = Arc::new(TestChunk::new("chunk1").with_f64_field_column_with_stats( "column1", Some(0.0), Some(10.0), @@ -218,8 +213,11 @@ mod test { // c1: [0, 10] --> pruned let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + let c1 = 
Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats( + "column1", + Some(0), + Some(10), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -238,8 +236,11 @@ mod test { // c1: [0, 10] --> pruned let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_u64_column("column1", Some(0), Some(10))); + let c1 = Arc::new(TestChunk::new("chunk1").with_u64_field_column_with_stats( + "column1", + Some(0), + Some(10), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -258,7 +259,7 @@ mod test { // c1: [false, false] --> pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_bool_column( + let c1 = Arc::new(TestChunk::new("chunk1").with_bool_field_column_with_stats( "column1", Some(false), Some(false), @@ -279,11 +280,13 @@ mod test { // c1: ["a", "q"] --> pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column( - "column1", - Some("a"), - Some("q"), - )); + let c1 = Arc::new( + TestChunk::new("chunk1").with_string_field_column_with_stats( + "column1", + Some("a"), + Some("q"), + ), + ); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit("z"))) @@ -301,7 +304,7 @@ mod test { // column1 < 100.0 where // c1: [0.0, 10.0] --> not pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_f64_column( + let c1 = Arc::new(TestChunk::new("chunk1").with_f64_field_column_with_stats( "column1", Some(0.0), Some(10.0), @@ -323,8 +326,11 @@ mod test { // c1: [0, 10] --> not pruned let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats( + "column1", + Some(0), + Some(10), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").lt(lit(100))) @@ -343,8 +349,11 @@ mod test { // c1: [0, 10] --> not pruned let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_u64_column("column1", Some(0), Some(10))); + let c1 = Arc::new(TestChunk::new("chunk1").with_u64_field_column_with_stats( + "column1", + Some(0), + Some(10), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").lt(lit(100))) @@ -363,7 +372,7 @@ mod test { // c1: [false, true] --> not pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_bool_column( + let c1 = Arc::new(TestChunk::new("chunk1").with_bool_field_column_with_stats( "column1", Some(false), Some(true), @@ -384,11 +393,13 @@ mod test { // c1: ["a", "q"] --> not pruned let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column( - "column1", - Some("a"), - Some("q"), - )); + let c1 = Arc::new( + TestChunk::new("chunk1").with_string_field_column_with_stats( + "column1", + Some("a"), + Some("q"), + ), + ); let predicate = PredicateBuilder::new() .add_expr(col("column1").lt(lit("z"))) @@ -410,13 +421,23 @@ mod test { // c4: Null --> not pruned (no statistics at all) let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", None, Some(10))); + let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats( + "column1", + None, + Some(10), + )); - let c2 = Arc::new(TestChunkMeta::new("chunk2").with_i64_column("column1", Some(0), None)); + let c2 = 
Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats( + "column1", + Some(0), + None, + )); - let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column1", None, None)); + let c3 = Arc::new( + TestChunk::new("chunk3").with_i64_field_column_with_stats("column1", None, None), + ); - let c4 = Arc::new(TestChunkMeta::new("chunk4").with_i64_column_no_stats("column1")); + let c4 = Arc::new(TestChunk::new("chunk4").with_i64_field_column_no_stats("column1")); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -440,20 +461,39 @@ mod test { // c6: [None, 10] --> pruned let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(10))); + let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats( + "column1", + Some(0), + Some(10), + )); - let c2 = - Arc::new(TestChunkMeta::new("chunk2").with_i64_column("column1", Some(0), Some(1000))); + let c2 = Arc::new(TestChunk::new("chunk2").with_i64_field_column_with_stats( + "column1", + Some(0), + Some(1000), + )); - let c3 = - Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column1", Some(10), Some(20))); + let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( + "column1", + Some(10), + Some(20), + )); - let c4 = Arc::new(TestChunkMeta::new("chunk4").with_i64_column("column1", None, None)); + let c4 = Arc::new( + TestChunk::new("chunk4").with_i64_field_column_with_stats("column1", None, None), + ); - let c5 = Arc::new(TestChunkMeta::new("chunk5").with_i64_column("column1", Some(10), None)); + let c5 = Arc::new(TestChunk::new("chunk5").with_i64_field_column_with_stats( + "column1", + Some(10), + None, + )); - let c6 = Arc::new(TestChunkMeta::new("chunk6").with_i64_column("column1", None, Some(20))); + let c6 = Arc::new(TestChunk::new("chunk6").with_i64_field_column_with_stats( + "column1", + None, + Some(20), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -477,19 +517,22 @@ mod test { // c3: None, column2 [0, 4] --> not pruned (no stats for column1) let observer = TestObserver::new(); let c1 = Arc::new( - TestChunkMeta::new("chunk1") - .with_i64_column("column1", Some(0), Some(100)) - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk1") + .with_i64_field_column_with_stats("column1", Some(0), Some(100)) + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); let c2 = Arc::new( - TestChunkMeta::new("chunk2") - .with_i64_column("column1", Some(0), Some(1000)) - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk2") + .with_i64_field_column_with_stats("column1", Some(0), Some(1000)) + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); - let c3 = - Arc::new(TestChunkMeta::new("chunk3").with_i64_column("column2", Some(0), Some(4))); + let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( + "column2", + Some(0), + Some(4), + )); let predicate = PredicateBuilder::new() .add_expr(col("column1").gt(lit(100))) @@ -520,39 +563,39 @@ mod test { let observer = TestObserver::new(); let c1 = Arc::new( - TestChunkMeta::new("chunk1") - .with_i64_column("column1", Some(0), Some(1000)) - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk1") + .with_i64_field_column_with_stats("column1", Some(0), Some(1000)) + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); let c2 = Arc::new( - TestChunkMeta::new("chunk2") - .with_i64_column("column1", 
Some(0), Some(10)) - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk2") + .with_i64_field_column_with_stats("column1", Some(0), Some(10)) + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); let c3 = Arc::new( - TestChunkMeta::new("chunk3") - .with_i64_column("column1", Some(0), Some(10)) - .with_i64_column("column2", Some(5), Some(10)), + TestChunk::new("chunk3") + .with_i64_field_column_with_stats("column1", Some(0), Some(10)) + .with_i64_field_column_with_stats("column2", Some(5), Some(10)), ); let c4 = Arc::new( - TestChunkMeta::new("chunk4") - .with_i64_column("column1", Some(1000), Some(2000)) - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk4") + .with_i64_field_column_with_stats("column1", Some(1000), Some(2000)) + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); let c5 = Arc::new( - TestChunkMeta::new("chunk5") - .with_i64_column("column1", Some(0), Some(10)) - .with_i64_column_no_stats("column2"), + TestChunk::new("chunk5") + .with_i64_field_column_with_stats("column1", Some(0), Some(10)) + .with_i64_field_column_no_stats("column2"), ); let c6 = Arc::new( - TestChunkMeta::new("chunk6") - .with_i64_column_no_stats("column1") - .with_i64_column("column2", Some(0), Some(4)), + TestChunk::new("chunk6") + .with_i64_field_column_no_stats("column1") + .with_i64_field_column_with_stats("column2", Some(0), Some(4)), ); let predicate = PredicateBuilder::new() @@ -580,19 +623,23 @@ mod test { // c3: column1 [1000, 2000] --> pruned (types are correct) let observer = TestObserver::new(); - let c1 = Arc::new(TestChunkMeta::new("chunk1").with_string_column( - "column1", - Some("0"), - Some("9"), - )); + let c1 = Arc::new( + TestChunk::new("chunk1").with_string_field_column_with_stats( + "column1", + Some("0"), + Some("9"), + ), + ); - let c2 = Arc::new(TestChunkMeta::new("chunk2").with_string_column( - "column1", - Some("1000"), - Some("2000"), - )); + let c2 = Arc::new( + TestChunk::new("chunk2").with_string_field_column_with_stats( + "column1", + Some("1000"), + Some("2000"), + ), + ); - let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column( + let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( "column1", Some(1000), Some(2000), @@ -628,19 +675,25 @@ mod test { // c4: column1 [1000u64, 2000u64] --> pruned (types are different) let observer = TestObserver::new(); - let c1 = - Arc::new(TestChunkMeta::new("chunk1").with_i64_column("column1", Some(0), Some(1000))); + let c1 = Arc::new(TestChunk::new("chunk1").with_i64_field_column_with_stats( + "column1", + Some(0), + Some(1000), + )); - let c2 = - Arc::new(TestChunkMeta::new("chunk2").with_u64_column("column1", Some(0), Some(1000))); + let c2 = Arc::new(TestChunk::new("chunk2").with_u64_field_column_with_stats( + "column1", + Some(0), + Some(1000), + )); - let c3 = Arc::new(TestChunkMeta::new("chunk3").with_i64_column( + let c3 = Arc::new(TestChunk::new("chunk3").with_i64_field_column_with_stats( "column1", Some(1000), Some(2000), )); - let c4 = Arc::new(TestChunkMeta::new("chunk4").with_u64_column( + let c4 = Arc::new(TestChunk::new("chunk4").with_u64_field_column_with_stats( "column1", Some(1000), Some(2000), @@ -656,8 +709,8 @@ mod test { assert_eq!(names(&pruned), vec!["chunk1", "chunk2"]); } - fn names(pruned: &[Arc]) -> Vec<&str> { - pruned.iter().map(|p| p.name.as_str()).collect() + fn names(pruned: &[Arc]) -> Vec<&str> { + pruned.iter().map(|p| p.table_name()).collect() } #[derive(Debug, Default)] @@ -676,7 +729,7 @@ mod test 
{ } impl PruningObserver for TestObserver { - type Observed = TestChunkMeta; + type Observed = TestChunk; fn was_pruned(&self, chunk: &Self::Observed) { self.events.borrow_mut().push(format!("{}: Pruned", chunk)) @@ -694,168 +747,4 @@ mod test { .push(format!("{}: Could not prune chunk: {}", chunk, reason)) } } - - #[derive(Debug, Clone)] - struct TestChunkMeta { - name: String, - summary: TableSummary, - schema: Arc, - } - - /// Implementation of creating a new column with statitics for TestChunkMeta - macro_rules! impl_with_column { - ($SELF:expr, $COLUMN_NAME:expr, $MIN:expr, $MAX:expr, $DATA_TYPE:ident, $STAT_TYPE:ident) => {{ - let Self { - name, - summary, - schema, - } = $SELF; - let column_name = $COLUMN_NAME.into(); - let new_self = Self { - name, - schema: Self::add_field_to_schema(&column_name, schema, DataType::$DATA_TYPE), - summary: Self::add_column_to_summary( - summary, - column_name, - Statistics::$STAT_TYPE(StatValues { - distinct_count: None, - min: $MIN, - max: $MAX, - count: 42, - }), - ), - }; - new_self - }}; - } - - impl TestChunkMeta { - fn new(name: impl Into) -> Self { - let name = name.into(); - let summary = TableSummary::new(&name); - let schema = Arc::new(SchemaBuilder::new().build().unwrap()); - Self { - name, - summary, - schema, - } - } - - /// Adds an f64 column named into the schema - fn with_f64_column( - self, - column_name: impl Into, - min: Option, - max: Option, - ) -> Self { - impl_with_column!(self, column_name, min, max, Float64, F64) - } - - /// Adds an i64 column named into the schema - fn with_i64_column( - self, - column_name: impl Into, - min: Option, - max: Option, - ) -> Self { - impl_with_column!(self, column_name, min, max, Int64, I64) - } - - /// Adds an i64 column named into the schema, but with no stats - fn with_i64_column_no_stats(self, column_name: impl AsRef) -> Self { - let Self { - name, - summary, - schema, - } = self; - Self { - name, - schema: Self::add_field_to_schema(column_name.as_ref(), schema, DataType::Int64), - // Note we don't add any stats - summary, - } - } - - /// Adds an u64 column named into the schema - fn with_u64_column( - self, - column_name: impl Into, - min: Option, - max: Option, - ) -> Self { - impl_with_column!(self, column_name, min, max, UInt64, U64) - } - - /// Adds bool column named into the schema - fn with_bool_column( - self, - column_name: impl Into, - min: Option, - max: Option, - ) -> Self { - impl_with_column!(self, column_name, min, max, Boolean, Bool) - } - - /// Adds a string column named into the schema - fn with_string_column( - self, - column_name: impl Into, - min: Option<&str>, - max: Option<&str>, - ) -> Self { - let min = min.map(|v| v.to_string()); - let max = max.map(|v| v.to_string()); - impl_with_column!(self, column_name, min, max, Utf8, String) - } - - fn add_field_to_schema( - column_name: &str, - schema: Arc, - data_type: DataType, - ) -> Arc { - let new_schema = SchemaBuilder::new() - .field(column_name, data_type) - .build() - .expect("built new field schema"); - - let new_schema = SchemaMerger::new() - .merge(schema.as_ref()) - .expect("merged existing schema") - .merge(&new_schema) - .expect("merged new schema") - .build(); - - Arc::new(new_schema) - } - - fn add_column_to_summary( - mut summary: TableSummary, - column_name: impl Into, - stats: Statistics, - ) -> TableSummary { - summary.columns.push(ColumnSummary { - name: column_name.into(), - influxdb_type: None, - stats, - }); - - summary - } - } - - impl fmt::Display for TestChunkMeta { - fn fmt(&self, f: &mut 
fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.name) - } - } - - impl QueryChunkMeta for TestChunkMeta { - fn summary(&self) -> &TableSummary { - &self.summary - } - - fn schema(&self) -> Arc { - Arc::clone(&self.schema) - } - } } diff --git a/query/src/test.rs b/query/src/test.rs index 218d083ee6..b8c2be37ac 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -3,35 +3,30 @@ //! //! AKA it is a Mock -use arrow::{ - array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray}, - datatypes::{DataType, Int32Type, TimeUnit}, - record_batch::RecordBatch, -}; -use data_types::{ - chunk_metadata::ChunkSummary, - partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, -}; -use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; -use futures::StreamExt; - use crate::exec::Executor; use crate::{ exec::stringset::{StringSet, StringSetRef}, DatabaseStore, Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase, }; - +use arrow::{ + array::{ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray}, + datatypes::{DataType, Int32Type, TimeUnit}, + record_batch::RecordBatch, +}; +use async_trait::async_trait; +use data_types::{ + chunk_metadata::ChunkSummary, + partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary}, +}; +use datafusion::physical_plan::{common::SizedRecordBatchStream, SendableRecordBatchStream}; +use futures::StreamExt; use internal_types::{ - schema::{ - builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME, - }, + schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema}, selection::Selection, }; - -use async_trait::async_trait; use parking_lot::Mutex; use snafu::Snafu; -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, fmt, sync::Arc}; #[derive(Debug, Default)] pub struct TestDatabase { @@ -135,8 +130,17 @@ impl QueryDatabase for TestDatabase { } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct TestChunk { + /// Table name + table_name: String, + + /// Schema of the table + schema: Arc, + + /// Return value for summary() + table_summary: TableSummary, + id: u32, /// Set the flag if this chunk might contain duplicates @@ -145,12 +149,6 @@ pub struct TestChunk { /// A copy of the captured predicates passed predicates: Mutex>, - /// Table name - table_name: Option, - - /// Schema of the table - table_schema: Option, - /// RecordBatches that are returned on each request table_data: Vec>, @@ -159,19 +157,87 @@ pub struct TestChunk { /// Return value for apply_predicate, if desired predicate_match: Option, +} - /// Return value for summary(), if desired - table_summary: Option, +/// Implements a method for adding a column with default stats +macro_rules! impl_with_column { + ($NAME:ident, $DATA_TYPE:ident) => { + pub fn $NAME(self, column_name: impl Into) -> Self { + let column_name = column_name.into(); + + let new_column_schema = SchemaBuilder::new() + .field(&column_name, DataType::$DATA_TYPE) + .build() + .unwrap(); + self.add_schema_to_table(new_column_schema, true, None) + } + }; +} + +/// Implements a method for adding a column without any stats +macro_rules! 
impl_with_column_no_stats { + ($NAME:ident, $DATA_TYPE:ident) => { + pub fn $NAME(self, column_name: impl Into) -> Self { + let column_name = column_name.into(); + + let new_column_schema = SchemaBuilder::new() + .field(&column_name, DataType::$DATA_TYPE) + .build() + .unwrap(); + + self.add_schema_to_table(new_column_schema, false, None) + } + }; +} + +/// Implements a method for adding a column with stats that have the specified min and max +macro_rules! impl_with_column_with_stats { + ($NAME:ident, $DATA_TYPE:ident, $RUST_TYPE:ty, $STAT_TYPE:ident) => { + pub fn $NAME( + self, + column_name: impl Into, + min: Option<$RUST_TYPE>, + max: Option<$RUST_TYPE>, + ) -> Self { + let column_name = column_name.into(); + + let new_column_schema = SchemaBuilder::new() + .field(&column_name, DataType::$DATA_TYPE) + .build() + .unwrap(); + + let stats = Statistics::$STAT_TYPE(StatValues { + min, + max, + ..Default::default() + }); + + self.add_schema_to_table(new_column_schema, true, Some(stats)) + } + }; } impl TestChunk { - pub fn new(id: u32) -> Self { + pub fn new(table_name: impl Into) -> Self { + let table_name = table_name.into(); Self { - id, - ..Default::default() + table_name: table_name.clone(), + schema: Arc::new(SchemaBuilder::new().build().unwrap()), + table_summary: TableSummary::new(table_name), + id: Default::default(), + may_contain_pk_duplicates: Default::default(), + predicates: Default::default(), + table_data: Default::default(), + saved_error: Default::default(), + predicate_match: Default::default(), } } + pub fn with_id(mut self, id: u32) -> Self { + self.id = id; + self + } + /// specify that any call should result in an error with the message /// specified pub fn with_error(mut self, error_message: impl Into) -> Self { @@ -194,183 +260,163 @@ impl TestChunk { } } - /// Register a table with the test chunk and a "dummy" column - pub fn with_table(self, table_name: impl Into) -> Self { - self.with_tag_column(table_name, "dummy_col") - } - /// Set the `may_contain_pk_duplicates` flag pub fn with_may_contain_pk_duplicates(mut self, v: bool) -> Self { self.may_contain_pk_duplicates = v; self } - /// Register an tag column with the test chunk - pub fn with_tag_column( - self, - table_name: impl Into, - column_name: impl Into, - ) -> Self { - let table_name = table_name.into(); + /// Register a tag column with the test chunk with default stats + pub fn with_tag_column(self, column_name: impl Into) -> Self { let column_name = column_name.into(); // make a new schema with the specified column and // merge it in to any existing schema let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap(); - self.add_schema_to_table(table_name, new_column_schema) + self.add_schema_to_table(new_column_schema, true, None) } - /// Register an tag column with the test chunk + /// Register a tag column with the test chunk pub fn with_tag_column_with_stats( self, - table_name: impl Into, column_name: impl Into, - min: &str, - max: &str, + min: Option<&str>, + max: Option<&str>, ) -> Self { - let table_name = table_name.into(); let column_name = column_name.into(); - let mut new_self = self.with_tag_column(&table_name, &column_name); + // make a new schema with the specified column and + // merge it in to any existing schema + let new_column_schema = SchemaBuilder::new().tag(&column_name).build().unwrap(); - // Now, find the appropriate column summary and update the stats - let column_summary: &mut ColumnSummary = new_self - .table_summary - .as_mut() - .expect("had table summary") 
- .columns - .iter_mut() - .find(|c| c.name == column_name) - .expect("had column"); - - column_summary.stats = Statistics::String(StatValues { - min: Some(min.to_string()), - max: Some(max.to_string()), + // Construct stats + let stats = Statistics::String(StatValues { + min: min.map(ToString::to_string), + max: max.map(ToString::to_string), ..Default::default() }); - new_self + self.add_schema_to_table(new_column_schema, true, Some(stats)) } - /// Register a timestamp column with the test chunk - pub fn with_time_column(self, table_name: impl Into) -> Self { - let table_name = table_name.into(); - + /// Register a timestamp column with the test chunk with default stats + pub fn with_time_column(self) -> Self { // make a new schema with the specified column and // merge it in to any existing schema let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap(); - self.add_schema_to_table(table_name, new_column_schema) + self.add_schema_to_table(new_column_schema, true, None) } /// Register a timestamp column with the test chunk - pub fn with_time_column_with_stats( - self, - table_name: impl Into, - min: i64, - max: i64, - ) -> Self { - let table_name = table_name.into(); + pub fn with_time_column_with_stats(self, min: Option, max: Option) -> Self { + // make a new schema with the specified column and + // merge it in to any existing schema + let new_column_schema = SchemaBuilder::new().timestamp().build().unwrap(); - let mut new_self = self.with_time_column(&table_name); - - // Now, find the appropriate column summary and update the stats - let column_summary: &mut ColumnSummary = new_self - .table_summary - .as_mut() - .expect("had table summary") - .columns - .iter_mut() - .find(|c| c.name == TIME_COLUMN_NAME) - .expect("had column"); - - column_summary.stats = Statistics::I64(StatValues { - min: Some(min), - max: Some(max), + // Construct stats + let stats = Statistics::I64(StatValues { + min, + max, ..Default::default() }); - new_self + self.add_schema_to_table(new_column_schema, true, Some(stats)) } - /// Register an int field column with the test chunk - pub fn with_int_field_column( + impl_with_column!(with_i64_field_column, Int64); + impl_with_column_no_stats!(with_i64_field_column_no_stats, Int64); + impl_with_column_with_stats!(with_i64_field_column_with_stats, Int64, i64, I64); + + impl_with_column!(with_u64_column, UInt64); + impl_with_column_no_stats!(with_u64_field_column_no_stats, UInt64); + impl_with_column_with_stats!(with_u64_field_column_with_stats, UInt64, u64, U64); + + impl_with_column!(with_f64_field_column, Float64); + impl_with_column_no_stats!(with_f64_field_column_no_stats, Float64); + impl_with_column_with_stats!(with_f64_field_column_with_stats, Float64, f64, F64); + + impl_with_column!(with_bool_field_column, Boolean); + impl_with_column_no_stats!(with_bool_field_column_no_stats, Boolean); + impl_with_column_with_stats!(with_bool_field_column_with_stats, Boolean, bool, Bool); + + /// Register a string field column with the test chunk + pub fn with_string_field_column_with_stats( self, - table_name: impl Into, column_name: impl Into, + min: Option<&str>, + max: Option<&str>, ) -> Self { let column_name = column_name.into(); // make a new schema with the specified column and // merge it in to any existing schema let new_column_schema = SchemaBuilder::new() - .field(&column_name, DataType::Int64) + .field(&column_name, DataType::Utf8) .build() .unwrap(); - self.add_schema_to_table(table_name, new_column_schema) + + // Construct stats + let stats = 
Statistics::String(StatValues { + min: min.map(ToString::to_string), + max: max.map(ToString::to_string), + ..Default::default() + }); + + self.add_schema_to_table(new_column_schema, true, Some(stats)) } + /// Adds the specified schema and optionally a column summary containing optional stats. + /// If `add_column_summary` is false, `stats` is ignored. If `add_column_summary` is true but + /// `stats` is `None`, default stats will be added to the column summary. fn add_schema_to_table( mut self, - table_name: impl Into, new_column_schema: Schema, + add_column_summary: bool, + stats: Option, ) -> Self { - let table_name = table_name.into(); - if let Some(existing_name) = &self.table_name { - assert_eq!(&table_name, existing_name); - } - self.table_name = Some(table_name.clone()); - // assume the new schema has exactly a single table assert_eq!(new_column_schema.len(), 1); let (col_type, new_field) = new_column_schema.field(0); - let influxdb_type = col_type.map(|t| match t { - InfluxColumnType::IOx(_) => todo!(), - InfluxColumnType::Tag => InfluxDbType::Tag, - InfluxColumnType::Field(_) => InfluxDbType::Field, - InfluxColumnType::Timestamp => InfluxDbType::Timestamp, - }); - - let stats = match new_field.data_type() { - DataType::Boolean => Statistics::Bool(StatValues::default()), - DataType::Int64 => Statistics::I64(StatValues::default()), - DataType::UInt64 => Statistics::U64(StatValues::default()), - DataType::Utf8 => Statistics::String(StatValues::default()), - DataType::Dictionary(_, value_type) => { - assert!(matches!(**value_type, DataType::Utf8)); - Statistics::String(StatValues::default()) - } - DataType::Float64 => Statistics::String(StatValues::default()), - DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()), - _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()), - }; - - let column_summary = ColumnSummary { - name: new_field.name().clone(), - influxdb_type, - stats, - }; - let mut merger = SchemaMerger::new(); merger = merger.merge(&new_column_schema).unwrap(); + merger = merger + .merge(self.schema.as_ref()) + .expect("merging was successful"); + self.schema = Arc::new(merger.build()); - if let Some(existing_schema) = self.table_schema.as_ref() { - merger = merger - .merge(existing_schema) - .expect("merging was successful"); + if add_column_summary { + let influxdb_type = col_type.map(|t| match t { + InfluxColumnType::IOx(_) => todo!(), + InfluxColumnType::Tag => InfluxDbType::Tag, + InfluxColumnType::Field(_) => InfluxDbType::Field, + InfluxColumnType::Timestamp => InfluxDbType::Timestamp, + }); + + let stats = stats.unwrap_or_else(|| match new_field.data_type() { + DataType::Boolean => Statistics::Bool(StatValues::default()), + DataType::Int64 => Statistics::I64(StatValues::default()), + DataType::UInt64 => Statistics::U64(StatValues::default()), + DataType::Utf8 => Statistics::String(StatValues::default()), + DataType::Dictionary(_, value_type) => { + assert!(matches!(**value_type, DataType::Utf8)); + Statistics::String(StatValues::default()) + } + DataType::Float64 => Statistics::F64(StatValues::default()), + DataType::Timestamp(_, _) => Statistics::I64(StatValues::default()), + _ => panic!("Unsupported type in TestChunk: {:?}", new_field.data_type()), + }); + + let column_summary = ColumnSummary { + name: new_field.name().clone(), + influxdb_type, + stats, + }; + + self.table_summary.columns.push(column_summary); } - let new_schema = merger.build(); - - self.table_schema = Some(new_schema); - - let mut table_summary = self - 
@@ -382,15 +428,10 @@ impl TestChunk {

     /// Prepares this chunk to return a specific record batch with one
     /// row of non null data.
-    pub fn with_one_row_of_null_data(mut self, _table_name: impl Into<String>) -> Self {
-        //let table_name = table_name.into();
-        let schema = self
-            .table_schema
-            .as_ref()
-            .expect("table must exist in TestChunk");
-
+    pub fn with_one_row_of_data(mut self) -> Self {
         // create arrays
-        let columns = schema
+        let columns = self
+            .schema
             .iter()
             .map(|(_influxdb_column_type, field)| match field.data_type() {
                 DataType::Int64 => Arc::new(Int64Array::from(vec![1000])) as ArrayRef,
@@ -411,7 +452,8 @@ impl TestChunk {
             })
             .collect::<Vec<_>>();

-        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        let batch =
+            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");
         println!("TestChunk batch data: {:#?}", batch);

         self.table_data.push(Arc::new(batch));
@@ -428,14 +470,10 @@ impl TestChunk {
     /// "| UT   | RI   | 70        | 1970-01-01 00:00:00.000020 |",
     /// "+------+------+-----------+-------------------------------+",
     /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000)
-    pub fn with_three_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
-        let schema = self
-            .table_schema
-            .as_ref()
-            .expect("table must exist in TestChunk");
-
+    pub fn with_three_rows_of_data(mut self) -> Self {
         // create arrays
-        let columns = schema
+        let columns = self
+            .schema
             .iter()
             .map(|(_influxdb_column_type, field)| match field.data_type() {
                 DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef,
@@ -475,7 +513,8 @@ impl TestChunk {
             })
             .collect::<Vec<_>>();

-        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        let batch =
+            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

         self.table_data.push(Arc::new(batch));
         self
@@ -492,14 +531,10 @@ impl TestChunk {
     /// "| VT   | NC   | 50        | 1970-01-01 00:00:00.000210 |", // duplicate of (1)
     /// "+------+------+-----------+-------------------------------+",
     /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(28000, 220000)
-    pub fn with_four_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
-        let schema = self
-            .table_schema
-            .as_ref()
-            .expect("table must exist in TestChunk");
-
+    pub fn with_four_rows_of_data(mut self) -> Self {
         // create arrays
-        let columns = schema
+        let columns = self
+            .schema
             .iter()
             .map(|(_influxdb_column_type, field)| match field.data_type() {
                 DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef,
@@ -539,7 +574,8 @@ impl TestChunk {
             })
             .collect::<Vec<_>>();

-        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        let batch =
+            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

         self.table_data.push(Arc::new(batch));
         self
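Each of these row helpers ends in the same `RecordBatch::try_new` call; a standalone sketch of that Arrow pattern using the plain arrow crate types rather than the IOx `Schema` wrapper (the column names and values here are illustrative only, not taken from this diff):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    // One Arrow array per schema column, then a RecordBatch that ties the
    // arrays and the schema together; try_new fails if lengths or types
    // disagree with the schema.
    fn example_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("field_int", DataType::Int64, true),
            Field::new("tag1", DataType::Utf8, true),
        ]));
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1000, 10, 70])),
            Arc::new(StringArray::from(vec!["WA", "VT", "UT"])),
        ];
        RecordBatch::try_new(schema, columns).expect("made record batch")
    }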
@@ -557,14 +593,10 @@ impl TestChunk {
     /// "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005 |",
     /// "+------+------+-----------+-------------------------------+",
     /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
-    pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
-        let schema = self
-            .table_schema
-            .as_ref()
-            .expect("table must exist in TestChunk");
-
+    pub fn with_five_rows_of_data(mut self) -> Self {
         // create arrays
-        let columns = schema
+        let columns = self
+            .schema
             .iter()
             .map(|(_influxdb_column_type, field)| match field.data_type() {
                 DataType::Int64 => {
@@ -611,7 +643,8 @@ impl TestChunk {
             })
             .collect::<Vec<_>>();

-        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        let batch =
+            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

         self.table_data.push(Arc::new(batch));
         self
@@ -634,15 +667,10 @@ impl TestChunk {
     /// "| MT   | AL   | 30        | 1970-01-01 00:00:00.000005 |", // Duplicate with (3)
     /// "+------+------+-----------+-------------------------------+",
     /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
-    pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into<String>) -> Self {
-        //let table_name = table_name.into();
-        let schema = self
-            .table_schema
-            .as_ref()
-            .expect("table must exist in TestChunk");
-
+    pub fn with_ten_rows_of_data_some_duplicates(mut self) -> Self {
         // create arrays
-        let columns = schema
+        let columns = self
+            .schema
             .iter()
             .map(|(_influxdb_column_type, field)| match field.data_type() {
                 DataType::Int64 => Arc::new(Int64Array::from(vec![
@@ -693,35 +721,34 @@ impl TestChunk {
             })
             .collect::<Vec<_>>();

-        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+        let batch =
+            RecordBatch::try_new(self.schema.as_ref().into(), columns).expect("made record batch");

         self.table_data.push(Arc::new(batch));
         self
     }

     /// Returns all columns of the table
-    pub fn all_column_names(&self) -> Option<StringSet> {
-        let column_names = self.table_schema.as_ref().map(|schema| {
-            schema
-                .iter()
-                .map(|(_, field)| field.name().to_string())
-                .collect::<StringSet>()
-        });
-
-        column_names
+    pub fn all_column_names(&self) -> StringSet {
+        self.schema
+            .iter()
+            .map(|(_, field)| field.name().to_string())
+            .collect()
     }

     /// Returns just the specified columns
-    pub fn specific_column_names_selection(&self, columns: &[&str]) -> Option<StringSet> {
-        let column_names = self.table_schema.as_ref().map(|schema| {
-            schema
-                .iter()
-                .map(|(_, field)| field.name().to_string())
-                .filter(|col| columns.contains(&col.as_str()))
-                .collect::<StringSet>()
-        });
+    pub fn specific_column_names_selection(&self, columns: &[&str]) -> StringSet {
+        self.schema
+            .iter()
+            .map(|(_, field)| field.name().to_string())
+            .filter(|col| columns.contains(&col.as_str()))
+            .collect()
+    }
+}

-        column_names
+impl fmt::Display for TestChunk {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.table_name())
     }
 }
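With `Option` gone from the return types, callers can use the results directly; a hypothetical usage sketch of the two selection paths (the chunk's column names are assumed for illustration):

    // Hypothetical: a chunk built with "tag1", "field_int", and "time" columns.
    let chunk = TestChunk::new("t")
        .with_tag_column("tag1")
        .with_i64_field_column("field_int")
        .with_time_column();

    // All columns, as a StringSet.
    let all = chunk.all_column_names();
    assert!(all.contains("tag1") && all.contains("field_int") && all.contains("time"));

    // Only the requested columns survive the filter.
    let some = chunk.specific_column_names_selection(&["tag1", "time"]);
    assert!(some.contains("tag1") && !some.contains("field_int"));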
@@ -733,7 +760,7 @@ impl QueryChunk for TestChunk {
     }

     fn table_name(&self) -> &str {
-        self.table_name.as_deref().unwrap()
+        &self.table_name
     }

     fn may_contain_pk_duplicates(&self) -> bool {
@@ -772,17 +799,11 @@ impl QueryChunk for TestChunk {
         }

         // otherwise fall back to basic filtering based on table name predicate.
-        let predicate_match = self
-            .table_name
-            .as_ref()
-            .map(|table_name| {
-                if !predicate.should_include_table(&table_name) {
-                    PredicateMatch::Zero
-                } else {
-                    PredicateMatch::Unknown
-                }
-            })
-            .unwrap_or(PredicateMatch::Unknown);
+        let predicate_match = if !predicate.should_include_table(&self.table_name) {
+            PredicateMatch::Zero
+        } else {
+            PredicateMatch::Unknown
+        };

         Ok(predicate_match)
     }
@@ -812,22 +833,17 @@ impl QueryChunk for TestChunk {
             Selection::Some(cols) => self.specific_column_names_selection(cols),
         };

-        Ok(column_names)
+        Ok(Some(column_names))
     }
 }

 impl QueryChunkMeta for TestChunk {
     fn summary(&self) -> &TableSummary {
-        self.table_summary
-            .as_ref()
-            .expect("Table summary not configured for TestChunk")
+        &self.table_summary
     }

     fn schema(&self) -> Arc<Schema> {
-        self.table_schema
-            .as_ref()
-            .map(|s| Arc::new(s.clone()))
-            .expect("schema was set")
+        Arc::clone(&self.schema)
     }
 }
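The service.rs migration below repeats one fixture shape many times; a representative sketch of the new pattern (table name at construction, optional numeric id via `with_id`, then columns and data), not any specific call site from the diff:

    // Representative of the migrated test fixtures below.
    let chunk = TestChunk::new("TheMeasurement")
        .with_id(0)
        .with_time_column()
        .with_tag_column("state")
        .with_one_row_of_data();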
diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs
index bb08c26bcd..cbdca28cda 100644
--- a/src/influxdb_ioxd/rpc/storage/service.rs
+++ b/src/influxdb_ioxd/rpc/storage/service.rs
@@ -1252,13 +1252,13 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk0 = TestChunk::new(0)
-            .with_predicate_match(PredicateMatch::AtLeastOne)
-            .with_table("h2o");
+        let chunk0 = TestChunk::new("h2o")
+            .with_id(0)
+            .with_predicate_match(PredicateMatch::AtLeastOne);

-        let chunk1 = TestChunk::new(1)
-            .with_predicate_match(PredicateMatch::AtLeastOne)
-            .with_table("o2");
+        let chunk1 = TestChunk::new("o2")
+            .with_id(1)
+            .with_predicate_match(PredicateMatch::AtLeastOne);

         fixture
             .test_storage
@@ -1346,13 +1346,15 @@ mod tests {
         let partition_id = 1;

         // Note multiple tables / measureemnts:
-        let chunk0 = TestChunk::new(0)
-            .with_tag_column("m1", "k1")
-            .with_tag_column("m1", "k2");
+        let chunk0 = TestChunk::new("m1")
+            .with_id(0)
+            .with_tag_column("k1")
+            .with_tag_column("k2");

-        let chunk1 = TestChunk::new(1)
-            .with_tag_column("m2", "k3")
-            .with_tag_column("m2", "k4");
+        let chunk1 = TestChunk::new("m2")
+            .with_id(1)
+            .with_tag_column("k3")
+            .with_tag_column("k4");

         fixture
             .test_storage
@@ -1414,9 +1416,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -1458,15 +1458,15 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk0 = TestChunk::new(0)
+        let chunk0 = TestChunk::new("m1")
             // predicate specifies m4, so this is filtered out
-            .with_tag_column("m1", "k0");
+            .with_tag_column("k0");

-        let chunk1 = TestChunk::new(1)
-            .with_tag_column("m4", "k1")
-            .with_tag_column("m4", "k2")
-            .with_tag_column("m4", "k3")
-            .with_tag_column("m4", "k4");
+        let chunk1 = TestChunk::new("m4")
+            .with_tag_column("k1")
+            .with_tag_column("k2")
+            .with_tag_column("k3")
+            .with_tag_column("k4");

         fixture
             .test_storage
@@ -1540,10 +1540,8 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            // predicate specifies m4, so this is filtered out
-            .with_table("my_table")
-            .with_error("This is an error");
+        // predicate specifies m4, so this is filtered out
+        let chunk = TestChunk::new("my_table").with_error("This is an error");

         fixture
             .test_storage
@@ -1587,10 +1585,10 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -1648,9 +1646,7 @@ mod tests {
             tag_key: [0].into(),
         };

-        let chunk = TestChunk::new(0)
-            .with_predicate_match(PredicateMatch::AtLeastOne)
-            .with_table("h2o");
+        let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne);

         fixture
             .test_storage
@@ -1679,11 +1675,11 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_int_field_column("TheMeasurement", "Field1")
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_i64_field_column("Field1")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -1727,9 +1723,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -1798,10 +1792,10 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -1847,9 +1841,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -1953,10 +1945,10 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -1999,9 +1991,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -2041,10 +2031,10 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -2093,9 +2083,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -2177,10 +2165,10 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -2237,10 +2225,10 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -2305,9 +2293,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0)
-            .with_table("my_table")
-            .with_error("Sugar we are going down");
+        let chunk = TestChunk::new("my_table").with_error("Sugar we are going down");

         fixture
             .test_storage
@@ -2363,11 +2349,11 @@ mod tests {
         let partition_id = 1;

         // Add a chunk with a field
-        let chunk = TestChunk::new(0)
-            .with_int_field_column("TheMeasurement", "Field1")
-            .with_time_column("TheMeasurement")
-            .with_tag_column("TheMeasurement", "state")
-            .with_one_row_of_null_data("TheMeasurement");
+        let chunk = TestChunk::new("TheMeasurement")
+            .with_i64_field_column("Field1")
+            .with_time_column()
+            .with_tag_column("state")
+            .with_one_row_of_data();

         fixture
             .test_storage
@@ -2413,7 +2399,7 @@ mod tests {
         let db_info = OrgAndBucket::new(123, 456);
         let partition_id = 1;

-        let chunk = TestChunk::new(0).with_error("Sugar we are going down");
+        let chunk = TestChunk::new("t").with_error("Sugar we are going down");

         fixture
             .test_storage