diff --git a/query/src/exec/context.rs b/query/src/exec/context.rs
index 70de804bde..2e0429af9b 100644
--- a/query/src/exec/context.rs
+++ b/query/src/exec/context.rs
@@ -21,7 +21,7 @@ use crate::exec::{
     split::StreamSplitExec,
 };
 
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 
 // Reuse DataFusion error and Result types for this module
 pub use datafusion::error::{DataFusionError as Error, Result};
@@ -164,21 +164,21 @@ impl IOxExecutionContext {
     /// Prepare a SQL statement for execution. This assumes that any
     /// tables referenced in the SQL have been registered with this context
     pub fn prepare_sql(&mut self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
-        debug!(text=%sql, "SQL");
+        debug!(text=%sql, "planning SQL query");
         let logical_plan = self.inner.sql(sql)?.to_logical_plan();
         self.prepare_plan(&logical_plan)
     }
 
     /// Prepare (optimize + plan) a pre-created logical plan for execution
     pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        debug!(text=%plan.display_indent_schema(), "initial plan");
+        debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan");
 
         let plan = self.inner.optimize(&plan)?;
-        debug!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
+        trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
 
         let physical_plan = self.inner.create_physical_plan(&plan)?;
 
-        debug!(text=%displayable(physical_plan.as_ref()).indent(), "optimized physical plan");
+        debug!(text=%displayable(physical_plan.as_ref()).indent(), "prepare_plan: plan to run");
 
         Ok(physical_plan)
     }
diff --git a/query/src/exec/seriesset.rs b/query/src/exec/seriesset.rs
index 861f72e5d2..75faae9416 100644
--- a/query/src/exec/seriesset.rs
+++ b/query/src/exec/seriesset.rs
@@ -27,7 +27,7 @@ use arrow::{
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
 
-use observability_deps::tracing::debug;
+use observability_deps::tracing::trace;
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 use tokio::sync::mpsc::error::SendError;
@@ -313,7 +313,7 @@ impl SeriesSetConverter {
         // for now, always treat the last row as ending a series
         bitmap.add(num_rows as u32);
 
-        debug!(
+        trace!(
             rows = ?bitmap.to_vec(),
             ?col_idx,
             "row transitions for results"
diff --git a/query/src/exec/split.rs b/query/src/exec/split.rs
index b27af30641..8ba6040901 100644
--- a/query/src/exec/split.rs
+++ b/query/src/exec/split.rs
@@ -26,7 +26,7 @@ use datafusion::{
 };
 
 use futures::StreamExt;
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 use tokio::sync::{mpsc::Sender, Mutex};
 
 use crate::exec::stream::AdapterStream;
@@ -186,7 +186,7 @@ impl ExecutionPlan for StreamSplitExec {
     /// * partition 0 are the rows for which the split_expr evaluates to true
     /// * partition 1 are the rows for which the split_expr does not evaluate to true (e.g. Null or false)
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        debug!(partition, "SplitExec::execute");
+        trace!(partition, "SplitExec::execute");
         self.start_if_needed().await?;
 
         let mut state = self.state.lock().await;
@@ -229,7 +229,7 @@ impl StreamSplitExec {
             "need exactly one input partition for stream split exec"
         );
 
-        debug!("Setting up SplitStreamExec state");
+        trace!("Setting up SplitStreamExec state");
         let input_stream = self.input.execute(0).await?;
 
         let (tx0, rx0) = tokio::sync::mpsc::channel(2);
@@ -267,7 +267,7 @@
                 }
                 // Input task completed successfully
                 Ok(Ok(())) => {
-                    debug!("All input tasks completed successfully");
+                    trace!("All input tasks completed successfully");
                 }
             }
         });
@@ -291,7 +291,7 @@ async fn split_the_stream(
 ) -> Result<()> {
     while let Some(batch) = input_stream.next().await {
         let batch = batch?;
-        debug!(num_rows = batch.num_rows(), "Processing batch");
+        trace!(num_rows = batch.num_rows(), "Processing batch");
 
         let mut tx0_done = false;
         let mut tx1_done = false;
@@ -362,7 +362,7 @@
         }
     }
 
-    debug!("Splitting done successfully");
+    trace!("Splitting done successfully");
 
     Ok(())
 }
diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs
index 5d5f6c61ec..f1c05d5cc0 100644
--- a/query/src/frontend/influxrpc.rs
+++ b/query/src/frontend/influxrpc.rs
@@ -15,7 +15,7 @@ use internal_types::{
     schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
     selection::Selection,
 };
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 use snafu::{ensure, ResultExt, Snafu};
 
 use crate::{
@@ -251,8 +251,6 @@ impl InfluxRpcPlanner {
                 continue;
             }
             let table_name = chunk.table_name();
-            let chunk_id = chunk.id();
-            debug!(table_name, chunk_id, "finding columns in table");
 
             // get only tag columns from metadata
             let schema = chunk.schema();
@@ -272,7 +270,7 @@
 
             match maybe_names {
                 Some(mut names) => {
-                    debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
+                    debug!(table_name, names=?names, chunk_id = chunk.id(), "column names found from metadata");
                     known_columns.append(&mut names);
                 }
                 None => {
@@ -359,8 +357,6 @@ impl InfluxRpcPlanner {
                 continue;
             }
             let table_name = chunk.table_name();
-            let chunk_id = chunk.id();
-            debug!(table_name, chunk_id, "finding columns in table");
 
             // use schema to validate column type
             let schema = chunk.schema();
@@ -399,7 +395,7 @@ impl InfluxRpcPlanner {
 
             match maybe_values {
                 Some(mut names) => {
-                    debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata");
+                    debug!(table_name, names=?names, chunk_id = chunk.id(), "column values found from metadata");
                     known_values.append(&mut names);
                 }
                 None => {
@@ -593,7 +589,13 @@ impl InfluxRpcPlanner {
     where
         D: QueryDatabase + 'static,
     {
-        debug!(predicate=?predicate, "planning read_window_aggregate");
+        debug!(
+            ?predicate,
+            ?agg,
+            ?every,
+            ?offset,
+            "planning read_window_aggregate"
+        );
 
         // group tables by chunk, pruning if possible
         let chunks = database.chunks(&predicate);
@@ -1122,8 +1124,8 @@ impl InfluxRpcPlanner {
         // to evaluate the predicate (if not, it means no rows can
        // match and thus we should skip this plan)
         if !schema_has_all_expr_columns(&schema, &filter_expr) {
-            debug!(table_name=table_name,
-                   schema=?schema,
+            trace!(table_name=table_name,
+                   ?schema,
                    filter_expr=?filter_expr,
                    "Skipping table as schema doesn't have all filter_expr columns");
             return Ok(None);
diff --git a/query/src/provider.rs b/query/src/provider.rs
index 9da0bc67f7..69260461ad 100644
--- a/query/src/provider.rs
+++ b/query/src/provider.rs
@@ -17,7 +17,7 @@ use datafusion::{
 };
 use datafusion_util::AsPhysicalExpr;
 use internal_types::schema::{merge::SchemaMerger, Schema};
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 
 use crate::{
     predicate::{Predicate, PredicateBuilder},
@@ -237,8 +237,6 @@ impl TableProvider for ChunkTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        debug!(?filters, "Input Filters to Scan");
-
         // Note that `filters` don't actually need to be evaluated in
         // the scan for the plans to be correct, they are an extra
         // optimization for providers which can offer them
@@ -261,7 +259,7 @@ impl TableProvider for ChunkTableProvider {
 
         // This debug shows the self.arrow_schema() includes all columns in all chunks
         // which means the schema of all chunks are merged before invoking this scan
-        debug!("all chunks schema: {:#?}", self.arrow_schema());
+        trace!("all chunks schema: {:#?}", self.arrow_schema());
 
         let mut deduplicate = Deduplicater::new();
         let plan = deduplicate.build_scan_plan(
@@ -506,7 +504,7 @@ impl Deduplicater {
         let pk_schema = Self::compute_pk_schema(&chunks);
         let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);
 
-        debug!(
+        trace!(
             ?output_schema,
             ?pk_schema,
             ?input_schema,
diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs
index 4dcab6939e..271d276a74 100644
--- a/query/src/provider/deduplicate/algo.rs
+++ b/query/src/provider/deduplicate/algo.rs
@@ -12,7 +12,7 @@ use arrow::{
 use datafusion::physical_plan::{
     coalesce_batches::concat_batches, expressions::PhysicalSortExpr, PhysicalExpr, SQLMetric,
 };
-use observability_deps::tracing::debug;
+use observability_deps::tracing::trace;
 
 // Handles the deduplication across potentially multiple
 // [`RecordBatch`]es which are already sorted on a primary key,
@@ -141,10 +141,10 @@ impl RecordBatchDeduplicator {
 
         // Special case when no ranges are duplicated (so just emit input as output)
         if num_dupes == 0 {
-            debug!(num_rows = batch.num_rows(), "No dupes");
+            trace!(num_rows = batch.num_rows(), "No dupes");
             Self::slice_record_batch(&batch, 0, ranges.len())
         } else {
-            debug!(num_dupes, num_rows = batch.num_rows(), "dupes");
+            trace!(num_dupes, num_rows = batch.num_rows(), "dupes");
 
             // Use take kernel
             let sort_key_indices = self.compute_sort_key_indices(&ranges);
diff --git a/query/src/pruning.rs b/query/src/pruning.rs
index d798dc1cce..43e42e37d1 100644
--- a/query/src/pruning.rs
+++ b/query/src/pruning.rs
@@ -34,7 +34,7 @@ where
     O: PruningObserver,
 {
     let num_chunks = summaries.len();
-    debug!(num_chunks, %predicate, "Pruning chunks");
+    trace!(num_chunks, %predicate, "Pruning chunks");
 
     let filter_expr = match predicate.filter_expr() {
         Some(expr) => expr,
@@ -55,6 +55,7 @@ where
 
     let num_remaining_chunks = pruned_summaries.len();
     debug!(
+        %predicate,
         num_chunks,
         num_pruned_chunks = num_chunks - num_remaining_chunks,
         num_remaining_chunks,