test: tests to show predicate simplification on chunks (#3649)

* test: tests to show predicate simplification on chunks

* fix: clippy

* refactor: less Box

* refactor: make typealias + add comments, hopefully to improve clarity

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-02-07 10:04:20 -05:00 committed by GitHub
parent 2e30483f1f
commit e6ec8ef5f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 75 additions and 25 deletions

View File

@ -1953,7 +1953,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_rewrite_table_names() { async fn test_predicate_rewrite_table_names() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
.table_names(test_db, rpc_predicate) .table_names(test_db, rpc_predicate)
.expect("creating plan"); .expect("creating plan");
@ -1963,7 +1963,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_rewrite_tag_keys() { async fn test_predicate_rewrite_tag_keys() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
.tag_keys(test_db, rpc_predicate) .tag_keys(test_db, rpc_predicate)
.expect("creating plan"); .expect("creating plan");
@ -1973,7 +1973,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_rewrite_tag_values() { async fn test_predicate_rewrite_tag_values() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
.tag_values(test_db, "foo", rpc_predicate) .tag_values(test_db, "foo", rpc_predicate)
.expect("creating plan"); .expect("creating plan");
@ -1983,7 +1983,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_rewrite_field_columns() { async fn test_predicate_rewrite_field_columns() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
.field_columns(test_db, rpc_predicate) .field_columns(test_db, rpc_predicate)
.expect("creating plan"); .expect("creating plan");
@ -1993,7 +1993,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_rewrite_read_filter() { async fn test_predicate_rewrite_read_filter() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
.read_filter(test_db, rpc_predicate) .read_filter(test_db, rpc_predicate)
.expect("creating plan"); .expect("creating plan");
@ -2003,7 +2003,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_read_group() { async fn test_predicate_read_group() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
let agg = Aggregate::None; let agg = Aggregate::None;
let group_columns = &["foo"]; let group_columns = &["foo"];
InfluxRpcPlanner::new() InfluxRpcPlanner::new()
@ -2015,7 +2015,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_predicate_read_window_aggregate() { async fn test_predicate_read_window_aggregate() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| { run_test(&|test_db, rpc_predicate| {
let agg = Aggregate::First; let agg = Aggregate::First;
let every = WindowDuration::from_months(1, false); let every = WindowDuration::from_months(1, false);
let offset = WindowDuration::from_months(1, false); let offset = WindowDuration::from_months(1, false);
@ -2026,17 +2026,15 @@ mod tests {
.await .await
} }
/// Runs func() and checks that predicates are simplified prior to sending them off /// Given a `TestDatabase` plans a InfluxRPC query
async fn run_test<F, D>(f: F) /// (e.g. read_filter, read_window_aggregate, etc). The test below
where /// ensures that predicates are simplified during query planning.
F: FnOnce(&TestDatabase, InfluxRpcPredicate) + Send, type PlanRPCFunc = dyn Fn(&TestDatabase, InfluxRpcPredicate) + Send + Sync;
{
let chunk0 = Arc::new( /// Runs func() and checks that predicates are simplified prior to
TestChunk::new("h2o") /// sending them down to the chunks for processing.
.with_id(0) async fn run_test(func: &'static PlanRPCFunc) {
.with_tag_column("foo") // ------------- Test 1 ----------------
.with_time_column(),
);
// this is what happens with a grpc predicate on a tag // this is what happens with a grpc predicate on a tag
// //
@ -2053,22 +2051,74 @@ mod tests {
.add_expr(expr.eq(lit("bar"))) .add_expr(expr.eq(lit("bar")))
.build(); .build();
// verify that the predicate was rewritten to `foo = 'bar'`
let expr = col("foo").eq(lit("bar"));
let expected_predicate = PredicateBuilder::new().add_expr(expr).build();
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
// ------------- Test 2 ----------------
// Validate that _measurement predicates are translated
//
// https://github.com/influxdata/influxdb_iox/issues/3601
// _measurement = 'foo'
let silly_predicate = PredicateBuilder::new()
.add_expr(col("_measurement").eq(lit("foo")))
.build();
// verify that the predicate was rewritten to `false` as the
// measurement name is `h20`
let expr = lit(false);
let expected_predicate = PredicateBuilder::new().add_expr(expr).build();
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
// ------------- Test 3 ----------------
// more complicated _measurement predicates are translated
//
// https://github.com/influxdata/influxdb_iox/issues/3601
// (_measurement = 'foo' or measurement = 'h2o') AND time > 5
let silly_predicate = PredicateBuilder::new()
.add_expr(
col("_measurement")
.eq(lit("foo"))
.or(col("_measurement").eq(lit("h2o")))
.and(col("time").gt(lit(5))),
)
.build();
// verify that the predicate was rewritten to time > 5
let expr = col("time").gt(lit(5));
let expected_predicate = PredicateBuilder::new().add_expr(expr).build();
run_test_with_predicate(&func, silly_predicate, expected_predicate).await;
}
/// Runs func() with the specified predicate and verifies
/// `expected_predicate` is received by the chunk
async fn run_test_with_predicate(
func: &PlanRPCFunc,
predicate: Predicate,
expected_predicate: Predicate,
) {
let chunk0 = Arc::new(
TestChunk::new("h2o")
.with_id(0)
.with_tag_column("foo")
.with_time_column(),
);
let executor = Arc::new(Executor::new(1)); let executor = Arc::new(Executor::new(1));
let test_db = TestDatabase::new(Arc::clone(&executor)); let test_db = TestDatabase::new(Arc::clone(&executor));
test_db.add_chunk("my_partition_key", Arc::clone(&chunk0)); test_db.add_chunk("my_partition_key", Arc::clone(&chunk0));
let rpc_predicate = InfluxRpcPredicate::new(None, silly_predicate); let rpc_predicate = InfluxRpcPredicate::new(None, predicate);
// run the function // run the function
f(&test_db, rpc_predicate); func(&test_db, rpc_predicate);
let actual_predicate = test_db.get_chunks_predicate(); let actual_predicate = test_db.get_chunks_predicate();
// verify that the predicate was rewritten to `foo = 'bar'`
let expr = col("foo").eq(lit("bar"));
let expected_predicate = PredicateBuilder::new().add_expr(expr).build();
assert_eq!( assert_eq!(
actual_predicate, expected_predicate, actual_predicate, expected_predicate,
"\nActual: {:?}\nExpected: {:?}", "\nActual: {:?}\nExpected: {:?}",