From 787a848bf597882bdd3bf56789b90640755654d4 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 3 Mar 2022 09:31:58 +0000 Subject: [PATCH] feat: add tracing for tag_values --- db/src/chunk.rs | 9 ++- influxdb_iox/src/influxdb_ioxd/planner.rs | 3 +- ingester/src/query.rs | 1 + query/src/frontend/influxrpc.rs | 76 +++++++++++++++++++---- query/src/lib.rs | 1 + query/src/test.rs | 1 + 6 files changed, 77 insertions(+), 14 deletions(-) diff --git a/db/src/chunk.rs b/db/src/chunk.rs index 86b587d3bf..4e800b615b 100644 --- a/db/src/chunk.rs +++ b/db/src/chunk.rs @@ -510,9 +510,14 @@ impl QueryChunk for DbChunk { fn column_values( &self, + mut ctx: IOxExecutionContext, column_name: &str, predicate: &Predicate, ) -> Result, Self::Error> { + ctx.set_metadata("storage", self.state.state_name()); + ctx.set_metadata("column_name", column_name.to_string()); + ctx.set_metadata("predicate", format!("{}", &predicate)); + match &self.state { State::MutableBuffer { .. } => { // There is no advantage to manually implementing this @@ -527,6 +532,7 @@ impl QueryChunk for DbChunk { return Ok(None); } }; + ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate)); self.access_recorder.record_access(); let mut values = chunk @@ -553,6 +559,7 @@ impl QueryChunk for DbChunk { column_name ), })?; + ctx.set_metadata("output_values", values.len() as i64); Ok(Some(values)) } @@ -638,7 +645,7 @@ mod tests { let t3 = time.inc(Duration::from_secs(1)); let column_values = snapshot - .column_values("tag", &Default::default()) + .column_values(IOxExecutionContext::default(), "tag", &Default::default()) .unwrap() .is_some(); let m5 = chunk.access_recorder().get_metrics(); diff --git a/influxdb_iox/src/influxdb_ioxd/planner.rs b/influxdb_iox/src/influxdb_ioxd/planner.rs index 506039a356..5443fe5f4e 100644 --- a/influxdb_iox/src/influxdb_ioxd/planner.rs +++ b/influxdb_iox/src/influxdb_ioxd/planner.rs @@ -99,7 +99,8 @@ impl Planner { D: QueryDatabase + 'static, { let tag_name = tag_name.into(); - let planner = InfluxRpcPlanner::default(); + let planner = + InfluxRpcPlanner::new().with_execution_context(self.ctx.child_ctx("influxrpc_planner")); self.ctx .run(async move { diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 1cd795e67d..0f4b8bf893 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -205,6 +205,7 @@ impl QueryChunk for QueryableBatch { /// The requested columns must all have String type. fn column_values( &self, + _ctx: IOxExecutionContext, _column_name: &str, _predicate: &Predicate, ) -> Result, Self::Error> { diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 3a2d1e9201..9af0da11a8 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -444,7 +444,13 @@ impl InfluxRpcPlanner { .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - let plan = self.tag_keys_plan(table_name, schema, predicate, chunks)?; + let plan = self.tag_keys_plan( + ctx.child_ctx("tag_keys_plan"), + table_name, + schema, + predicate, + chunks, + )?; if let Some(plan) = plan { builder = builder.append_other(plan) @@ -472,6 +478,7 @@ impl InfluxRpcPlanner { where D: QueryDatabase + 'static, { + let ctx = self.ctx.child_ctx("tag_values planning"); debug!(?rpc_predicate, tag_name, "planning tag_values"); // The basic algorithm is: @@ -541,7 +548,11 @@ impl InfluxRpcPlanner { if !do_full_plan { // try and get the list of values directly from metadata let maybe_values = chunk - .column_values(tag_name, predicate) + .column_values( + self.ctx.child_ctx("tag_values execution"), + tag_name, + predicate, + ) .map_err(|e| Box::new(e) as _) .context(FindingColumnValuesSnafu)?; @@ -591,8 +602,13 @@ impl InfluxRpcPlanner { .table_schema(table_name) .context(TableRemovedSnafu { table_name })?; - let scan_and_filter = - self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; // if we have any data to scan, make a plan! if let Some(TableScanAndFilter { @@ -872,6 +888,7 @@ impl InfluxRpcPlanner { /// ``` fn tag_keys_plan( &self, + ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, @@ -880,7 +897,13 @@ impl InfluxRpcPlanner { where C: QueryChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, @@ -948,7 +971,13 @@ impl InfluxRpcPlanner { where C: QueryChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + self.ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, schema, @@ -1007,7 +1036,13 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { debug!(%table_name, "Creating table_name full plan"); - let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + self.ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, schema, @@ -1054,7 +1089,13 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { let table_name = table_name.as_ref(); - let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + self.ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, @@ -1163,7 +1204,13 @@ impl InfluxRpcPlanner { where C: QueryChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + self.ctx.child_ctx("scan_and_filter planning"), + table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, @@ -1277,7 +1324,13 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter( + self.ctx.child_ctx("scan_and_filter planning"), + &table_name, + schema, + predicate, + chunks, + )?; let TableScanAndFilter { plan_builder, @@ -1346,6 +1399,7 @@ impl InfluxRpcPlanner { /// ``` fn scan_and_filter( &self, + ctx: IOxExecutionContext, table_name: &str, schema: Arc, predicate: &Predicate, @@ -1354,8 +1408,6 @@ impl InfluxRpcPlanner { where C: QueryChunk + 'static, { - let ctx = self.ctx.child_ctx("scan_and_filter"); - // Scan all columns to begin with (DataFusion projection // push-down optimization will prune out unneeded columns later) let projection = None; diff --git a/query/src/lib.rs b/query/src/lib.rs index de992c474c..6e077b6152 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -203,6 +203,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync { /// The requested columns must all have String type. fn column_values( &self, + ctx: IOxExecutionContext, column_name: &str, predicate: &Predicate, ) -> Result, Self::Error>; diff --git a/query/src/test.rs b/query/src/test.rs index ec6e405fa9..3cbaa9073c 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -952,6 +952,7 @@ impl QueryChunk for TestChunk { fn column_values( &self, + _ctx: IOxExecutionContext, _column_name: &str, _predicate: &Predicate, ) -> Result, Self::Error> {