chore: Reduce debug logging in query crate (#1802)

parent 79446d45be
commit 4e7cf39b23
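
The change applied throughout this commit is one pattern: per-batch and per-row diagnostics are demoted from debug! to trace!, and the debug! lines that remain get more descriptive, phase-naming messages. A minimal sketch of that pattern, assuming the standard tracing macros that observability_deps re-exports (the field names here are illustrative):

    use tracing::{debug, trace};

    fn plan_and_run(sql: &str, batch_sizes: &[usize]) {
        // One high-level line per query stays at debug, with a message
        // that names the phase instead of a bare "SQL".
        debug!(text = %sql, "planning SQL query");

        for &num_rows in batch_sizes {
            // Per-batch chatter moves down to trace so it no longer
            // floods the logs when only debug is enabled.
            trace!(num_rows, "Processing batch");
        }
    }
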
@@ -21,7 +21,7 @@ use crate::exec::{
     split::StreamSplitExec,
 };

-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};

 // Reuse DataFusion error and Result types for this module
 pub use datafusion::error::{DataFusionError as Error, Result};

@@ -164,21 +164,21 @@ impl IOxExecutionContext {
     /// Prepare a SQL statement for execution. This assumes that any
     /// tables referenced in the SQL have been registered with this context
     pub fn prepare_sql(&mut self, sql: &str) -> Result<Arc<dyn ExecutionPlan>> {
-        debug!(text=%sql, "SQL");
+        debug!(text=%sql, "planning SQL query");
         let logical_plan = self.inner.sql(sql)?.to_logical_plan();
         self.prepare_plan(&logical_plan)
     }

     /// Prepare (optimize + plan) a pre-created logical plan for execution
     pub fn prepare_plan(&self, plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
-        debug!(text=%plan.display_indent_schema(), "initial plan");
+        debug!(text=%plan.display_indent_schema(), "prepare_plan: initial plan");

         let plan = self.inner.optimize(&plan)?;
-        debug!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");
+        trace!(text=%plan.display_indent_schema(), graphviz=%plan.display_graphviz(), "optimized plan");

         let physical_plan = self.inner.create_physical_plan(&plan)?;

-        debug!(text=%displayable(physical_plan.as_ref()).indent(), "optimized physical plan");
+        debug!(text=%displayable(physical_plan.as_ref()).indent(), "prepare_plan: plan to run");
         Ok(physical_plan)
     }
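
Whether the demoted trace! lines are emitted at all is then a subscriber configuration choice, not a code change. A sketch of that knob, assuming the common tracing_subscriber setup (this setup code is not part of the commit, and the "query" target name is an assumption):

    use tracing_subscriber::EnvFilter;

    fn init_logging() {
        // "query=debug" keeps the planner's remaining debug! lines;
        // switching to "query=trace" re-enables the demoted output.
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::new("query=debug"))
            .init();
    }
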
@@ -27,7 +27,7 @@ use arrow::{
 };
 use datafusion::physical_plan::SendableRecordBatchStream;

-use observability_deps::tracing::debug;
+use observability_deps::tracing::trace;
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 use tokio::sync::mpsc::error::SendError;

@@ -313,7 +313,7 @@ impl SeriesSetConverter {
         // for now, always treat the last row as ending a series
         bitmap.add(num_rows as u32);

-        debug!(
+        trace!(
             rows = ?bitmap.to_vec(),
             ?col_idx,
             "row transitions for results"
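
For context, the trace! above reports the series boundaries SeriesSetConverter found: the indices where a tag column changes value, with the final row always counted as a boundary (per the comment in the hunk). A simplified, self-contained sketch of that computation, using a Vec of indices where the real code fills an Arrow bitmap:

    /// Indices at which one series ends, treating the last row as
    /// always ending a series; mirrors bitmap.add(num_rows as u32).
    fn row_transitions(tag_values: &[&str]) -> Vec<u32> {
        let num_rows = tag_values.len();
        let mut transitions: Vec<u32> = (1..num_rows)
            .filter(|&i| tag_values[i] != tag_values[i - 1])
            .map(|i| i as u32)
            .collect();
        transitions.push(num_rows as u32);
        transitions
    }
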
@@ -26,7 +26,7 @@ use datafusion::{
 };

 use futures::StreamExt;
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 use tokio::sync::{mpsc::Sender, Mutex};

 use crate::exec::stream::AdapterStream;

@@ -186,7 +186,7 @@ impl ExecutionPlan for StreamSplitExec {
     /// * partition 0 are the rows for which the split_expr evaluates to true
     /// * partition 1 are the rows for which the split_expr does not evaluate to true (e.g. Null or false)
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        debug!(partition, "SplitExec::execute");
+        trace!(partition, "SplitExec::execute");
         self.start_if_needed().await?;

         let mut state = self.state.lock().await;

@@ -229,7 +229,7 @@ impl StreamSplitExec {
             "need exactly one input partition for stream split exec"
         );

-        debug!("Setting up SplitStreamExec state");
+        trace!("Setting up SplitStreamExec state");

         let input_stream = self.input.execute(0).await?;
         let (tx0, rx0) = tokio::sync::mpsc::channel(2);

@@ -267,7 +267,7 @@ impl StreamSplitExec {
                 }
                 // Input task completed successfully
                 Ok(Ok(())) => {
-                    debug!("All input tasks completed successfully");
+                    trace!("All input tasks completed successfully");
                 }
             }
         });

@@ -291,7 +291,7 @@ async fn split_the_stream(
 ) -> Result<()> {
     while let Some(batch) = input_stream.next().await {
         let batch = batch?;
-        debug!(num_rows = batch.num_rows(), "Processing batch");
+        trace!(num_rows = batch.num_rows(), "Processing batch");
         let mut tx0_done = false;
         let mut tx1_done = false;

@@ -362,7 +362,7 @@ async fn split_the_stream(
         }
     }

-    debug!("Splitting done successfully");
+    trace!("Splitting done successfully");
     Ok(())
 }
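
The split_the_stream function shown above drains a single input stream into two mpsc channels, routing rows by how the split expression evaluates. A simplified sketch of that shape, with integers standing in for RecordBatches and a fixed predicate standing in for split_expr:

    use tokio::sync::mpsc;

    /// Send predicate-true items to tx0 and everything else to tx1,
    /// mirroring partition 0 / partition 1 of StreamSplitExec.
    async fn split_sketch(
        input: Vec<i64>,
        tx0: mpsc::Sender<i64>,
        tx1: mpsc::Sender<i64>,
    ) -> Result<(), mpsc::error::SendError<i64>> {
        for item in input {
            if item > 0 {
                tx0.send(item).await?;
            } else {
                tx1.send(item).await?;
            }
        }
        // Dropping the senders here closes both output streams.
        Ok(())
    }
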
@@ -15,7 +15,7 @@ use internal_types::{
     schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
     selection::Selection,
 };
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};
 use snafu::{ensure, ResultExt, Snafu};

 use crate::{

@@ -251,8 +251,6 @@ impl InfluxRpcPlanner {
                 continue;
             }
             let table_name = chunk.table_name();
-            let chunk_id = chunk.id();
-            debug!(table_name, chunk_id, "finding columns in table");

             // get only tag columns from metadata
             let schema = chunk.schema();

@@ -272,7 +270,7 @@ impl InfluxRpcPlanner {
             match maybe_names {
                 Some(mut names) => {
-                    debug!(names=?names, chunk_id = chunk.id(), "column names found from metadata");
+                    debug!(table_name, names=?names, chunk_id = chunk.id(), "column names found from metadata");
                     known_columns.append(&mut names);
                 }
                 None => {

@@ -359,8 +357,6 @@ impl InfluxRpcPlanner {
                 continue;
             }
             let table_name = chunk.table_name();
-            let chunk_id = chunk.id();
-            debug!(table_name, chunk_id, "finding columns in table");

             // use schema to validate column type
             let schema = chunk.schema();

@@ -399,7 +395,7 @@ impl InfluxRpcPlanner {
             match maybe_values {
                 Some(mut names) => {
-                    debug!(names=?names, chunk_id = chunk.id(), "column values found from metadata");
+                    debug!(table_name, names=?names, chunk_id = chunk.id(), "column values found from metadata");
                     known_values.append(&mut names);
                 }
                 None => {

@@ -593,7 +589,13 @@ impl InfluxRpcPlanner {
     where
         D: QueryDatabase + 'static,
     {
-        debug!(predicate=?predicate, "planning read_window_aggregate");
+        debug!(
+            ?predicate,
+            ?agg,
+            ?every,
+            ?offset,
+            "planning read_window_aggregate"
+        );

         // group tables by chunk, pruning if possible
         let chunks = database.chunks(&predicate);

@@ -1122,8 +1124,8 @@ impl InfluxRpcPlanner {
         // to evaluate the predicate (if not, it means no rows can
        // match and thus we should skip this plan)
         if !schema_has_all_expr_columns(&schema, &filter_expr) {
-            debug!(table_name=table_name,
-                   schema=?schema,
+            trace!(table_name=table_name,
+                   ?schema,
                    filter_expr=?filter_expr,
                    "Skipping table as schema doesn't have all filter_expr columns");
             return Ok(None);
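
The guard around that trace! skips a table when its schema is missing a column that filter_expr references. A sketch of the check, with plain string sets standing in for the real Schema and Expr types:

    use std::collections::HashSet;

    /// True if every column referenced by the filter exists in the schema.
    fn schema_has_all_expr_columns(
        schema_columns: &HashSet<String>,
        expr_columns: &HashSet<String>,
    ) -> bool {
        expr_columns.is_subset(schema_columns)
    }
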
@@ -17,7 +17,7 @@ use datafusion::{
 };
 use datafusion_util::AsPhysicalExpr;
 use internal_types::schema::{merge::SchemaMerger, Schema};
-use observability_deps::tracing::debug;
+use observability_deps::tracing::{debug, trace};

 use crate::{
     predicate::{Predicate, PredicateBuilder},

@@ -237,8 +237,6 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-        debug!(?filters, "Input Filters to Scan");
-
         // Note that `filters` don't actually need to be evaluated in
         // the scan for the plans to be correct, they are an extra
         // optimization for providers which can offer them

@@ -261,7 +259,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {

         // This debug shows the self.arrow_schema() includes all columns in all chunks
         // which means the schema of all chunks are merged before invoking this scan
-        debug!("all chunks schema: {:#?}", self.arrow_schema());
+        trace!("all chunks schema: {:#?}", self.arrow_schema());

         let mut deduplicate = Deduplicater::new();
         let plan = deduplicate.build_scan_plan(

@@ -506,7 +504,7 @@ impl<C: QueryChunk + 'static> Deduplicater<C> {
         let pk_schema = Self::compute_pk_schema(&chunks);
         let input_schema = Self::compute_input_schema(&output_schema, &pk_schema);

-        debug!(
+        trace!(
             ?output_schema,
             ?pk_schema,
             ?input_schema,
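
In the Deduplicater hunk above, input_schema is the requested output schema widened with any primary-key columns the deduplication needs. A rough sketch of that computation, with column-name lists standing in for the real Schema type:

    /// Output columns plus any primary-key columns not already present.
    fn compute_input_columns(output: &[String], pk: &[String]) -> Vec<String> {
        let mut input = output.to_vec();
        for col in pk {
            if !input.contains(col) {
                input.push(col.clone());
            }
        }
        input
    }
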
@@ -12,7 +12,7 @@ use arrow::{
 use datafusion::physical_plan::{
     coalesce_batches::concat_batches, expressions::PhysicalSortExpr, PhysicalExpr, SQLMetric,
 };
-use observability_deps::tracing::debug;
+use observability_deps::tracing::trace;

 // Handles the deduplication across potentially multiple
 // [`RecordBatch`]es which are already sorted on a primary key,

@@ -141,10 +141,10 @@ impl RecordBatchDeduplicator {

         // Special case when no ranges are duplicated (so just emit input as output)
         if num_dupes == 0 {
-            debug!(num_rows = batch.num_rows(), "No dupes");
+            trace!(num_rows = batch.num_rows(), "No dupes");
             Self::slice_record_batch(&batch, 0, ranges.len())
         } else {
-            debug!(num_dupes, num_rows = batch.num_rows(), "dupes");
+            trace!(num_dupes, num_rows = batch.num_rows(), "dupes");

             // Use take kernel
             let sort_key_indices = self.compute_sort_key_indices(&ranges);
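
RecordBatchDeduplicator collapses runs of rows that share a sort key, which is why the no-duplicates case above can simply slice the input through. A simplified sketch of the keep-last-per-key idea over an already-sorted slice (the real code computes duplicate ranges and applies Arrow take kernels):

    /// For rows sorted by key, keep the last value seen for each key.
    fn dedup_sorted(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
        let mut out: Vec<(String, i64)> = Vec::new();
        for &(key, value) in rows {
            match out.last_mut() {
                // Duplicate of the previous key: overwrite with newer value
                Some(last) if last.0 == key => last.1 = value,
                _ => out.push((key.to_string(), value)),
            }
        }
        out
    }
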
@@ -34,7 +34,7 @@ where
     O: PruningObserver<Observed = P>,
 {
     let num_chunks = summaries.len();
-    debug!(num_chunks, %predicate, "Pruning chunks");
+    trace!(num_chunks, %predicate, "Pruning chunks");

     let filter_expr = match predicate.filter_expr() {
         Some(expr) => expr,

@@ -55,6 +55,7 @@ where

     let num_remaining_chunks = pruned_summaries.len();
     debug!(
         %predicate,
         num_chunks,
+        num_pruned_chunks = num_chunks - num_remaining_chunks,
         num_remaining_chunks,
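
The summary debug! above stays at debug level and reports how many chunk summaries the predicate eliminated. A sketch of that bookkeeping, with a plain closure standing in for the pruning logic and a hypothetical final message (the original message text is cut off in this view):

    use tracing::debug;

    fn prune_chunks<T>(summaries: Vec<T>, keep: impl Fn(&T) -> bool) -> Vec<T> {
        let num_chunks = summaries.len();
        let pruned_summaries: Vec<T> =
            summaries.into_iter().filter(|s| keep(s)).collect();
        let num_remaining_chunks = pruned_summaries.len();
        debug!(
            num_chunks,
            num_pruned_chunks = num_chunks - num_remaining_chunks,
            num_remaining_chunks,
            "pruning result" // hypothetical message; truncated in the source
        );
        pruned_summaries
    }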