From de7c46c9bbd84afebc01ca6f06c2da923e65c1fa Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 3 Mar 2022 14:16:31 +0000 Subject: [PATCH] feat: add read_window_aggregate tracing --- db/src/access.rs | 1 - db/src/streams.rs | 2 +- query/src/frontend/influxrpc.rs | 36 +++++++++++++++++++++++++-------- query/src/frontend/reorg.rs | 2 -- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/db/src/access.rs b/db/src/access.rs index e67fc05d48..f81bb8aaeb 100644 --- a/db/src/access.rs +++ b/db/src/access.rs @@ -368,7 +368,6 @@ impl SchemaProvider for DbSchemaProvider { schema }; - // TODO(edd): wire up execution provider let mut builder = ProviderBuilder::new(table_name, schema); builder = builder.add_pruner(Arc::clone(&self.chunk_access) as Arc>); diff --git a/db/src/streams.rs b/db/src/streams.rs index 2d0646ab5c..6327db4688 100644 --- a/db/src/streams.rs +++ b/db/src/streams.rs @@ -47,7 +47,7 @@ impl futures::Stream for ReadFilterResultsStream { let mut ctx = self.ctx.child_ctx("next_row_group"); let rb = self.read_results.next(); if let Some(rb) = &rb { - ctx.set_metadata("rows", rb.num_rows() as i64); + ctx.set_metadata("output_rows", rb.num_rows() as i64); } Poll::Ready(Ok(rb).transpose()) diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index f5ee1685cc..c51589ca4b 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -729,7 +729,7 @@ impl InfluxRpcPlanner { where D: QueryDatabase + 'static, { - let _ctx = self.ctx.child_ctx("planning_read_filter"); + let ctx = self.ctx.child_ctx("planning_read_filter"); debug!(?rpc_predicate, "planning read_filter"); let table_predicates = rpc_predicate @@ -748,7 +748,13 @@ impl InfluxRpcPlanner { .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - let ss_plan = self.read_filter_plan(table_name, schema, predicate, chunks)?; + let ss_plan = self.read_filter_plan( + ctx.child_ctx("read_filter plan"), + table_name, + schema, + predicate, + chunks, + )?; // If we have to do real work, add it to the list of plans if let Some(ss_plan) = ss_plan { ss_plans.push(ss_plan); @@ -809,9 +815,13 @@ impl InfluxRpcPlanner { .context(TableRemovedSnafu { table_name })?; let ss_plan = match agg { - Aggregate::None => { - self.read_filter_plan(table_name, Arc::clone(&schema), predicate, chunks)? - } + Aggregate::None => self.read_filter_plan( + ctx.child_ctx("read_filter plan"), + table_name, + Arc::clone(&schema), + predicate, + chunks, + )?, _ => self.read_group_plan( ctx.child_ctx("read_group plan"), table_name, @@ -853,6 +863,7 @@ impl InfluxRpcPlanner { where D: QueryDatabase + 'static, { + let ctx = self.ctx.child_ctx("read_window_aggregate planning"); debug!( ?rpc_predicate, ?agg, @@ -879,7 +890,14 @@ impl InfluxRpcPlanner { .context(TableRemovedSnafu { table_name })?; let ss_plan = self.read_window_aggregate_plan( - table_name, schema, predicate, agg, &every, &offset, chunks, + ctx.child_ctx("read_window_aggregate plan"), + table_name, + schema, + predicate, + agg, + &every, + &offset, + chunks, )?; // If we have to do real work, add it to the list of plans @@ -1096,6 +1114,7 @@ impl InfluxRpcPlanner { /// Scan fn read_filter_plan( &self, + ctx: IOxExecutionContext, table_name: impl AsRef, schema: Arc, predicate: &Predicate, @@ -1106,7 +1125,7 @@ impl InfluxRpcPlanner { { let table_name = table_name.as_ref(); let scan_and_filter = self.scan_and_filter( - self.ctx.child_ctx("scan_and_filter planning"), + ctx.child_ctx("scan_and_filter planning"), table_name, schema, predicate, @@ -1329,6 +1348,7 @@ impl InfluxRpcPlanner { #[allow(clippy::too_many_arguments)] fn read_window_aggregate_plan( &self, + ctx: IOxExecutionContext, table_name: impl Into, schema: Arc, predicate: &Predicate, @@ -1342,7 +1362,7 @@ impl InfluxRpcPlanner { { let table_name = table_name.into(); let scan_and_filter = self.scan_and_filter( - self.ctx.child_ctx("scan_and_filter planning"), + ctx.child_ctx("scan_and_filter planning"), &table_name, schema, predicate, diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index b6aaefe204..c9000a0156 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -72,7 +72,6 @@ impl ReorgPlanner { { let table_name = chunk.table_name(); // Prepare the plan for the table - // TODO(edd): wire in execution context let mut builder = ProviderBuilder::new(table_name, schema); // There are no predicates in these plans, so no need to prune them @@ -228,7 +227,6 @@ impl ReorgPlanner { let table_name = &table_name; // Prepare the plan for the table - // TODO(edd): wire up the correct execution context... let mut builder = ProviderBuilder::new(table_name, schema) // There are no predicates in these plans, so no need to prune them .add_no_op_pruner()