test: Delete tests for Influxrpc queries

pull/24376/head
Nga Tran 2021-10-15 17:26:36 -04:00
parent d6b7b56f16
commit 9244e9fc4e
8 changed files with 903 additions and 3 deletions

View File

@ -68,6 +68,8 @@ async fn test_field_columns_no_predicate() {
run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
// NGA todo: add delete tests when the TwoMeasurementsManyFieldsWithDelete available
#[tokio::test]
async fn test_field_columns_with_pred() {
// get only fields from h20 (but both chunks)
@ -188,3 +190,47 @@ async fn test_field_name_plan() {
assert_batches_eq!(expected, &results);
}
}
// BUG: https://github.com/influxdata/influxdb_iox/issues/2860
#[ignore]
#[tokio::test]
async fn test_field_name_plan_with_delete() {
    test_helpers::maybe_start_logging();
    // Tests that the ordering that comes out is reasonable
    let scenarios = OneMeasurementManyFieldsWithDelete {}.make().await;
    for scenario in scenarios {
        let predicate = PredicateBuilder::default().timestamp_range(0, 200).build();
        let DbScenario {
            scenario_name, db, ..
        } = scenario;
        println!("Running scenario '{}'", scenario_name);
        println!("Predicate: '{:#?}'", predicate);
        let planner = InfluxRpcPlanner::new();
        let ctx = db.executor().new_context(ExecutorType::Query);
        let plan = planner
            .field_columns(db.as_ref(), predicate.clone())
            .expect("built plan successfully");
        let mut plans = plan.plans;
        let plan = plans.pop().unwrap();
        assert!(plans.is_empty()); // only one plan
        // run the created plan directly, ensuring the output is as
        // expected (specifically that the column ordering is correct)
        let results = ctx.run_logical_plan(plan).await.expect("ok running plan");
        // TODO: After the panic bug (#2860) is fixed, this result should be
        // re-checked. Column `field4` will likely disappear from the result,
        // since the only row containing field4 is removed by the scenario's
        // delete predicate.
        let expected = vec![
            "+--------+--------+--------+--------+--------------------------------+",
            "| field1 | field2 | field3 | field4 | time |",
            "+--------+--------+--------+--------+--------------------------------+",
            "| 70.5 | ss | 2 | | 1970-01-01T00:00:00.000000100Z |",
            "+--------+--------+--------+--------+--------------------------------+",
        ];
        assert_batches_eq!(expected, &results);
    }
}

View File

@ -9,6 +9,7 @@ use async_trait::async_trait;
use data_types::timestamp::TimestampRange;
use datafusion::logical_plan::{col, lit};
use predicate::{
delete_expr::DeleteExpr,
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateBuilder, EMPTY_PREDICATE},
};
@ -58,7 +59,8 @@ impl DbSetup for TwoMeasurementsMultiSeriesWithDelete {
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
// pred: delete from h2o where 120 <= time <= 250
// 2 rows of h2o with timestamp 200 and 350 will be deleted
let delete_table_name = "h2o";
let pred = DeletePredicate {
range: TimestampRange {
@ -239,6 +241,55 @@ async fn test_read_filter_data_filter() {
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_filter_with_delete() {
    // Filter down to one row in h2o, but that leftover row was soft deleted,
    // so nothing will be returned.
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 300)
        .add_expr(col("state").eq(lit("CA"))) // state=CA
        .build();
    let expected_results = vec![];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate,
        expected_results.clone(),
    )
    .await;
    // Same results via a != predicate: state != MA selects the same CA rows here.
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 300)
        .add_expr(col("state").not_eq(lit("MA"))) // effectively state=CA
        .build();
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate,
        expected_results,
    )
    .await;
    // Use a different predicate (wider time range, MA rows) so that
    // non-deleted data is returned.
    let predicate = PredicateBuilder::default()
        .timestamp_range(100, 300)
        .add_expr(col("state").eq(lit("MA"))) // state=MA
        .add_expr(col("_measurement").eq(lit("h2o")))
        .build();
    let expected_results = vec![
        "Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [70.4]",
    ];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate,
        expected_results,
    )
    .await;
}
#[tokio::test]
async fn test_read_filter_data_filter_fields() {
// filter out one row in h20
@ -255,6 +306,8 @@ async fn test_read_filter_data_filter_fields() {
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
// NGA todo: add delete tests here after we have delete scenarios for 2 chunks for 1 table
#[tokio::test]
async fn test_read_filter_data_filter_measurement_pred() {
// use an expr on table name to pick just the last row from o2
@ -378,6 +431,50 @@ async fn test_read_filter_data_pred_using_regex_match() {
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_match_with_delete() {
    // Same regex-match shape as the non-delete variant, but run against
    // scenarios carrying delete predicates.
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 300)
        // will match CA state
        .build_regex_match_expr("state", "C.*")
        .build();
    // the selected row was soft deleted
    let expected_results = vec![];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate,
        expected_results,
    )
    .await;
    // Widen the time range so a surviving (non-deleted) CA row is returned.
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 400)
        // will match CA state
        .build_regex_match_expr("state", "C.*")
        .build();
    let expected_results = vec![
        "Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
    ];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate.clone(),
        expected_results,
    )
    .await;
    // Try the same predicate on the delete-all scenario: nothing survives.
    let expected_results = vec![];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDeleteAll {},
        predicate,
        expected_results,
    )
    .await;
}
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_not_match() {
let predicate = PredicateBuilder::default()
@ -413,10 +510,60 @@ async fn test_read_filter_data_pred_unsupported_in_scan() {
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
// // correct one
// let expected_results = vec![
// "Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
// "Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
// "Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
// ];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_unsupported_in_scan_with_delete() {
    test_helpers::maybe_start_logging();
    // These predicates can't be pushed down into chunks, but they can
    // be evaluated by the general purpose DataFusion plan
    // https://github.com/influxdata/influxdb_iox/issues/883
    // (STATE = 'CA') OR (READING > 0)
    let predicate = PredicateBuilder::default()
        .add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))))
        .build();
    // Note these results are incorrect (they do not include data from h2o where
    // state = CA)
    let expected_results = vec![
        "Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
        "Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
    ];
    // The expected-correct results, kept for when issue #883 is fixed:
    // let expected_results = vec![
    //     "Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
    //     "Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
    //     "Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
    // ];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDelete {},
        predicate.clone(),
        expected_results,
    )
    .await;
    // With delete all from h2o, no rows from h2o should be returned
    let expected_results = vec![
        "Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
        "Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
    ];
    run_read_filter_test_case(
        TwoMeasurementsMultiSeriesWithDeleteAll {},
        predicate,
        expected_results,
    )
    .await;
}
#[derive(Debug)]
pub struct MeasurementsSortableTags {}
#[async_trait]
@ -436,6 +583,47 @@ impl DbSetup for MeasurementsSortableTags {
}
}
/// Like [`MeasurementsSortableTags`] but with a delete predicate that soft
/// deletes the single `state=CA` row.
#[derive(Debug)]
pub struct MeasurementsSortableTagsWithDelete {}
#[async_trait]
impl DbSetup for MeasurementsSortableTagsWithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec![
            "h2o,zz_tag=A,state=MA,city=Kingston temp=70.1 800",
            "h2o,state=MA,city=Kingston,zz_tag=B temp=70.2 100",
            "h2o,state=CA,city=Boston temp=70.3 250", // soft deleted
            "h2o,state=MA,city=Boston,zz_tag=A temp=70.4 1000",
            "h2o,state=MA,city=Boston temp=70.5,other=5.0 250",
        ];
        // pred: delete from h2o where 120 <= time <= 350 and state=CA
        // 1 row of h2o with timestamp 250 will be deleted
        let delete_table_name = "h2o";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 120,
                end: 350,
            },
            exprs: vec![DeleteExpr::new(
                "state".to_string(),
                predicate::delete_expr::Op::Eq,
                predicate::delete_expr::Scalar::String(("CA").to_string()),
            )],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
