diff --git a/query/src/provider.rs b/query/src/provider.rs
index 2c0f93ea42..6604021be1 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -246,6 +246,7 @@ impl TableProvider for ChunkTableProvider {
             scan_schema,
             chunks,
             predicate,
+            false,
         )?;
 
         Ok(plan)
@@ -317,6 +318,11 @@ impl Deduplicater {
     /// ┌───────────────────────┐                                      │
     /// │SortPreservingMergeExec│                                      │
     /// └───────────────────────┘                                      │
+    ///             ▲                                                  │
+    ///             │                                                  │
+    /// ┌───────────────────────┐                                      │
+    /// │       UnionExec       │                                      │
+    /// └───────────────────────┘                                      │
     ///             ▲                                                  |
     ///             │                                                  |
     /// ┌───────────┴───────────┐                                      │
@@ -340,18 +346,21 @@
         schema: ArrowSchemaRef,
         chunks: Vec<Arc<C>>,
         predicate: Predicate,
+        for_testing: bool, // TODO: remove this parameter when #1682 and #1683 are done
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // find overlapped chunks and put them into the right group
         self.split_overlapped_chunks(chunks.to_vec())?;
 
         // TEMP until the rest of this module's code is complete:
         // merge all plans into the same
-        self.no_duplicates_chunks
-            .append(&mut self.in_chunk_duplicates_chunks);
-        for mut group in &mut self.overlapped_chunks_set {
-            self.no_duplicates_chunks.append(&mut group);
+        if !for_testing {
+            self.no_duplicates_chunks
+                .append(&mut self.in_chunk_duplicates_chunks);
+            for mut group in &mut self.overlapped_chunks_set {
+                self.no_duplicates_chunks.append(&mut group);
+            }
+            self.overlapped_chunks_set.clear();
         }
-        self.overlapped_chunks_set.clear();
 
         // Building plans
         let mut plans = vec![];
@@ -396,16 +405,16 @@
             }
         }
 
-        let final_plan = plans.remove(0);
-
-        // TODO
-        // There are still plan, add UnionExec
-        if !plans.is_empty() {
-            // final_plan = union_plan
-            panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
+        match plans.len() {
+            // No plan generated. Something must have gone wrong:
+            // even if the chunks are empty, an IOxReadFilterNode is still created
+            0 => panic!("Internal error generating deduplicate plan"),
+            // Only one plan, no need to add a union node;
+            // return the plan itself
+            1 => Ok(plans.remove(0)),
+            // More than one plan: union them
+            _ => Ok(Arc::new(UnionExec::new(plans))),
         }
-
-        Ok(final_plan)
     }
 
     /// discover overlaps and split them into three groups:
@@ -430,7 +439,7 @@
         Ok(())
     }
 
-    /// Return true if all chunks are neither overlap nor has duplicates in itself
+    /// Return true if no chunks overlap each other and no chunk has duplicates within itself
     fn no_duplicates(&self) -> bool {
         self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
     }
@@ -888,9 +897,8 @@ mod test {
             Predicate::default(),
         );
         let batch = collect(sort_plan.unwrap()).await.unwrap();
-        // data is not sorted on primary key(tag1, tag2, time)
-
-        // NOTE: When the full deduplication is done, the duplciates will be removed from this output
+        // data is sorted on primary key (tag1, tag2, time)
+        // NOTE: When the full deduplication is done, the duplicates will be removed from this output
         let expected = vec![
            "+-----------+------+------+-------------------------------+",
            "| field_int | tag1 | tag2 | time                          |",
            "+-----------+------+------+-------------------------------+",
@@ -910,6 +918,241 @@
         assert_batches_eq!(&expected, &batch);
     }
 
