Merge pull request #1693 from influxdata/ntran/dedupe_final_union

feat: add UnionExec on top of the scan activities

commit 80db086426
@@ -246,6 +246,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
             scan_schema,
             chunks,
             predicate,
+            false,
         )?;

         Ok(plan)
@@ -317,6 +318,11 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
     /// ┌───────────────────────┐ │
     /// │SortPreservingMergeExec│ │
     /// └───────────────────────┘ │
+    ///             ▲             │
+    ///             │             │
+    /// ┌───────────────────────┐ │
+    /// │       UnionExec       │ │
+    /// └───────────────────────┘ │
     ///             ▲             |
     ///             │             |
     /// ┌───────────┴───────────┐ │
@@ -340,18 +346,21 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
         schema: ArrowSchemaRef,
         chunks: Vec<Arc<C>>,
         predicate: Predicate,
+        for_testing: bool, // TODO: remove this parameter when #1682 and #1683 are done
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // find overlapped chunks and put them into the right group
         self.split_overlapped_chunks(chunks.to_vec())?;

         // TEMP until the rest of this module's code is complete:
         // merge all plans into the same
-        self.no_duplicates_chunks
-            .append(&mut self.in_chunk_duplicates_chunks);
-        for mut group in &mut self.overlapped_chunks_set {
-            self.no_duplicates_chunks.append(&mut group);
-        }
-        self.overlapped_chunks_set.clear();
+        if !for_testing {
+            self.no_duplicates_chunks
+                .append(&mut self.in_chunk_duplicates_chunks);
+            for mut group in &mut self.overlapped_chunks_set {
+                self.no_duplicates_chunks.append(&mut group);
+            }
+            self.overlapped_chunks_set.clear();
+        }

         // Building plans
         let mut plans = vec![];
@@ -396,16 +405,16 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
             }
         }

-        let final_plan = plans.remove(0);
-
-        // TODO
-        // There are still plan, add UnionExec
-        if !plans.is_empty() {
-            // final_plan = union_plan
-            panic!("Unexpected error: There should be only one output for scan plan, but there were: {:#?}", plans);
-        }
-
-        Ok(final_plan)
+        match plans.len() {
+            // No plan generated. Something must go wrong
+            // Even if the chunks are empty, IOxReadFilterNode is still created
+            0 => panic!("Internal error generating deduplicate plan"),
+            // Only one plan, no need to add union node
+            // Return the plan itself
+            1 => Ok(plans.remove(0)),
+            // Has many plans and need to union them
+            _ => Ok(Arc::new(UnionExec::new(plans))),
+        }
     }

     /// discover overlaps and split them into three groups:
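
The hunk above replaces the single-plan assumption with a three-way match on the number of per-chunk plans. As a minimal, hedged sketch of the same idiom outside of `Deduplicater` (the standalone helper and its name are assumptions for illustration, not part of this commit; the import paths assume the stock DataFusion crate):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::{union::UnionExec, ExecutionPlan};

    /// Hypothetical helper mirroring the match in the hunk above: only add a
    /// UnionExec when there is more than one input plan.
    fn union_scan_plans(mut plans: Vec<Arc<dyn ExecutionPlan>>) -> Result<Arc<dyn ExecutionPlan>> {
        match plans.len() {
            // Even for an empty chunk list an IOxReadFilterNode is created upstream,
            // so an empty vector here indicates an internal error.
            0 => panic!("Internal error generating deduplicate plan"),
            // A single plan needs no union node on top of it.
            1 => Ok(plans.remove(0)),
            // Several plans are combined; UnionExec concatenates their output
            // partitions and imposes no ordering across inputs.
            _ => Ok(Arc::new(UnionExec::new(plans))),
        }
    }
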
@@ -430,7 +439,7 @@ impl<C: PartitionChunk + 'static> Deduplicater<C> {
         Ok(())
     }

-    /// Return true if all chunks are neither overlap nor has duplicates in itself
+    /// Return true if all chunks neither overlap nor have duplicates in itself
     fn no_duplicates(&self) -> bool {
         self.overlapped_chunks_set.is_empty() && self.in_chunk_duplicates_chunks.is_empty()
     }
@@ -888,9 +897,8 @@ mod test {
             Predicate::default(),
         );
         let batch = collect(sort_plan.unwrap()).await.unwrap();
-        // data is not sorted on primary key(tag1, tag2, time)
-
-        // NOTE: When the full deduplication is done, the duplciates will be removed from this output
+        // data is sorted on primary key(tag1, tag2, time)
+        // NOTE: When the full deduplication is done, the duplicates will be removed from this output
         let expected = vec![
             "+-----------+------+------+-------------------------------+",
             "| field_int | tag1 | tag2 | time                          |",
@@ -910,6 +918,241 @@ mod test {
         assert_batches_eq!(&expected, &batch);
     }

+    #[tokio::test]
+    async fn scan_plan_with_one_chunk_no_duplicates() {
+        // Test no duplicate at all
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // No duplicates so no sort at all. The data will stay in their original order
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_one_chunk_with_duplicates() {
+        // Test one chunk with duplicate within
+        let chunk = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_may_contain_pk_duplicates(true)
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Data must be sorted and duplicates removed
+        // TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
+        // is done, duplicates will be removed
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_two_overlapped_chunks_with_duplicates() {
+        // test overlapped chunks
+        let chunk1 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        let chunk2 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![Arc::clone(&chunk1), Arc::clone(&chunk2)],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Two overlapped chunks will be sort merged with duplicates removed
+        // TODO: it is just sorted for now. When https://github.com/influxdata/influxdb_iox/issues/1646
+        // is done, duplicates will be removed
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
+    #[tokio::test]
+    async fn scan_plan_with_four_chunks() {
+        // This test covers all kind of chunks: overlap, non-overlap without duplicates within, non-overlap with duplicates within
+        let chunk1 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_ten_rows_of_data_some_duplicates("t"),
+        );
+
+        // chunk2 overlaps with chunk 1
+        let chunk2 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 5, 7000)
+                .with_tag_column_with_stats("t", "tag1", "AL", "MT")
+                .with_int_field_column("t", "field_int")
+                .with_five_rows_of_data("t"),
+        );
+
+        // chunk3 no overlap, no duplicates within
+        let chunk3 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 8000, 20000)
+                .with_tag_column_with_stats("t", "tag1", "UT", "WA")
+                .with_int_field_column("t", "field_int")
+                .with_three_rows_of_data("t"),
+        );
+
+        // chunk4 no overlap, duplicates within
+        let chunk4 = Arc::new(
+            TestChunk::new(1)
+                .with_time_column_with_stats("t", 28000, 220000)
+                .with_tag_column_with_stats("t", "tag1", "UT", "WA")
+                .with_int_field_column("t", "field_int")
+                .with_may_contain_pk_duplicates(true)
+                .with_four_rows_of_data("t"),
+        );
+
+        // Datafusion schema of the chunk
+        let schema = chunk1.table_schema(Selection::All).unwrap().as_arrow();
+
+        let mut deduplicator = Deduplicater::new();
+        let plan = deduplicator.build_scan_plan(
+            Arc::from("t"),
+            schema,
+            vec![
+                Arc::clone(&chunk1),
+                Arc::clone(&chunk2),
+                Arc::clone(&chunk3),
+                Arc::clone(&chunk4),
+            ],
+            Predicate::default(),
+            true,
+        );
+        let batch = collect(plan.unwrap()).await.unwrap();
+        // Final data will be partially sorted and duplicates removed. Detailed:
+        //   . chunk1 and chunk2 will be sort merged and deduplicated (rows 8-32)
+        //   . chunk3 will stay in its original order (rows 1-3)
+        //   . chunk4 will be sorted and deduplicated (rows 4-7)
+        // TODO: data is only partially sorted for now. The deduplication will happen when https://github.com/influxdata/influxdb_iox/issues/1646
+        // is done
+        let expected = vec![
+            "+-----------+------+-------------------------------+",
+            "| field_int | tag1 | time                          |",
+            "+-----------+------+-------------------------------+",
+            "| 1000      | WA   | 1970-01-01 00:00:00.000008    |",
+            "| 10        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 70        | UT   | 1970-01-01 00:00:00.000020    |",
+            "| 70        | UT   | 1970-01-01 00:00:00.000020    |",
+            "| 10        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 50        | VT   | 1970-01-01 00:00:00.000010    |",
+            "| 1000      | WA   | 1970-01-01 00:00:00.000008    |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 10        | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 100       | AL   | 1970-01-01 00:00:00.000000050 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000100 |",
+            "| 70        | CT   | 1970-01-01 00:00:00.000000500 |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 30        | MT   | 1970-01-01 00:00:00.000000005 |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000001    |",
+            "| 1000      | MT   | 1970-01-01 00:00:00.000002    |",
+            "| 5         | MT   | 1970-01-01 00:00:00.000005    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 20        | MT   | 1970-01-01 00:00:00.000007    |",
+            "| 10        | MT   | 1970-01-01 00:00:00.000007    |",
+            "+-----------+------+-------------------------------+",
+        ];
+        assert_batches_eq!(&expected, &batch);
+    }
+
     fn chunk_ids(group: &[Arc<TestChunk>]) -> String {
         let ids = group.iter().map(|c| c.id().to_string()).collect::<Vec<_>>();
         ids.join(", ")
@@ -21,7 +21,9 @@ use crate::{
 use crate::{exec::Executor, pruning::Prunable};

 use internal_types::{
-    schema::{builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema},
+    schema::{
+        builder::SchemaBuilder, merge::SchemaMerger, InfluxColumnType, Schema, TIME_COLUMN_NAME,
+    },
     selection::Selection,
 };

@@ -233,7 +235,7 @@ impl TestChunk {
         new_self
     }

-    /// Register a timetamp column with the test chunk
+    /// Register a timestamp column with the test chunk
     pub fn with_time_column(self, table_name: impl Into<String>) -> Self {
         let table_name = table_name.into();

@@ -244,6 +246,36 @@ impl TestChunk {
         self.add_schema_to_table(table_name, new_column_schema)
     }

+    /// Register a timestamp column with the test chunk
+    pub fn with_time_column_with_stats(
+        self,
+        table_name: impl Into<String>,
+        min: i64,
+        max: i64,
+    ) -> Self {
+        let table_name = table_name.into();
+
+        let mut new_self = self.with_time_column(&table_name);
+
+        // Now, find the appropriate column summary and update the stats
+        let column_summary: &mut ColumnSummary = new_self
+            .table_summary
+            .as_mut()
+            .expect("had table summary")
+            .columns
+            .iter_mut()
+            .find(|c| c.name == TIME_COLUMN_NAME)
+            .expect("had column");
+
+        column_summary.stats = Statistics::I64(StatValues {
+            min: Some(min),
+            max: Some(max),
+            ..Default::default()
+        });
+
+        new_self
+    }
+
     /// Register an int field column with the test chunk
     pub fn with_int_field_column(
         self,
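
For context, the new `with_time_column_with_stats` builder is exercised by the tests added earlier in this diff; the excerpt below is quoted from those tests (so every method name is grounded in this commit) and shows a chunk whose time column reports min/max statistics for overlap detection:

    // From the new tests in this PR: a TestChunk whose time column reports
    // stats of (5, 7000) so the deduplicater can reason about chunk overlap.
    let chunk = Arc::new(
        TestChunk::new(1)
            .with_time_column_with_stats("t", 5, 7000)
            .with_tag_column_with_stats("t", "tag1", "AL", "MT")
            .with_int_field_column("t", "field_int")
            .with_five_rows_of_data("t"),
    );
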
@@ -367,19 +399,146 @@ impl TestChunk {
         self
     }

-    /// Prepares this chunk to return a specific record batch with five
-    /// rows of non null data that look like
+    /// Prepares this chunk to return a specific record batch with three
+    /// rows of non null data that look like, no duplicates within
     /// "+------+------+-----------+-------------------------------+",
     /// "| tag1 | tag2 | field_int | time                          |",
     /// "+------+------+-----------+-------------------------------+",
-    /// "| MA   | MA   | 1000      | 1970-01-01 00:00:00.000001    |",
-    /// "| MT   | MT   | 10        | 1970-01-01 00:00:00.000007    |",
-    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
-    /// "| AL   | AL   | 100       | 1970-01-01 00:00:00.000000050 |",
-    /// "| MT   | MT   | 5         | 1970-01-01 00:00:00.000005    |",
+    /// "| WA   | SC   | 1000      | 1970-01-01 00:00:00.000008    |",
+    /// "| VT   | NC   | 10        | 1970-01-01 00:00:00.000010    |",
+    /// "| UT   | RI   | 70        | 1970-01-01 00:00:00.000020    |",
     /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(8000, 20000)
+    pub fn with_three_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
+        let schema = self
+            .table_schema
+            .as_ref()
+            .expect("table must exist in TestChunk");
+
+        // create arrays
+        let columns = schema
+            .iter()
+            .map(|(_influxdb_column_type, field)| match field.data_type() {
+                DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70])) as ArrayRef,
+                DataType::Utf8 => match field.name().as_str() {
+                    "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT"])) as ArrayRef,
+                    "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI"])) as ArrayRef,
+                    _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR"])) as ArrayRef,
+                },
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
+                    TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000], None),
+                ) as ArrayRef,
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    match field.name().as_str() {
+                        "tag1" => Arc::new(
+                            vec!["WA", "VT", "UT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        "tag2" => Arc::new(
+                            vec!["SC", "NC", "RI"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        _ => Arc::new(
+                            vec!["TX", "PR", "OR"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                    }
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
+    /// Prepares this chunk to return a specific record batch with four
+    /// rows of non null data that look like, duplicates within
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| WA   | SC   | 1000      | 1970-01-01 00:00:00.000028    |",
+    /// "| VT   | NC   | 10        | 1970-01-01 00:00:00.000210    |", (1)
+    /// "| UT   | RI   | 70        | 1970-01-01 00:00:00.000220    |",
+    /// "| VT   | NC   | 50        | 1970-01-01 00:00:00.000210    |", // duplicate of (1)
+    /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(UT, WA), tag2(RI, SC), time(28000, 220000)
+    pub fn with_four_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
+        let schema = self
+            .table_schema
+            .as_ref()
+            .expect("table must exist in TestChunk");
+
+        // create arrays
+        let columns = schema
+            .iter()
+            .map(|(_influxdb_column_type, field)| match field.data_type() {
+                DataType::Int64 => Arc::new(Int64Array::from(vec![1000, 10, 70, 50])) as ArrayRef,
+                DataType::Utf8 => match field.name().as_str() {
+                    "tag1" => Arc::new(StringArray::from(vec!["WA", "VT", "UT", "VT"])) as ArrayRef,
+                    "tag2" => Arc::new(StringArray::from(vec!["SC", "NC", "RI", "NC"])) as ArrayRef,
+                    _ => Arc::new(StringArray::from(vec!["TX", "PR", "OR", "AL"])) as ArrayRef,
+                },
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => Arc::new(
+                    TimestampNanosecondArray::from_vec(vec![8000, 10000, 20000, 10000], None),
+                ) as ArrayRef,
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    match field.name().as_str() {
+                        "tag1" => Arc::new(
+                            vec!["WA", "VT", "UT", "VT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        "tag2" => Arc::new(
+                            vec!["SC", "NC", "RI", "NC"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        _ => Arc::new(
+                            vec!["TX", "PR", "OR", "AL"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                    }
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
+    /// Prepares this chunk to return a specific record batch with five
+    /// rows of non null data that look like, no duplicates within
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
+    /// "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |",
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+    /// "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |",
+    /// "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005    |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
     pub fn with_five_rows_of_data(mut self, _table_name: impl Into<String>) -> Self {
-        //let table_name = table_name.into();
         let schema = self
             .table_schema
             .as_ref()
@@ -439,6 +598,88 @@ impl TestChunk {
         self
     }

+    /// Prepares this chunk to return a specific record batch with ten
+    /// rows of non null data that look like, duplicates within
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| tag1 | tag2 | field_int | time                          |",
+    /// "+------+------+-----------+-------------------------------+",
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000001    |",
+    /// "| MT   | AL   | 10        | 1970-01-01 00:00:00.000007    |", (1)
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000100 |",
+    /// "| AL   | MA   | 100       | 1970-01-01 00:00:00.000000050 |", (2)
+    /// "| MT   | AL   | 5         | 1970-01-01 00:00:00.000005    |", (3)
+    /// "| MT   | CT   | 1000      | 1970-01-01 00:00:00.000002    |",
+    /// "| MT   | AL   | 20        | 1970-01-01 00:00:00.000007    |", // Duplicate with (1)
+    /// "| CT   | CT   | 70        | 1970-01-01 00:00:00.000000500 |",
+    /// "| AL   | MA   | 10        | 1970-01-01 00:00:00.000000050 |", // Duplicate with (2)
+    /// "| MT   | AL   | 30        | 1970-01-01 00:00:00.000005    |", // Duplicate with (3)
+    /// "+------+------+-----------+-------------------------------+",
+    /// Stats(min, max) : tag1(AL, MT), tag2(AL, MA), time(5, 7000)
+    pub fn with_ten_rows_of_data_some_duplicates(mut self, _table_name: impl Into<String>) -> Self {
+        //let table_name = table_name.into();
+        let schema = self
+            .table_schema
+            .as_ref()
+            .expect("table must exist in TestChunk");
+
+        // create arrays
+        let columns = schema
+            .iter()
+            .map(|(_influxdb_column_type, field)| match field.data_type() {
+                DataType::Int64 => Arc::new(Int64Array::from(vec![
+                    1000, 10, 70, 100, 5, 1000, 20, 70, 10, 30,
+                ])) as ArrayRef,
+                DataType::Utf8 => match field.name().as_str() {
+                    "tag1" => Arc::new(StringArray::from(vec![
+                        "MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT",
+                    ])) as ArrayRef,
+                    "tag2" => Arc::new(StringArray::from(vec![
+                        "CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL",
+                    ])) as ArrayRef,
+                    _ => Arc::new(StringArray::from(vec![
+                        "CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT",
+                    ])) as ArrayRef,
+                },
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                    Arc::new(TimestampNanosecondArray::from_vec(
+                        vec![1000, 7000, 100, 50, 5, 2000, 7000, 500, 50, 5],
+                        None,
+                    )) as ArrayRef
+                }
+                DataType::Dictionary(key, value)
+                    if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
+                {
+                    match field.name().as_str() {
+                        "tag1" => Arc::new(
+                            vec!["MT", "MT", "CT", "AL", "MT", "MT", "MT", "CT", "AL", "MT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        "tag2" => Arc::new(
+                            vec!["CT", "AL", "CT", "MA", "AL", "CT", "AL", "CT", "MA", "AL"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                        _ => Arc::new(
+                            vec!["CT", "MT", "AL", "AL", "MT", "CT", "MT", "AL", "AL", "MT"]
+                                .into_iter()
+                                .collect::<DictionaryArray<Int32Type>>(),
+                        ) as ArrayRef,
+                    }
+                }
+                _ => unimplemented!(
+                    "Unimplemented data type for test database: {:?}",
+                    field.data_type()
+                ),
+            })
+            .collect::<Vec<_>>();
+
+        let batch = RecordBatch::try_new(schema.into(), columns).expect("made record batch");
+
+        self.table_data.push(Arc::new(batch));
+        self
+    }
+
     /// Returns all columns of the table
     pub fn all_column_names(&self) -> Option<StringSet> {
         let column_names = self.table_schema.as_ref().map(|schema| {