#[tokio::test]
async fn test_read_filter_data_plan_order() {
test_helpers::maybe_start_logging();
@ -451,3 +639,23 @@ async fn test_read_filter_data_plan_order() {
run_read_filter_test_case(MeasurementsSortableTags {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_plan_order_with_delete() {
    test_helpers::maybe_start_logging();
    // No predicate: everything except the soft-deleted `state=CA` row should
    // come back, with series emitted in sorted tag order.
    let predicate = Predicate::default();
    let expected_results = vec![
        "Series tags={_field=other, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [5.0]",
        "Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.5]",
        "Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA, zz_tag=A}\n FloatPoints timestamps: [1000], values: [70.4]",
        "Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=A}\n FloatPoints timestamps: [800], values: [70.1]",
        "Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=B}\n FloatPoints timestamps: [100], values: [70.2]",
    ];
    run_read_filter_test_case(
        MeasurementsSortableTagsWithDelete {},
        predicate,
        expected_results,
    )
    .await;
}

View File

@ -7,8 +7,13 @@ use crate::{
};
use async_trait::async_trait;
use data_types::timestamp::TimestampRange;
use datafusion::prelude::*;
use predicate::predicate::{Predicate, PredicateBuilder};
use predicate::{
delete_expr::DeleteExpr,
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateBuilder},
};
use query::{frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate};
/// runs read_group(predicate) and compares it to the expected
@ -67,9 +72,65 @@ impl DbSetup for OneMeasurementNoTags {
}
}
/// One measurement, no tags, with a delete predicate that removes the
/// `foo=1.0` row at timestamp 1.
struct OneMeasurementNoTagsWithDelete {}
#[async_trait]
impl DbSetup for OneMeasurementNoTagsWithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec!["m0 foo=1.0 1", "m0 foo=2.0 2"];
        // pred: delete from m0 where 1 <= time <= 1 and foo=1.0
        // 1 row of m0 with timestamp 1 will be deleted
        let delete_table_name = "m0";
        let pred = DeletePredicate {
            range: TimestampRange { start: 1, end: 1 },
            exprs: vec![DeleteExpr::new(
                "foo".to_string(),
                predicate::delete_expr::Op::Eq,
                predicate::delete_expr::Scalar::F64((1.0).into()),
            )],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
/// One measurement, no tags, with a delete predicate that removes every row.
struct OneMeasurementNoTagsWithDeleteAll {}
#[async_trait]
impl DbSetup for OneMeasurementNoTagsWithDeleteAll {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec!["m0 foo=1.0 1", "m0 foo=2.0 2"];
        // pred: delete from m0 where 1 <= time <= 2
        // both rows of m0 will be deleted
        let delete_table_name = "m0";
        let pred = DeletePredicate {
            range: TimestampRange { start: 1, end: 2 },
            exprs: vec![],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
#[tokio::test]
async fn test_read_group_data_no_tag_columns() {
let predicate = Predicate::default();
// Count
let agg = Aggregate::Count;
let group_columns = vec![];
let expected_results = vec![
@ -79,6 +140,95 @@ async fn test_read_group_data_no_tag_columns() {
run_read_group_test_case(
OneMeasurementNoTags {},
predicate.clone(),
agg,
group_columns.clone(),
expected_results,
)
.await;
// min
let agg = Aggregate::Min;
let expected_results = vec![
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n FloatPoints timestamps: [1], values: [1.0]",
];
run_read_group_test_case(
OneMeasurementNoTags {},
predicate,
agg,
group_columns,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_group_data_no_tag_columns_with_delete() {
    // After the delete, only the `foo=2.0` row at timestamp 2 survives.
    let predicate = Predicate::default();
    // count: a single surviving row
    let agg = Aggregate::Count;
    let group_columns = vec![];
    let expected_results = vec![
        "Group tag_keys: _field, _measurement partition_key_vals: ",
        "Series tags={_field=foo, _measurement=m0}\n IntegerPoints timestamps: [2], values: [1]",
    ];
    run_read_group_test_case(
        OneMeasurementNoTagsWithDelete {},
        predicate.clone(),
        agg,
        group_columns.clone(),
        expected_results,
    )
    .await;
    // min: the minimum is the surviving value 2.0 (1.0 was deleted)
    let agg = Aggregate::Min;
    let expected_results = vec![
        "Group tag_keys: _field, _measurement partition_key_vals: ",
        "Series tags={_field=foo, _measurement=m0}\n FloatPoints timestamps: [2], values: [2.0]",
    ];
    run_read_group_test_case(
        OneMeasurementNoTagsWithDelete {},
        predicate.clone(),
        agg,
        group_columns.clone(),
        expected_results,
    )
    .await;
}
// BUG: https://github.com/influxdata/influxdb_iox/issues/2859
// Inconsistent results when data in MUB and RUB
#[ignore]
#[tokio::test]
async fn test_read_group_data_no_tag_columns_with_delete_all() {
let predicate = Predicate::default();
// count
let agg = Aggregate::Count;
let group_columns = vec![];
let expected_results = vec![
"Group tag_keys: _field, _measurement partition_key_vals: ",
"Series tags={_field=foo, _measurement=m0}\n IntegerPoints timestamps: [0], values: [0]",
];
run_read_group_test_case(
OneMeasurementNoTagsWithDeleteAll {},
predicate.clone(),
agg,
group_columns.clone(),
expected_results,
)
.await;
// min
let agg = Aggregate::Min;
let expected_results = vec!["Group tag_keys: _field, _measurement partition_key_vals: "];
run_read_group_test_case(
OneMeasurementNoTagsWithDeleteAll {},
predicate,
agg,
group_columns,
@ -106,6 +256,8 @@ impl DbSetup for OneMeasurementForAggs {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_read_group_data_pred() {
let predicate = PredicateBuilder::default()
@ -176,6 +328,8 @@ impl DbSetup for AnotherMeasurementForAggs {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_sum() {
let predicate = PredicateBuilder::default()
@ -295,6 +449,8 @@ impl DbSetup for TwoMeasurementForAggs {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_count_measurement_pred() {
let predicate = PredicateBuilder::default()
@ -343,6 +499,8 @@ impl DbSetup for MeasurementForSelectors {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_first() {
let predicate = PredicateBuilder::default()
@ -418,6 +576,8 @@ impl DbSetup for MeasurementForMin {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_min() {
let predicate = PredicateBuilder::default()
@ -463,6 +623,8 @@ impl DbSetup for MeasurementForMax {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_max() {
let predicate = PredicateBuilder::default()
@ -513,6 +675,8 @@ impl DbSetup for MeasurementForGroupKeys {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_grouped_series_set_plan_group_by_state_city() {
// no predicate

View File

@ -4,11 +4,15 @@ use crate::{
scenarios::{util::all_scenarios_for_one_chunk, *},
};
use data_types::timestamp::TimestampRange;
use server::{db::test_helpers::write_lp, utils::make_db};
use async_trait::async_trait;
use datafusion::prelude::*;
use predicate::predicate::{Predicate, PredicateBuilder};
use predicate::{
delete_predicate::DeletePredicate,
predicate::{Predicate, PredicateBuilder},
};
use query::{
frontend::influxrpc::InfluxRpcPlanner,
group_by::{Aggregate, WindowDuration},
@ -99,6 +103,8 @@ impl DbSetup for MeasurementForWindowAggregate {
}
}
// NGA todo: add delete DbSetup after all scenarios are done for 2 chunks
#[tokio::test]
async fn test_read_window_aggregate_nanoseconds() {
let predicate = PredicateBuilder::default()
@ -214,6 +220,8 @@ impl DbSetup for MeasurementForWindowAggregateMonths {
}
}
// NGA todo: add delete DbSetup
#[tokio::test]
async fn test_read_window_aggregate_months() {
let predicate = PredicateBuilder::default().build();
@ -262,6 +270,71 @@ impl DbSetup for MeasurementForDefect2697 {
}
}
/// Like [`MeasurementForDefect2697`] but with a delete predicate that removes
/// the `section=2b bar=1.2` row.
struct MeasurementForDefect2697WithDelete {}
#[async_trait]
impl DbSetup for MeasurementForDefect2697WithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "2021-01-01T00";
        let lp = vec![
            "mm,section=1a bar=5.0 1609459201000000011",
            "mm,section=1a bar=0.28 1609459201000000031",
            "mm,section=2b bar=4.0 1609459201000000009",
            "mm,section=2b bar=6.0 1609459201000000015",
            "mm,section=2b bar=1.2 1609459201000000022",
            "mm,section=1a foo=1.0 1609459201000000001",
            "mm,section=1a foo=3.0 1609459201000000005",
            "mm,section=1a foo=11.24 1609459201000000024",
            "mm,section=2b foo=2.0 1609459201000000002",
        ];
        // pred: delete from mm where 1609459201000000022 <= time <= 1609459201000000022
        // 1 row of mm with timestamp 1609459201000000022 (section=2b bar=1.2) will be deleted
        let delete_table_name = "mm";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 1609459201000000022,
                end: 1609459201000000022,
            },
            exprs: vec![],
        };
        all_scenarios_for_one_chunk(vec![&pred], vec![], lp, delete_table_name, partition_key).await
    }
}
/// Like [`MeasurementForDefect2697`] but with a delete predicate whose time
/// range covers every row, so all data is soft deleted.
struct MeasurementForDefect2697WithDeleteAll {}
#[async_trait]
impl DbSetup for MeasurementForDefect2697WithDeleteAll {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "2021-01-01T00";
        let lp = vec![
            "mm,section=1a bar=5.0 1609459201000000011",
            "mm,section=1a bar=0.28 1609459201000000031",
            "mm,section=2b bar=4.0 1609459201000000009",
            "mm,section=2b bar=6.0 1609459201000000015",
            "mm,section=2b bar=1.2 1609459201000000022",
            "mm,section=1a foo=1.0 1609459201000000001",
            "mm,section=1a foo=3.0 1609459201000000005",
            "mm,section=1a foo=11.24 1609459201000000024",
            "mm,section=2b foo=2.0 1609459201000000002",
        ];
        // pred: delete from mm where 1 <= time <= 1609459201000000031
        // the range covers every row above, so all rows are deleted
        let delete_table_name = "mm";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 1,
                end: 1609459201000000031,
            },
            exprs: vec![],
        };
        all_scenarios_for_one_chunk(vec![&pred], vec![], lp, delete_table_name, partition_key).await
    }
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
@ -295,6 +368,47 @@ async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
.await;
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697_with_delete() {
    let predicate = PredicateBuilder::default()
        // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
        .timestamp_range(1609459201000000001, 1609459201000000031)
        .build();
    let agg = Aggregate::Min;
    let every = WindowDuration::from_nanoseconds(10);
    let offset = WindowDuration::from_nanoseconds(0);
    // one row deleted (section=2b bar=1.2 at ...022), so compared with the
    // non-delete variant of this test that point is absent from the results
    let expected_results = vec![
        "Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
        "Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
        "Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015], values: [4.0, 6.0]",
        "Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
    ];
    run_read_window_aggregate_test_case(
        MeasurementForDefect2697WithDelete {},
        predicate.clone(),
        agg,
        every.clone(),
        offset.clone(),
        expected_results,
    )
    .await;
    // all rows deleted: nothing to aggregate
    let expected_results = vec![];
    run_read_window_aggregate_test_case(
        MeasurementForDefect2697WithDeleteAll {},
        predicate,
        agg,
        every,
        offset,
        expected_results,
    )
    .await;
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
@ -326,3 +440,47 @@ async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
)
.await;
}
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697_with_delete() {
    let predicate = PredicateBuilder::default()
        // time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
        .timestamp_range(1609459201000000001, 1609459201000000031)
        .build();
    let agg = Aggregate::Sum;
    let every = WindowDuration::from_nanoseconds(10);
    let offset = WindowDuration::from_nanoseconds(0);
    // one row deleted (section=2b bar=1.2 at ...022)
    // The windowed aggregate uses a non-selector aggregate (SUM, COUNT, MEAN),
    // so the `time` reported for each point is the window boundary (e.g.
    // ...020 for input rows in the window ending at ...020), not an input
    // row's own timestamp.
    let expected_results = vec![
        "Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
        "Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
        "Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020], values: [4.0, 6.0]",
        "Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
    ];
    run_read_window_aggregate_test_case(
        MeasurementForDefect2697WithDelete {},
        predicate.clone(),
        agg,
        every.clone(),
        offset.clone(),
        expected_results,
    )
    .await;
    // all rows deleted: nothing to aggregate
    let expected_results = vec![];
    run_read_window_aggregate_test_case(
        MeasurementForDefect2697WithDeleteAll {},
        predicate,
        agg,
        every,
        offset,
        expected_results,
    )
    .await;
}

View File

@ -53,26 +53,114 @@ async fn list_table_names_no_data_pred() {
run_table_names_test_case(TwoMeasurements {}, EMPTY_PREDICATE, vec!["cpu", "disk"]).await;
}
#[tokio::test]
async fn list_table_names_no_data_pred_with_delete() {
    // A partial delete from `cpu` must not hide the table name.
    let expected = vec!["cpu", "disk"];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, EMPTY_PREDICATE, expected).await;
}
// https://github.com/influxdata/influxdb_iox/issues/2861
// And all other ignored tests
#[ignore]
#[tokio::test]
async fn list_table_names_no_data_pred_with_delete_all() {
    // Every `cpu` row is deleted, so only `disk` should remain listed.
    let expected = vec!["disk"];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, EMPTY_PREDICATE, expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_0_201() {
    // The time range 0..201 covers rows in both tables.
    let expected = vec!["cpu", "disk"];
    run_table_names_test_case(TwoMeasurements {}, tsp(0, 201), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_0_201_with_delete() {
    // The delete removes only one cpu row; both tables still have data.
    let expected = vec!["cpu", "disk"];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, tsp(0, 201), expected).await;
}
#[ignore]
#[tokio::test]
async fn list_table_names_data_pred_0_201_with_delete_all() {
    // All cpu rows are deleted; only disk should be listed.
    let expected = vec!["disk"];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, tsp(0, 201), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_0_200() {
    // Only cpu has rows in the narrower 0..200 range.
    let expected = vec!["cpu"];
    run_table_names_test_case(TwoMeasurements {}, tsp(0, 200), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_0_200_with_delete() {
    // cpu still has surviving rows after the partial delete.
    let expected = vec!["cpu"];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, tsp(0, 200), expected).await;
}
#[ignore]
#[tokio::test]
async fn list_table_names_data_pred_0_200_with_delete_all() {
    // All cpu rows are deleted and disk has no data in range: no tables.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, tsp(0, 200), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_50_101() {
    // Only cpu has rows in the 50..101 range.
    let expected = vec!["cpu"];
    run_table_names_test_case(TwoMeasurements {}, tsp(50, 101), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_50_101_with_delete() {
    // The deleted cpu row (t=150) lies outside this range; cpu survives.
    let expected = vec!["cpu"];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, tsp(50, 101), expected).await;
}
#[ignore]
#[tokio::test]
async fn list_table_names_data_pred_50_101_with_delete_all() {
    // All cpu rows are deleted: no tables expected in this range.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, tsp(50, 101), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_101_160() {
    // Only cpu has rows in the 101..160 range.
    let expected = vec!["cpu"];
    run_table_names_test_case(TwoMeasurements {}, tsp(101, 160), expected).await;
}
#[ignore]
#[tokio::test]
async fn list_table_names_data_pred_101_160_with_delete() {
    // The only cpu row in this range (t=150) is deleted: no tables expected.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, tsp(101, 160), expected).await;
}
#[ignore]
#[tokio::test]
async fn list_table_names_data_pred_101_160_with_delete_all() {
    // All cpu rows are deleted: no tables expected in this range.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, tsp(101, 160), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_250_300() {
    // Neither table has rows in the 250..300 range.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurements {}, tsp(250, 300), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_250_300_with_delete() {
    // No data in range regardless of the delete: no tables expected.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDelete {}, tsp(250, 300), expected).await;
}
#[tokio::test]
async fn list_table_names_data_pred_250_300_with_delete_all() {
    // No data in range regardless of the delete: no tables expected.
    let expected = vec![];
    run_table_names_test_case(TwoMeasurementsWithDeleteAll {}, tsp(250, 300), expected).await;
}
// Note when table names supports general purpose predicates, add a
// test here with a `_measurement` predicate
// https://github.com/influxdata/influxdb_iox/issues/762

View File

@ -53,6 +53,8 @@ async fn list_tag_columns_no_predicate() {
run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
}
// NGA todo: add delete tests when TwoMeasurementsManyNullsWithDelete available
#[tokio::test]
async fn list_tag_columns_timestamp() {
let predicate = PredicateBuilder::default()
@ -142,6 +144,18 @@ async fn list_tag_name_end_to_end() {
run_tag_keys_test_case(EndToEndTest {}, predicate, expected_tag_keys).await;
}
// https://github.com/influxdata/influxdb_iox/issues/2863
#[ignore]
#[tokio::test]
async fn list_tag_name_end_to_end_with_delete() {
    // The scenario soft deletes the single `swap` row (see
    // EndToEndTestWithDelete), so only the tag keys of the surviving
    // `cpu_load_short` rows matching host=server01 are expected.
    let predicate = PredicateBuilder::default()
        .timestamp_range(0, 10000)
        .add_expr(col("host").eq(lit("server01")))
        .build();
    let expected_tag_keys = vec!["host", "region"];
    run_tag_keys_test_case(EndToEndTestWithDelete {}, predicate, expected_tag_keys).await;
}
/// Converts a slice of string slices into the `StringSetRef` form used by the
/// assertions in this module.
fn to_stringset(v: &[&str]) -> StringSetRef {
    v.into_stringset().unwrap()
}

View File

@ -62,6 +62,8 @@ async fn list_tag_values_no_tag() {
.await;
}
// NGA todo: add delete tests when TwoMeasurementsManyNullsWithDelete available
#[tokio::test]
async fn list_tag_values_no_predicate_state_col() {
let predicate = PredicateBuilder::default().build();
@ -76,6 +78,37 @@ async fn list_tag_values_no_predicate_state_col() {
.await;
}
// https://github.com/influxdata/influxdb_iox/issues/2864
#[ignore]
#[tokio::test]
async fn list_tag_values_no_predicate_state_col_with_delete() {
    // The delete removes the NY rows, so only CA and MA remain as `state` values.
    let tag_name = "state";
    run_tag_values_test_case(
        OneMeasurementManyNullTagsWithDelete {},
        tag_name,
        PredicateBuilder::default().build(),
        vec!["CA", "MA"],
    )
    .await;
}
#[ignore]
#[tokio::test]
async fn list_tag_values_no_predicate_state_col_with_delete_all() {
    // Every row is deleted, so no `state` tag values should be reported.
    let tag_name = "state";
    run_tag_values_test_case(
        OneMeasurementManyNullTagsWithDeleteAll {},
        tag_name,
        PredicateBuilder::default().build(),
        vec![],
    )
    .await;
}
#[tokio::test]
async fn list_tag_values_no_predicate_city_col() {
let tag_name = "city";

View File

@ -222,6 +222,110 @@ impl DbSetup for OneMeasurementRealisticTimes {
}
}
/// One `h2o` measurement where many rows leave some tag columns null.
#[derive(Debug)]
pub struct OneMeasurementManyNullTags {}
#[async_trait]
impl DbSetup for OneMeasurementManyNullTags {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec![
            "h2o,state=CA,city=LA,county=LA temp=70.4 100",
            "h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250",
            "h2o,state=MA,city=Boston temp=50.4 200",
            "h2o,state=CA temp=79.0 300",
            "h2o,state=NY temp=60.8 400",
            "h2o,state=NY,city=NYC temp=61.0 500",
            "h2o,state=NY,city=NYC,borough=Brooklyn temp=61.0 600",
        ];
        // return all possible scenarios for a chunk: MUB open, MUB frozen, RUB, RUB & OS, OS
        // Fix: the table-name argument previously said "cpu", which does not
        // match the "h2o" measurement in `lp_lines`. There are no delete
        // predicates here, but the name is kept consistent with the data and
        // with the sibling `...WithDelete` setups.
        all_scenarios_for_one_chunk(vec![], vec![], lp_lines, "h2o", partition_key).await
    }
}
/// Like [`OneMeasurementManyNullTags`] but with a delete predicate that soft
/// deletes all `state=NY` rows.
#[derive(Debug)]
pub struct OneMeasurementManyNullTagsWithDelete {}
#[async_trait]
impl DbSetup for OneMeasurementManyNullTagsWithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec![
            "h2o,state=CA,city=LA,county=LA temp=70.4 100",
            "h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250",
            "h2o,state=MA,city=Boston temp=50.4 200",
            "h2o,state=CA temp=79.0 300",
            "h2o,state=NY temp=60.8 400",
            "h2o,state=NY,city=NYC temp=61.0 500",
            "h2o,state=NY,city=NYC,borough=Brooklyn temp=61.0 600",
        ];
        // pred: delete from h2o where 400 <= time <= 602 and state=NY
        // 3 rows of h2o with state=NY (timestamps 400, 500, 600) will be deleted
        let delete_table_name = "h2o";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 400,
                end: 602,
            },
            exprs: vec![DeleteExpr::new(
                "state".to_string(),
                predicate::delete_expr::Op::Eq,
                predicate::delete_expr::Scalar::String(("NY").to_string()),
            )],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
/// Like [`OneMeasurementManyNullTags`] but with a delete predicate whose time
/// range covers every row, so all data is soft deleted.
#[derive(Debug)]
pub struct OneMeasurementManyNullTagsWithDeleteAll {}
#[async_trait]
impl DbSetup for OneMeasurementManyNullTagsWithDeleteAll {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        let lp_lines = vec![
            "h2o,state=CA,city=LA,county=LA temp=70.4 100",
            "h2o,state=MA,city=Boston,county=Suffolk temp=72.4 250",
            "h2o,state=MA,city=Boston temp=50.4 200",
            "h2o,state=CA temp=79.0 300",
            "h2o,state=NY temp=60.8 400",
            "h2o,state=NY,city=NYC temp=61.0 500",
            "h2o,state=NY,city=NYC,borough=Brooklyn temp=61.0 600",
        ];
        // pred: delete from h2o where 100 <= time <= 602
        // all rows of h2o (timestamps 100..600) will be deleted
        let delete_table_name = "h2o";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 100,
                end: 602,
            },
            exprs: vec![],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
/// Two measurements of data in a single mutable buffer chunk.
#[derive(Debug)]
pub struct TwoMeasurementsMubScenario {}
@ -291,6 +395,7 @@ impl DbSetup for TwoMeasurementsWithDelete {
];
// pred: delete from cpu where 120 <= time <= 160 and region="west"
// delete 1 row from cpu with timestamp 150
let table_name = "cpu";
let pred = DeletePredicate {
range: TimestampRange {
@ -744,6 +849,45 @@ impl DbSetup for OneMeasurementManyFields {
}
}
/// Like [`OneMeasurementManyFields`] but with a delete predicate that removes
/// the only row containing `field4`.
#[derive(Debug)]
pub struct OneMeasurementManyFieldsWithDelete {}
#[async_trait]
impl DbSetup for OneMeasurementManyFieldsWithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";
        // Order this so field3 comes before field2
        // (and thus the columns need to get reordered)
        let lp_lines = vec![
            "h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100",
            "h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100",
            "h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100",
            "h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000",
        ];
        // pred: delete from h2o where 1000 <= time <= 3000
        // the 1 row of h2o with timestamp 1000 will be deleted, which means
        // field4 is no longer available
        let delete_table_name = "h2o";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 1000,
                end: 3000,
            },
            exprs: vec![],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
/// Data taken from the end-to-end test.
#[derive(Debug)]
pub struct EndToEndTest {}
@ -768,6 +912,51 @@ impl DbSetup for EndToEndTest {
}
}
/// Like [`EndToEndTest`] but with a delete predicate that removes the single
/// `swap` row.
#[derive(Debug)]
pub struct EndToEndTestWithDelete {}
#[async_trait]
impl DbSetup for EndToEndTestWithDelete {
    async fn make(&self) -> Vec<DbScenario> {
        let lp_lines = vec![
            "cpu_load_short,host=server01,region=us-west value=0.64 0000",
            "cpu_load_short,host=server01 value=27.99 1000",
            "cpu_load_short,host=server02,region=us-west value=3.89 2000",
            "cpu_load_short,host=server01,region=us-east value=1234567.891011 3000",
            "cpu_load_short,host=server01,region=us-west value=0.000003 4000",
            "system,host=server03 uptime=1303385 5000",
            "swap,host=server01,name=disk0 in=3,out=4 6000",
            "status active=t 7000",
            "attributes color=\"blue\" 8000",
        ];
        let partition_key = "1970-01-01T00";
        // pred: delete from swap where 6000 <= time <= 6000 and name=disk0
        // the 1 row of swap with timestamp 6000 will be deleted
        let delete_table_name = "swap";
        let pred = DeletePredicate {
            range: TimestampRange {
                start: 6000,
                end: 6000,
            },
            exprs: vec![DeleteExpr::new(
                "name".to_string(),
                predicate::delete_expr::Op::Eq,
                predicate::delete_expr::Scalar::String(("disk0").to_string()),
            )],
        };
        all_scenarios_for_one_chunk(
            vec![&pred],
            vec![],
            lp_lines,
            delete_table_name,
            partition_key,
        )
        .await
    }
}
/// This function loads one chunk of lp data into MUB only
///
pub(crate) async fn make_one_chunk_mub_scenario(data: &str) -> Vec<DbScenario> {