diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 0f4f4da8ce..8727850615 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -1953,7 +1953,7 @@ mod tests { #[tokio::test] async fn test_predicate_rewrite_table_names() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { InfluxRpcPlanner::new() .table_names(test_db, rpc_predicate) .expect("creating plan"); @@ -1963,7 +1963,7 @@ mod tests { #[tokio::test] async fn test_predicate_rewrite_tag_keys() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { InfluxRpcPlanner::new() .tag_keys(test_db, rpc_predicate) .expect("creating plan"); @@ -1973,7 +1973,7 @@ mod tests { #[tokio::test] async fn test_predicate_rewrite_tag_values() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { InfluxRpcPlanner::new() .tag_values(test_db, "foo", rpc_predicate) .expect("creating plan"); @@ -1983,7 +1983,7 @@ mod tests { #[tokio::test] async fn test_predicate_rewrite_field_columns() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { InfluxRpcPlanner::new() .field_columns(test_db, rpc_predicate) .expect("creating plan"); @@ -1993,7 +1993,7 @@ mod tests { #[tokio::test] async fn test_predicate_rewrite_read_filter() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { InfluxRpcPlanner::new() .read_filter(test_db, rpc_predicate) .expect("creating plan"); @@ -2003,7 +2003,7 @@ mod tests { #[tokio::test] async fn test_predicate_read_group() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { let agg = Aggregate::None; let group_columns = &["foo"]; InfluxRpcPlanner::new() @@ -2015,7 +2015,7 @@ mod tests { #[tokio::test] async fn test_predicate_read_window_aggregate() { - run_test::<_, TestDatabase>(|test_db, rpc_predicate| { + run_test(&|test_db, rpc_predicate| { let agg = Aggregate::First; let every = WindowDuration::from_months(1, false); let offset = WindowDuration::from_months(1, false); @@ -2026,17 +2026,15 @@ mod tests { .await } - /// Runs func() and checks that predicates are simplified prior to sending them off - async fn run_test(f: F) - where - F: FnOnce(&TestDatabase, InfluxRpcPredicate) + Send, - { - let chunk0 = Arc::new( - TestChunk::new("h2o") - .with_id(0) - .with_tag_column("foo") - .with_time_column(), - ); + /// Given a `TestDatabase` plans a InfluxRPC query + /// (e.g. read_filter, read_window_aggregate, etc). The test below + /// ensures that predicates are simplified during query planning. + type PlanRPCFunc = dyn Fn(&TestDatabase, InfluxRpcPredicate) + Send + Sync; + + /// Runs func() and checks that predicates are simplified prior to + /// sending them down to the chunks for processing. + async fn run_test(func: &'static PlanRPCFunc) { + // ------------- Test 1 ---------------- // this is what happens with a grpc predicate on a tag // @@ -2053,22 +2051,74 @@ mod tests { .add_expr(expr.eq(lit("bar"))) .build(); + // verify that the predicate was rewritten to `foo = 'bar'` + let expr = col("foo").eq(lit("bar")); + let expected_predicate = PredicateBuilder::new().add_expr(expr).build(); + + run_test_with_predicate(&func, silly_predicate, expected_predicate).await; + + // ------------- Test 2 ---------------- + // Validate that _measurement predicates are translated + // + // https://github.com/influxdata/influxdb_iox/issues/3601 + // _measurement = 'foo' + let silly_predicate = PredicateBuilder::new() + .add_expr(col("_measurement").eq(lit("foo"))) + .build(); + + // verify that the predicate was rewritten to `false` as the + // measurement name is `h20` + let expr = lit(false); + + let expected_predicate = PredicateBuilder::new().add_expr(expr).build(); + run_test_with_predicate(&func, silly_predicate, expected_predicate).await; + + // ------------- Test 3 ---------------- + // more complicated _measurement predicates are translated + // + // https://github.com/influxdata/influxdb_iox/issues/3601 + // (_measurement = 'foo' or measurement = 'h2o') AND time > 5 + let silly_predicate = PredicateBuilder::new() + .add_expr( + col("_measurement") + .eq(lit("foo")) + .or(col("_measurement").eq(lit("h2o"))) + .and(col("time").gt(lit(5))), + ) + .build(); + + // verify that the predicate was rewritten to time > 5 + let expr = col("time").gt(lit(5)); + + let expected_predicate = PredicateBuilder::new().add_expr(expr).build(); + run_test_with_predicate(&func, silly_predicate, expected_predicate).await; + } + + /// Runs func() with the specified predicate and verifies + /// `expected_predicate` is received by the chunk + async fn run_test_with_predicate( + func: &PlanRPCFunc, + predicate: Predicate, + expected_predicate: Predicate, + ) { + let chunk0 = Arc::new( + TestChunk::new("h2o") + .with_id(0) + .with_tag_column("foo") + .with_time_column(), + ); + let executor = Arc::new(Executor::new(1)); let test_db = TestDatabase::new(Arc::clone(&executor)); test_db.add_chunk("my_partition_key", Arc::clone(&chunk0)); - let rpc_predicate = InfluxRpcPredicate::new(None, silly_predicate); + let rpc_predicate = InfluxRpcPredicate::new(None, predicate); // run the function - f(&test_db, rpc_predicate); + func(&test_db, rpc_predicate); let actual_predicate = test_db.get_chunks_predicate(); - // verify that the predicate was rewritten to `foo = 'bar'` - let expr = col("foo").eq(lit("bar")); - - let expected_predicate = PredicateBuilder::new().add_expr(expr).build(); - assert_eq!( actual_predicate, expected_predicate, "\nActual: {:?}\nExpected: {:?}",