From 9c2b6cd96c72ae23be6d6fc80977c8b0bbce879f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 18 Jul 2022 16:45:22 +0200 Subject: [PATCH] fix: always pass proper context to `InfluxRpcPlanner` (#5144) There were some instances were we forgot to pass context (and therefore tracing) information to `InfluxRpcPlanner`. This removes the `Default` implementation requires to always pass a context when creating `InfluxRpcPlanner` to prevent this type of bug. Ref #5129. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query/src/frontend/influxrpc.rs | 24 +++++++------------ query_tests/src/influxrpc/field_columns.rs | 2 +- query_tests/src/influxrpc/read_filter.rs | 4 ++-- query_tests/src/influxrpc/read_group.rs | 2 +- .../src/influxrpc/read_window_aggregate.rs | 2 +- query_tests/src/influxrpc/table_names.rs | 2 +- query_tests/src/influxrpc/tag_keys.rs | 2 +- query_tests/src/influxrpc/tag_values.rs | 5 ++-- service_common/src/planner.rs | 21 +++++++--------- 9 files changed, 27 insertions(+), 37 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 7e9c05f456..b152a55eb9 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -196,7 +196,7 @@ impl From for Error { /// While the underlying storage is the same for columns in different /// categories with the same data type, columns of different /// categories are treated differently in the different query types. -#[derive(Default, Debug)] +#[derive(Debug)] pub struct InfluxRpcPlanner { /// Optional executor currently only used to provide span context for tracing. ctx: IOxSessionContext, @@ -204,13 +204,7 @@ pub struct InfluxRpcPlanner { impl InfluxRpcPlanner { /// Create a new instance of the RPC planner - pub fn new() -> Self { - Self { - ctx: IOxSessionContext::default(), - } - } - - pub fn with_execution_context(self, ctx: IOxSessionContext) -> Self { + pub fn new(ctx: IOxSessionContext) -> Self { Self { ctx } } @@ -1741,7 +1735,7 @@ mod tests { async fn test_predicate_rewrite_table_names() { run_test(|test_db, rpc_predicate| { async move { - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .table_names(test_db, rpc_predicate) .await .expect("creating plan"); @@ -1755,7 +1749,7 @@ mod tests { async fn test_predicate_rewrite_tag_keys() { run_test(|test_db, rpc_predicate| { async move { - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .tag_keys(test_db, rpc_predicate) .await .expect("creating plan"); @@ -1769,7 +1763,7 @@ mod tests { async fn test_predicate_rewrite_tag_values() { run_test(|test_db, rpc_predicate| { async move { - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .tag_values(test_db, "foo", rpc_predicate) .await .expect("creating plan"); @@ -1783,7 +1777,7 @@ mod tests { async fn test_predicate_rewrite_field_columns() { run_test(|test_db, rpc_predicate| { async move { - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .field_columns(test_db, rpc_predicate) .await .expect("creating plan"); @@ -1797,7 +1791,7 @@ mod tests { async fn test_predicate_rewrite_read_filter() { run_test(|test_db, rpc_predicate| { async move { - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .read_filter(test_db, rpc_predicate) .await .expect("creating plan"); @@ -1813,7 +1807,7 @@ mod tests { async move { let agg = Aggregate::None; let group_columns = &["foo"]; - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .read_group(test_db, rpc_predicate, agg, group_columns) .await .expect("creating plan"); @@ -1830,7 +1824,7 @@ mod tests { let agg = Aggregate::First; let every = WindowDuration::from_months(1, false); let offset = WindowDuration::from_months(1, false); - InfluxRpcPlanner::new() + InfluxRpcPlanner::new(IOxSessionContext::default()) .read_window_aggregate(test_db, rpc_predicate, agg, every, offset) .await .expect("creating plan"); diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index 0bb83ec3de..f604bac2fe 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -28,8 +28,8 @@ async fn run_field_columns_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .field_columns(db.as_query_database(), predicate.clone()) diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index e96a139ae9..cbe4427d13 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -61,14 +61,14 @@ async fn run_read_filter( predicate: InfluxRpcPredicate, db: Arc, ) -> Result, String> { - let planner = InfluxRpcPlanner::default(); + let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .read_filter(db.as_query_database(), predicate) .await .map_err(|e| e.to_string())?; - let ctx = db.new_query_context(None); run_series_set_plan_maybe_error(&ctx, plan) .await .map_err(|e| e.to_string()) diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 568de3af2f..f42fedce74 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -34,8 +34,8 @@ async fn run_read_group_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plans = planner .read_group( diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 7b26fe54ee..566e637493 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -25,8 +25,8 @@ async fn run_read_window_aggregate_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .read_window_aggregate( diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index 4ca33dc842..9bdd6d26c9 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -25,8 +25,8 @@ async fn run_table_names_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .table_names(db.as_query_database(), predicate.clone()) diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 953de31b5a..f977200ce8 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -27,8 +27,8 @@ async fn run_tag_keys_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .tag_keys(db.as_query_database(), predicate.clone()) diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index efb5cd34c4..d433cdd7ab 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -26,8 +26,8 @@ async fn run_tag_values_test_case( } = scenario; println!("Running scenario '{}'", scenario_name); println!("Predicate: '{:#?}'", predicate); - let planner = InfluxRpcPlanner::default(); let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner .tag_values(db.as_query_database(), tag_name, predicate.clone()) @@ -311,7 +311,8 @@ async fn list_tag_values_field_col_on_tag() { scenario_name, db, .. } = scenario; println!("Running scenario '{}'", scenario_name); - let planner = InfluxRpcPlanner::default(); + let ctx = db.new_query_context(None); + let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); // Test: temp is a field, not a tag let tag_name = "temp"; diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index 97ccb28cb9..07bcf527aa 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -36,7 +36,7 @@ impl Planner { pub async fn sql(&self, query: impl Into + Send) -> Result> { let planner = SqlQueryPlanner::new(); let query = query.into(); - let ctx = self.ctx.child_ctx("sql"); + let ctx = self.ctx.child_ctx("planner sql"); self.ctx .run(async move { planner.query(&query, &ctx).await }) @@ -53,7 +53,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = InfluxRpcPlanner::default(); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names")); self.ctx .run(async move { @@ -75,8 +75,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = - InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys")); self.ctx .run(async move { @@ -100,8 +99,7 @@ impl Planner { D: QueryDatabase + 'static, { let tag_name = tag_name.into(); - let planner = - InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values")); self.ctx .run(async move { @@ -123,8 +121,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = - InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns")); self.ctx .run(async move { @@ -146,8 +143,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = - InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter")); self.ctx .run(async move { @@ -171,8 +167,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = - InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group")); self.ctx .run(async move { @@ -197,7 +192,7 @@ impl Planner { where D: QueryDatabase + 'static, { - let planner = InfluxRpcPlanner::default(); + let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate")); self.ctx .run(async move {