Merge pull request #1610 from influxdata/ntran/deduplicate

test: prepare tests for deduplicate work
pull/24376/head
kodiakhq[bot] 2021-06-02 17:47:02 +00:00 committed by GitHub
commit 30f7bdf724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 229 additions and 12 deletions

View File

@ -199,7 +199,7 @@ impl<C: PartitionChunk + 'static> TableProvider for ChunkTableProvider<C> {
filters: &[Expr],
_limit: Option<usize>,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
debug!("Input Filters to Scan: {:#?}", filters);
debug!(?filters, "Input Filters to Scan");
// Note that `filters` don't actually need to be evaluated in
// the scan for the plans to be correct, they are an extra

View File

@ -296,7 +296,7 @@ impl PartitionChunk for DbChunk {
// Predicate is not required to be applied for correctness. We only pushed it down
// when possible for performance gain
debug!("Input Predicate to read_filter: {:#?}", predicate);
debug!(?predicate, "Input Predicate to read_filter");
match &self.state {
State::MutableBuffer { chunk, .. } => {
@ -312,6 +312,8 @@ impl PartitionChunk for DbChunk {
Err(_) => read_buffer::Predicate::default(),
};
debug!(?rb_predicate, "Predicate pushed down to RUB");
let read_results = chunk
.read_filter(table_name, rb_predicate, selection)
.context(ReadBufferChunkError {

View File

@ -328,6 +328,89 @@ impl DbSetup for TwoMeasurementsManyFieldsTwoChunks {
}
}
/// Setup for the deduplicate tests: a single measurement (`h2o`) written as
/// several chunks, some of which repeat the same (time, state, city) key.
///
/// Note: despite the type name, `make` writes four chunks (ids 0 through 3).
#[derive(Debug)]
pub struct OneMeasurementThreeChunksWithDuplicates {}

#[async_trait]
impl DbSetup for OneMeasurementThreeChunksWithDuplicates {
    /// Builds a single scenario by writing each batch of line protocol,
    /// rolling over the partition and then loading the resulting chunk into
    /// the read buffer, one chunk at a time and in chunk id order.
    async fn make(&self) -> Vec<DbScenario> {
        let db = make_db().await.db;
        let partition_key = "1970-01-01T00";

        // Chunk id 0 ("chunk 1"):
        //  - timestamps 50 to 250
        //  - every (time, state, city) key appears once within the chunk
        let first_chunk = vec![
            "h2o,state=MA,city=Boston min_temp=70.4 50",
            "h2o,state=MA,city=Bedford min_temp=71.59 150",
            "h2o,state=MA,city=Boston max_temp=75.4 250",
            "h2o,state=MA,city=Andover max_temp=69.2, 250",
        ];

        // Chunk id 1 ("chunk 2"): overlaps with chunk 1
        //  - timestamps 150 to 350
        //  - every (time, state, city) key appears once within the chunk
        let second_chunk = vec![
            // same key as a chunk 1 row; brings a new field (area) and a
            // max_temp value that the chunk 1 row did not have
            "h2o,state=MA,city=Bedford max_temp=78.75,area=742u 150",
            // same key as a chunk 1 row; brings the min_temp value that the
            // chunk 1 row did not have
            "h2o,state=MA,city=Boston min_temp=65.4 250",
            "h2o,state=MA,city=Reading min_temp=53.4, 250",
            "h2o,state=CA,city=SF min_temp=79.0,max_temp=87.2,area=500u 300",
            "h2o,state=CA,city=SJ min_temp=78.5,max_temp=88.0 300",
            "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 350",
        ];

        // Chunk id 2 ("chunk 3"): no overlap with the other chunks
        //  - timestamps 400 to 500
        //  - contains repeated keys within the chunk itself
        let third_chunk = vec![
            "h2o,state=MA,city=Bedford max_temp=80.75,area=742u 400",
            "h2o,state=MA,city=Boston min_temp=68.4 400",
            // repeats the Bedford key written two lines above
            "h2o,state=MA,city=Bedford min_temp=65.22,area=750u 400",
            // repeats the Boston key written two lines above
            "h2o,state=MA,city=Boston min_temp=65.40,max_temp=82.67 400",
            "h2o,state=CA,city=SJ min_temp=77.0,max_temp=90.7 450",
            "h2o,state=CA,city=SJ min_temp=69.5,max_temp=88.2 500",
        ];

        // Chunk id 3 ("chunk 4"): no overlap with the other chunks
        //  - timestamps 600 to 700
        //  - every (time, state, city) key appears once within the chunk
        let fourth_chunk = vec![
            "h2o,state=MA,city=Bedford max_temp=88.75,area=742u 600",
            "h2o,state=MA,city=Boston min_temp=67.4 600",
            "h2o,state=MA,city=Reading min_temp=60.4, 600",
            "h2o,state=CA,city=SF min_temp=68.4,max_temp=85.7,area=500u 650",
            "h2o,state=CA,city=SJ min_temp=69.5,max_temp=89.2 650",
            "h2o,state=CA,city=SJ min_temp=75.5,max_temp=84.08 700",
        ];

        // The same three steps are applied to every batch; the order of this
        // list fixes the order in which the chunks are created.
        let batches = vec![
            (0, first_chunk),
            (1, second_chunk),
            (2, third_chunk),
            (3, fourth_chunk),
        ];
        for (chunk_id, lines) in batches {
            write_lp(&db, &lines.join("\n"));
            db.rollover_partition(partition_key, "h2o").await.unwrap();
            db.load_chunk_to_read_buffer(partition_key, "h2o", chunk_id, &Default::default())
                .await
                .unwrap();
        }

        let scenario = DbScenario {
            scenario_name: "Data in open chunk of mutable buffer and read buffer".into(),
            db,
        };
        vec![scenario]
    }
}
#[derive(Debug)]
/// This has a single scenario with all the life cycle operations to
/// test queries that depend on that

View File

@ -40,8 +40,7 @@ macro_rules! run_sql_test_case {
};
}
/// runs table_names(predicate) and compares it to the expected
/// output
/// runs sql and compares it to the expected output
macro_rules! run_sql_explain_test_case {
($DB_SETUP:expr, $SQL:expr, $EXPECTED_LINES:expr) => {
test_helpers::maybe_start_logging();
@ -613,8 +612,9 @@ async fn sql_predicate_pushdown_correctness() {
&expected
);
// Test 11: three push-down expression: system > 5.0 and town != 'tewsbury' and system < 7.0 and
// time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') rewritten to time GT INT(130)
// Test 11: four push-down expressions: system > 5.0 and town != 'tewsbury' and system < 7.0 and
// time > to_timestamp('1970-01-01T00:00:00.000000120+00:00') (rewritten to time GT int(130))
//
let expected = vec!["++", "++"];
run_sql_test_case!(
TwoMeasurementsPredicatePushDown {},
@ -705,28 +705,53 @@ async fn sql_predicate_pushdown_explain() {
&expected
);
// Check the plan
// Test 2.2: One push-down expression: count > 200.0
let expected = vec![
"+-----------------------------------------+----------------------------------------------------------------------------+",
"| plan_type | plan |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
"| logical_plan | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |",
"| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=None |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |",
"| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #count Gt Int64(200) |",
"| | Filter: #count Gt Float64(200) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
"| | FilterExec: CAST(count AS Int64) > 200 |",
"| | FilterExec: CAST(count AS Float64) > 200 |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
];
run_sql_explain_test_case!(
TwoMeasurementsPredicatePushDown {},
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200",
"EXPLAIN VERBOSE SELECT * from restaurant where count > 200.0",
&expected
);
// Test 2.3: One push-down expression: system > 4.0
let expected = vec![
"+-----------------------------------------+----------------------------------------------------------------------------+",
"| plan_type | plan |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
"| logical_plan | Projection: #count, #system, #time, #town |",
"| | Filter: #system Gt Float64(4) |",
"| | TableScan: restaurant projection=None |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #system Gt Float64(4) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| logical_plan after projection_push_down | Projection: #count, #system, #time, #town |",
"| | Filter: #system Gt Float64(4) |",
"| | TableScan: restaurant projection=Some([0, 1, 2, 3]) |",
"| physical_plan | ProjectionExec: expr=[count, system, time, town] |",
"| | FilterExec: system > 4 |",
"| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |",
"+-----------------------------------------+----------------------------------------------------------------------------+",
];
run_sql_explain_test_case!(
TwoMeasurementsPredicatePushDown {},
"EXPLAIN VERBOSE SELECT * from restaurant where system > 4.0",
&expected
);
@ -958,3 +983,110 @@ async fn sql_predicate_pushdown_explain() {
&expected
);
}
/// Runs a plain query and three `EXPLAIN VERBOSE` queries against the
/// `OneMeasurementThreeChunksWithDuplicates` setup and compares each result
/// with a hard-coded table.
///
/// NOTE: deduplication is not available yet, so the expected query result
/// below still lists rows that share the same (time, state, city) key. It
/// documents current behavior, not the desired final behavior.
#[tokio::test]
async fn sql_deduplicate() {
    // Case 1: query result, ordered so that rows with the same key are adjacent
    let select_sql =
        "select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
    let select_expected = vec![
        "+-------------------------------+-------+---------+----------+----------+------+",
        "| time | state | city | min_temp | max_temp | area |",
        "+-------------------------------+-------+---------+----------+----------+------+",
        "| 1970-01-01 00:00:00.000000050 | MA | Boston | 70.4 | | |",
        "| 1970-01-01 00:00:00.000000150 | MA | Bedford | | 78.75 | 742 |",
        "| 1970-01-01 00:00:00.000000150 | MA | Bedford | 71.59 | | |", // same key as previous row
        "| 1970-01-01 00:00:00.000000250 | MA | Andover | | 69.2 | |",
        "| 1970-01-01 00:00:00.000000250 | MA | Boston | | 75.4 | |",
        "| 1970-01-01 00:00:00.000000250 | MA | Boston | 65.4 | | |", // same key as previous row
        "| 1970-01-01 00:00:00.000000250 | MA | Reading | 53.4 | | |",
        "| 1970-01-01 00:00:00.000000300 | CA | SF | 79 | 87.2 | 500 |",
        "| 1970-01-01 00:00:00.000000300 | CA | SJ | 78.5 | 88 | |",
        "| 1970-01-01 00:00:00.000000350 | CA | SJ | 75.5 | 84.08 | |",
        "| 1970-01-01 00:00:00.000000400 | MA | Bedford | | 80.75 | 742 |",
        "| 1970-01-01 00:00:00.000000400 | MA | Bedford | 65.22 | | 750 |", // same key as previous row
        "| 1970-01-01 00:00:00.000000400 | MA | Boston | 65.4 | 82.67 | |",
        "| 1970-01-01 00:00:00.000000400 | MA | Boston | 68.4 | | |", // same key as previous row
        "| 1970-01-01 00:00:00.000000450 | CA | SJ | 77 | 90.7 | |",
        "| 1970-01-01 00:00:00.000000500 | CA | SJ | 69.5 | 88.2 | |",
        "| 1970-01-01 00:00:00.000000600 | MA | Bedford | | 88.75 | 742 |",
        "| 1970-01-01 00:00:00.000000600 | MA | Boston | 67.4 | | |",
        "| 1970-01-01 00:00:00.000000600 | MA | Reading | 60.4 | | |",
        "| 1970-01-01 00:00:00.000000650 | CA | SF | 68.4 | 85.7 | 500 |",
        "| 1970-01-01 00:00:00.000000650 | CA | SJ | 69.5 | 89.2 | |",
        "| 1970-01-01 00:00:00.000000700 | CA | SJ | 75.5 | 84.08 | |",
        "+-------------------------------+-------+---------+----------+----------+------+",
    ];
    run_sql_test_case!(
        OneMeasurementThreeChunksWithDuplicates {},
        select_sql,
        &select_expected
    );

    // Case 2: plan of the same query, including the ORDER BY
    let sorted_plan_sql = "explain verbose select time, state, city, min_temp, max_temp, area from h2o order by time, state, city";
    let sorted_plan_expected = vec![
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
        "| logical_plan | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=None |",
        "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after projection_push_down | Sort: #time ASC NULLS FIRST, #state ASC NULLS FIRST, #city ASC NULLS FIRST |",
        "| | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| physical_plan | SortExec: [time ASC,state ASC,city ASC] |",
        "| | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "+-----------------------------------------+----------------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        OneMeasurementThreeChunksWithDuplicates {},
        sorted_plan_sql,
        &sorted_plan_expected
    );

    // Case 3: plan of the projection alone, without the ORDER BY
    let unsorted_plan_sql =
        "explain verbose select time, state, city, min_temp, max_temp, area from h2o";
    let unsorted_plan_expected = vec![
        "+-----------------------------------------+--------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+--------------------------------------------------------------------+",
        "| logical_plan | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=None |",
        "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| logical_plan after projection_push_down | Projection: #time, #state, #city, #min_temp, #max_temp, #area |",
        "| | TableScan: h2o projection=Some([0, 1, 2, 3, 4, 5]) |",
        "| physical_plan | ProjectionExec: expr=[time, state, city, min_temp, max_temp, area] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "+-----------------------------------------+--------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        OneMeasurementThreeChunksWithDuplicates {},
        unsorted_plan_sql,
        &unsorted_plan_expected
    );

    // Case 4: plan of a UNION ALL that scans the same table twice
    let union_plan_sql =
        "EXPLAIN VERBOSE select state as name from h2o UNION ALL select city as name from h2o";
    let union_plan_expected = vec![
        "+-----------------------------------------+---------------------------------------------------------------------+",
        "| plan_type | plan |",
        "+-----------------------------------------+---------------------------------------------------------------------+",
        "| logical_plan | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=None |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=None |",
        "| logical_plan after projection_push_down | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| logical_plan after projection_push_down | Union |",
        "| | Projection: #state AS name |",
        "| | TableScan: h2o projection=Some([4]) |",
        "| | Projection: #city AS name |",
        "| | TableScan: h2o projection=Some([1]) |",
        "| physical_plan | ExecutionPlan(PlaceHolder) |",
        "| | ProjectionExec: expr=[state as name] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "| | ProjectionExec: expr=[city as name] |",
        "| | IOxReadFilterNode: table_name=h2o, chunks=4 predicate=Predicate |",
        "+-----------------------------------------+---------------------------------------------------------------------+",
    ];
    run_sql_explain_test_case!(
        OneMeasurementThreeChunksWithDuplicates {},
        union_plan_sql,
        &union_plan_expected
    );
}