fix: always pass proper context to `InfluxRpcPlanner` (#5144)

There were some instances were we forgot to pass context (and therefore
tracing) information to `InfluxRpcPlanner`. This removes the `Default`
implementation requires to always pass a context when creating
`InfluxRpcPlanner` to prevent this type of bug.

Ref #5129.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-07-18 16:45:22 +02:00 committed by GitHub
parent fc0b705c8c
commit 9c2b6cd96c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 27 additions and 37 deletions

View File

@ -196,7 +196,7 @@ impl From<DataFusionError> for Error {
/// While the underlying storage is the same for columns in different
/// categories with the same data type, columns of different
/// categories are treated differently in the different query types.
#[derive(Default, Debug)]
#[derive(Debug)]
pub struct InfluxRpcPlanner {
/// Optional executor currently only used to provide span context for tracing.
ctx: IOxSessionContext,
@ -204,13 +204,7 @@ pub struct InfluxRpcPlanner {
impl InfluxRpcPlanner {
/// Create a new instance of the RPC planner
pub fn new() -> Self {
Self {
ctx: IOxSessionContext::default(),
}
}
pub fn with_execution_context(self, ctx: IOxSessionContext) -> Self {
pub fn new(ctx: IOxSessionContext) -> Self {
Self { ctx }
}
@ -1741,7 +1735,7 @@ mod tests {
async fn test_predicate_rewrite_table_names() {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.table_names(test_db, rpc_predicate)
.await
.expect("creating plan");
@ -1755,7 +1749,7 @@ mod tests {
async fn test_predicate_rewrite_tag_keys() {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.tag_keys(test_db, rpc_predicate)
.await
.expect("creating plan");
@ -1769,7 +1763,7 @@ mod tests {
async fn test_predicate_rewrite_tag_values() {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.tag_values(test_db, "foo", rpc_predicate)
.await
.expect("creating plan");
@ -1783,7 +1777,7 @@ mod tests {
async fn test_predicate_rewrite_field_columns() {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.field_columns(test_db, rpc_predicate)
.await
.expect("creating plan");
@ -1797,7 +1791,7 @@ mod tests {
async fn test_predicate_rewrite_read_filter() {
run_test(|test_db, rpc_predicate| {
async move {
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.read_filter(test_db, rpc_predicate)
.await
.expect("creating plan");
@ -1813,7 +1807,7 @@ mod tests {
async move {
let agg = Aggregate::None;
let group_columns = &["foo"];
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.read_group(test_db, rpc_predicate, agg, group_columns)
.await
.expect("creating plan");
@ -1830,7 +1824,7 @@ mod tests {
let agg = Aggregate::First;
let every = WindowDuration::from_months(1, false);
let offset = WindowDuration::from_months(1, false);
InfluxRpcPlanner::new()
InfluxRpcPlanner::new(IOxSessionContext::default())
.read_window_aggregate(test_db, rpc_predicate, agg, every, offset)
.await
.expect("creating plan");

View File

@ -28,8 +28,8 @@ async fn run_field_columns_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.field_columns(db.as_query_database(), predicate.clone())

View File

@ -61,14 +61,14 @@ async fn run_read_filter(
predicate: InfluxRpcPredicate,
db: Arc<dyn AbstractDb>,
) -> Result<Vec<String>, String> {
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_filter(db.as_query_database(), predicate)
.await
.map_err(|e| e.to_string())?;
let ctx = db.new_query_context(None);
run_series_set_plan_maybe_error(&ctx, plan)
.await
.map_err(|e| e.to_string())

View File

@ -34,8 +34,8 @@ async fn run_read_group_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plans = planner
.read_group(

View File

@ -25,8 +25,8 @@ async fn run_read_window_aggregate_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_window_aggregate(

View File

@ -25,8 +25,8 @@ async fn run_table_names_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.table_names(db.as_query_database(), predicate.clone())

View File

@ -27,8 +27,8 @@ async fn run_tag_keys_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_keys(db.as_query_database(), predicate.clone())

View File

@ -26,8 +26,8 @@ async fn run_tag_values_test_case<D>(
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_values(db.as_query_database(), tag_name, predicate.clone())
@ -311,7 +311,8 @@ async fn list_tag_values_field_col_on_tag() {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
let planner = InfluxRpcPlanner::default();
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
// Test: temp is a field, not a tag
let tag_name = "temp";

View File

@ -36,7 +36,7 @@ impl Planner {
pub async fn sql(&self, query: impl Into<String> + Send) -> Result<Arc<dyn ExecutionPlan>> {
let planner = SqlQueryPlanner::new();
let query = query.into();
let ctx = self.ctx.child_ctx("sql");
let ctx = self.ctx.child_ctx("planner sql");
self.ctx
.run(async move { planner.query(&query, &ctx).await })
@ -53,7 +53,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner = InfluxRpcPlanner::default();
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names"));
self.ctx
.run(async move {
@ -75,8 +75,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner =
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys"));
self.ctx
.run(async move {
@ -100,8 +99,7 @@ impl Planner {
D: QueryDatabase + 'static,
{
let tag_name = tag_name.into();
let planner =
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values"));
self.ctx
.run(async move {
@ -123,8 +121,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner =
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns"));
self.ctx
.run(async move {
@ -146,8 +143,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner =
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter"));
self.ctx
.run(async move {
@ -171,8 +167,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner =
InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner"));
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group"));
self.ctx
.run(async move {
@ -197,7 +192,7 @@ impl Planner {
where
D: QueryDatabase + 'static,
{
let planner = InfluxRpcPlanner::default();
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate"));
self.ctx
.run(async move {