feat: add read_window_aggregate tracing

pull/24376/head
Edd Robinson 2022-03-03 14:16:31 +00:00
parent ea32bc366a
commit de7c46c9bb
4 changed files with 29 additions and 12 deletions

View File

@ -368,7 +368,6 @@ impl SchemaProvider for DbSchemaProvider {
schema
};
// TODO(edd): wire up execution provider
let mut builder = ProviderBuilder::new(table_name, schema);
builder =
builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);

View File

@ -47,7 +47,7 @@ impl futures::Stream for ReadFilterResultsStream {
let mut ctx = self.ctx.child_ctx("next_row_group");
let rb = self.read_results.next();
if let Some(rb) = &rb {
ctx.set_metadata("rows", rb.num_rows() as i64);
ctx.set_metadata("output_rows", rb.num_rows() as i64);
}
Poll::Ready(Ok(rb).transpose())

View File

@ -729,7 +729,7 @@ impl InfluxRpcPlanner {
where
D: QueryDatabase + 'static,
{
let _ctx = self.ctx.child_ctx("planning_read_filter");
let ctx = self.ctx.child_ctx("planning_read_filter");
debug!(?rpc_predicate, "planning read_filter");
let table_predicates = rpc_predicate
@ -748,7 +748,13 @@ impl InfluxRpcPlanner {
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
let ss_plan = self.read_filter_plan(table_name, schema, predicate, chunks)?;
let ss_plan = self.read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
schema,
predicate,
chunks,
)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -809,9 +815,13 @@ impl InfluxRpcPlanner {
.context(TableRemovedSnafu { table_name })?;
let ss_plan = match agg {
Aggregate::None => {
self.read_filter_plan(table_name, Arc::clone(&schema), predicate, chunks)?
}
Aggregate::None => self.read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
Arc::clone(&schema),
predicate,
chunks,
)?,
_ => self.read_group_plan(
ctx.child_ctx("read_group plan"),
table_name,
@ -853,6 +863,7 @@ impl InfluxRpcPlanner {
where
D: QueryDatabase + 'static,
{
let ctx = self.ctx.child_ctx("read_window_aggregate planning");
debug!(
?rpc_predicate,
?agg,
@ -879,7 +890,14 @@ impl InfluxRpcPlanner {
.context(TableRemovedSnafu { table_name })?;
let ss_plan = self.read_window_aggregate_plan(
table_name, schema, predicate, agg, &every, &offset, chunks,
ctx.child_ctx("read_window_aggregate plan"),
table_name,
schema,
predicate,
agg,
&every,
&offset,
chunks,
)?;
// If we have to do real work, add it to the list of plans
@ -1096,6 +1114,7 @@ impl InfluxRpcPlanner {
/// Scan
fn read_filter_plan<C>(
&self,
ctx: IOxExecutionContext,
table_name: impl AsRef<str>,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1106,7 +1125,7 @@ impl InfluxRpcPlanner {
{
let table_name = table_name.as_ref();
let scan_and_filter = self.scan_and_filter(
self.ctx.child_ctx("scan_and_filter planning"),
ctx.child_ctx("scan_and_filter planning"),
table_name,
schema,
predicate,
@ -1329,6 +1348,7 @@ impl InfluxRpcPlanner {
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan<C>(
&self,
ctx: IOxExecutionContext,
table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1342,7 +1362,7 @@ impl InfluxRpcPlanner {
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(
self.ctx.child_ctx("scan_and_filter planning"),
ctx.child_ctx("scan_and_filter planning"),
&table_name,
schema,
predicate,

View File

@ -72,7 +72,6 @@ impl ReorgPlanner {
{
let table_name = chunk.table_name();
// Prepare the plan for the table
// TODO(edd): wire in execution context
let mut builder = ProviderBuilder::new(table_name, schema);
// There are no predicates in these plans, so no need to prune them
@ -228,7 +227,6 @@ impl ReorgPlanner {
let table_name = &table_name;
// Prepare the plan for the table
// TODO(edd): wire up the correct execution context...
let mut builder = ProviderBuilder::new(table_name, schema)
// There are no predicates in these plans, so no need to prune them
.add_no_op_pruner()