+    #[tokio::test]
+    async fn scan_plan_with_one_chunk_no_duplicates() {
+        // Test no duplicates at all
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // No duplicates, so no sort at all. The data stays in its original order
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_one_chunk_with_duplicates() {
+        // Test one chunk with duplicates within
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_may_contain_pk_duplicates(true)
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Data must be sorted and duplicates removed
+        // TODO: it is just sorted for now. When
+        // https://github.com/influxdata/influxdb_iox/issues/1646
+        // is done, duplicates will be removed
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_two_overlapped_chunks_with_duplicates() {
+        // test overlapped chunks
+        let chunk1 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        let chunk2 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Two overlapped chunks will be sort merged with duplicates removed
+        // TODO: it is just sorted for now. When
+        // https://github.com/influxdata/influxdb_iox/issues/1646
+        // is done, duplicates will be removed
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_four_chunks() {
+        // This test covers all kinds of chunks: overlapped, non-overlapped without
+        // duplicates within, and non-overlapped with duplicates within
+        let chunk1 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        // chunk2 overlaps with chunk 1
+        let chunk2 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // chunk3 no overlap, no duplicates within
+        let chunk3 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 8000, 20000)
+                .with_tag_column_with_stats("t", "tag1", "UT", "WA")
+                .with_int_field_column("t", "field_int")
+                .with_three_rows_of_data("t"),
+        );
+
+        // chunk4 no overlap, duplicates within
+        let chunk4 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 28000, 220000)
+                .with_tag_column_with_stats("t", "tag1", "UT", "WA")
+                .with_int_field_column("t", "field_int")
+                .with_may_contain_pk_duplicates(true)
+                .with_four_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![
+                Arc::clone(&chunk1),
+                Arc::clone(&chunk2),
+                Arc::clone(&chunk3),
+                Arc::clone(&chunk4),
+            ],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Final data will be partially sorted and duplicates removed. In detail:
+        // . chunk1 and chunk2 will be sort merged and deduplicated (rows 8-22)
+        // . chunk3 will stay in its original order (rows 1-3)
+        // . chunk4 will be sorted and deduplicated (rows 4-7)
+        // TODO: data is only partially sorted for now. The deduplication will happen
+        // when https://github.com/influxdata/influxdb_iox/issues/1646 is done
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | WA   | 1970-01-01 00:00:00.000008    |",
+            "| 10        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 70        | UT   | 1970-01-01 00:00:00.000020    |",
+            "| 70        | UT   | 1970-01-01 00:00:00.000020    |",
+            "| 10        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 50        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 1000      | WA   | 1970-01-01 00:00:00.000008    |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
     fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
         let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
         ids.join(", ")
diff --git a/query/src/test.rs b/query/src/test.rs
index 59a8183aab..07f5f80757 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -21,7 +21,9 @@ use crate::{
 use crate::{exec::Executor, pruning::Prunable};
 
 use internal_types::{
-    schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema},
+    schema::{
+        builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME,
+    },
     selection::Selection,
 };
 
@@ -233,7 +235,7 @@ impl TestChunk {
         new_self
     }
 
-    /// Register a timetamp column with the test chunk
+    /// Register a timestamp column with the test chunk
    pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
        let table_name = table_name.into();
 
@@ -244,6 +246,36 @@ impl TestChunk {
         self.add_schema_to_table(table_name, new_column_schema)
     }
 
+    /// Register a timestamp column with the test chunk, with min/max stats
+    pub fn with_time_column_with_stats(
+        self,
+        table_name: impl Into<String>,
+        min: i64,
+        max: i64,
+    ) -> Self {
+        let table_name = table_name.into();
+
+        let mut new_self = self.with_time_column(&table_name);
+
+        // Now, find the appropriate column summary and update the stats
+        let column_summary: &mut ColumnSummary = new_self
+            .table_summary
+            .as_mut()
+            .expect("had table summary")
+            .columns
+            .iter_mut()
+            .find(|c| c.name == TIME_COLUMN_NAME)
+            .expect("had column");
+
+        column_summary.stats = Statistics::I64(StatValues {
+            min: Some(min),
+            max: Some(max),
+            ..Default::default()
+        });
+
+        new_self
+    }
+
     /// Register an int field column with the test chunk
     pub fn with_int_field_column(
         self,
@@ -367,19 +399,146 @@ impl TestChunk {
         self
     }
 
-    /// Prepares this chunk to return a specific record batch with five
-    /// rows of non null data that look like
+    /// Prepares this chunk to return a specific record batch with three
+    /// rows of non null data, with no duplicates within, that looks like:
     /// "+------+------+-----------+-------------------------------+",
     /// "| tag1 | tag2 | field_int | time                          |",
"+------+------+-----------+-------------------------------+", - /// "| MA | MA | 1000 | 1970-01-01 00:00:00.000001 |", - /// "| MT | MT | 10 | 1970-01-01 00:00:00.000007 |", - /// "| CT | CT | 70 | 1970-01-01 00:00:00.000000100 |", - /// "| AL | AL | 100 | 1970-01-01 00:00:00.000000050 |", - /// "| MT | MT | 5 | 1970-01-01 00:00:00.000005 |", + /// "| WA | SC | 1000 | 1970-01-01 00:00:00.000008 |", + /// "| VT | NC | 10 | 1970-01-01 00:00:00.000010 |", + /// "| UT | RI | 70 | 1970-01-01 00:00:00.000020 |", /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000) + pub fn with_three_rows_of_data(mut self, _table_name: impl Into) -> Self { + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef, + DataType::Utf8 => match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT"])) as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI"])) as ArrayRef, + _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR"])) as ArrayRef, + }, + DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new( + TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000], None), + ) as ArrayRef, + DataType::Dictionary(key, value) + if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => + { + match field.name().as_str() { + "tag1" => Arc::new( + vec!["WA", "VT", "UT"] + .into_iter() + .collect::>(), + ) as ArrayRef, + "tag2" => Arc::new( + vec!["SC", "NC", "RI"] + .into_iter() + .collect::>(), + ) as ArrayRef, + _ => Arc::new( + vec!["TX", "PR", "OR"] + .into_iter() + .collect::>(), + ) as ArrayRef, + } + } + _ => unimplemented!( + "Unimplemented data type for test database: {:?}", + field.data_type() + ), + }) + .collect::>(); + + let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch"); + + self.table_data.push(Arc::new(batch)); + self + } + + /// Prepares this chunk to return a specific record batch with four + /// rows of non null data that look like, duplicates within + /// "+------+------+-----------+-------------------------------+", + /// "| tag1 | tag2 | field_int | time |", + /// "+------+------+-----------+-------------------------------+", + /// "| WA | SC | 1000 | 1970-01-01 00:00:00.000028 |", + /// "| VT | NC | 10 | 1970-01-01 00:00:00.000210 |", (1) + /// "| UT | RI | 70 | 1970-01-01 00:00:00.000220 |", + /// "| VT | NC | 50 | 1970-01-01 00:00:00.000210 |", // duplicate of (1) + /// "+------+------+-----------+-------------------------------+", + /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(28000, 220000) + pub fn with_four_rows_of_data(mut self, _table_name: impl Into) -> Self { + let schema = self + .table_schema + .as_ref() + .expect("table must exist in TestChunk"); + + // create arrays + let columns = schema + .iter() + .map(|(_influxdb_column_type, field)| match field.data_type() { + DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef, + DataType::Utf8 => match field.name().as_str() { + "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT", "VT"])) as ArrayRef, + "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI", "NC"])) as ArrayRef, + _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR", "AL"])) as ArrayRef, + }, + 
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
+                    TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000, 10000], None),
+                ) as ArrayRef,
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    match field.name().as_str() {
+                        "tag1" => Arc::new(
+                            vec!["WA", "VT", "UT", "VT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        "tag2" => Arc::new(
+                            vec!["SC", "NC", "RI", "NC"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        _ => Arc::new(
+                            vec!["TX", "PR", "OR", "AL"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                    }
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
+    /// Prepares this chunk to return a specific record batch with five
+    /// rows of non null data, with no duplicates within, that looks like:
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
+    /// "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |",
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+    /// "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |",
+    /// "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005    |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
     pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
-        //let table_name = table_name.into();
         let schema = self
             .table_schema
             .as_ref()
@@ -439,6 +598,88 @@ impl TestChunk {
         self
     }
 
+    /// Prepares this chunk to return a specific record batch with ten
+    /// rows of non null data, with duplicates within, that looks like:
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
+    /// "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |", (1)
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+    /// "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |", (2)
+    /// "| MT   | AL   | 5         | 1970-01-01 00:00:00.000000005 |", (3)
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000002    |",
+    /// "| MT   | AL   | 20        | 1970-01-01 00:00:00.000007    |", // duplicate of (1)
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000500 |",
+    /// "| AL   | MA   | 10        | 1970-01-01 00:00:00.000000050 |", // duplicate of (2)
+    /// "| MT   | AL   | 30        | 1970-01-01 00:00:00.000000005 |", // duplicate of (3)
+    /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
+    pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into<String>) -> Self {
+        //let table_name = table_name.into();
+        let schema = self
+            .table_schema
+            .as_ref()
+            .expect("table must exist in TestChunk");
+
+        // create arrays
+        let columns = schema
+            .iter()
+            .map(|(_influxdb_column_type, field)| match field.data_type() {
+                DataType::Int64 => Arc::new(Int64Array::from(vec![
+                    1000, 10, 70, 100, 5, 1000, 20, 70, 10, 30,
+                ])) as ArrayRef,
+                DataType::Utf8 => match field.name().as_str() {
+                    "tag1" => Arc::new(StringArray::from(vec![
+                        "MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT",
+                    ])) as ArrayRef,
+                    "tag2" => Arc::new(StringArray::from(vec![
+                        "CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL",
+                    ])) as ArrayRef,
+                    _ => Arc::new(StringArray::from(vec![
+                        "CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT",
+                    ])) as ArrayRef,
+                },
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                    Arc::new(TimestampNanosecondArray::from_vec(
+                        vec![1000, 7000, 100, 50, 5, 2000, 7000, 500, 50, 5],
+                        None,
+                    )) as ArrayRef
+                }
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    match field.name().as_str() {
+                        "tag1" => Arc::new(
+                            vec!["MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        "tag2" => Arc::new(
+                            vec!["CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        _ => Arc::new(
+                            vec!["CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                    }
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
     /// Returns all columns of the table
     pub fn all_column_names(&self) -> Option<StringSet> {
         let column_names = self.table_schema.as_ref().map(|schema| {