feat: query path instrumentation (#26106)

- spans added for the buffer and parquet chunks, recording the number of files
  already present in the parquet cache along with the SQL query text
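A minimal sketch of the instrumentation pattern this change follows, using only the trace-crate calls that appear in the diff below (SpanContext::child, SpanRecorder::new, SpanRecorder::set_metadata, MetaValue); the function name and parameters are illustrative and not part of the PR:

use trace::ctx::SpanContext;
use trace::span::{MetaValue, SpanRecorder};

// Illustrative only: attach query metadata to a child span, mirroring the
// pattern added in this PR. When no parent context was propagated, the
// recorder is effectively a no-op.
fn record_query_span(span_ctx: Option<SpanContext>, query: &str, buffer_chunks: usize) {
    let child = span_ctx.map(|ctx| ctx.child("query_database_sql"));
    let mut recorder = SpanRecorder::new(child);

    // Record the SQL text and the number of buffer chunks as span metadata.
    recorder.set_metadata("query", MetaValue::String(query.to_string().into()));
    recorder.set_metadata("buffer_chunks", MetaValue::Int(buffer_chunks as i64));

    // The child span, if any, is completed when `recorder` is dropped.
}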
praveen-influx 2025-03-06 17:24:34 +00:00 committed by GitHub
parent 85023f075b
commit c724e06e3f
5 changed files with 56 additions and 3 deletions

Cargo.lock (generated)

@@ -3303,6 +3303,7 @@ dependencies = [
"test_helpers",
"thiserror 1.0.69",
"tokio",
"trace",
"url",
"uuid",
]


@@ -158,6 +158,9 @@ pub trait ParquetCacheOracle: Send + Sync + Debug {
// Get a receiver that is notified when a prune takes place and how much memory was freed
fn prune_notifier(&self) -> watch::Receiver<usize>;
// Check whether the given path has already been fetched into the cache
fn in_cache(&self, path: &Path) -> bool;
}
/// Concrete implementation of the [`ParquetCacheOracle`]
@@ -246,6 +249,10 @@ impl ParquetCacheOracle for MemCacheOracle {
fn prune_notifier(&self) -> watch::Receiver<usize> {
self.prune_notifier_tx.subscribe()
}
fn in_cache(&self, path: &Path) -> bool {
self.mem_store.cache.path_already_fetched(path)
}
}
/// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
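A hedged sketch of how the new in_cache check can be consumed when reporting how many persisted files are already resident in the cache; the helper name is illustrative and the trait's module path inside influxdb3_cache is assumed:

use influxdb3_cache::parquet_cache::ParquetCacheOracle; // module path assumed
use object_store::path::Path as ObjPath;

// Illustrative helper (not part of the PR): count how many persisted parquet
// file paths are already present in the cache via the new `in_cache` method.
fn count_cached_files(parquet_cache: &dyn ParquetCacheOracle, file_paths: &[String]) -> usize {
    file_paths
        .iter()
        .filter(|p| {
            // Paths originate from the object store, so parsing is expected to
            // succeed; the PR uses the same `expect` in the query path.
            let path = ObjPath::parse(p.as_str()).expect("obj path should be parseable");
            parquet_cache.in_cache(&path)
        })
        .count()
}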


@@ -43,8 +43,8 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::Semaphore;
use trace::ctx::SpanContext;
use trace::span::{Span, SpanExt, SpanRecorder};
use trace::{ctx::SpanContext, span::MetaValue};
use trace_http::ctx::RequestLogContext;
use tracker::{
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
@@ -142,6 +142,12 @@ impl QueryExecutor for QueryExecutorImpl {
) -> Result<SendableRecordBatchStream, QueryExecutorError> {
info!(%database, %query, ?params, "executing sql query");
let db = self.get_db_namespace(database, &span_ctx).await?;
let span_ctxt = span_ctx
.clone()
.map(|span| span.child("query_database_sql"));
let mut recorder = SpanRecorder::new(span_ctxt);
recorder.set_metadata("query", MetaValue::String(query.to_string().into()));
query_database_sql(
db,
query,


@@ -25,6 +25,7 @@ metric.workspace = true
parquet_file.workspace = true
observability_deps.workspace = true
schema.workspace = true
trace.workspace = true
# Local deps
influxdb3_cache = { path = "../influxdb3_cache" }


@@ -5,6 +5,7 @@ pub mod persisted_files;
pub mod queryable_buffer;
mod table_buffer;
use tokio::sync::{oneshot, watch::Receiver};
use trace::span::{MetaValue, SpanRecorder};
pub mod validator;
use crate::persister::Persister;
@@ -41,14 +42,14 @@ use influxdb3_wal::{
use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition};
use influxdb3_wal::{DatabaseDefinition, FieldDefinition};
use influxdb3_wal::{DeleteDatabaseDefinition, object_store::WalObjectStore};
use iox_query::QueryChunk;
use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
use iox_query::{QueryChunk, exec::SessionContextIOxExt};
use iox_time::{Time, TimeProvider};
use metric::Registry;
use metrics::WriteMetrics;
use object_store::path::Path as ObjPath;
use object_store::{ObjectMeta, ObjectStore};
use observability_deps::tracing::{debug, error, warn};
use observability_deps::tracing::{debug, warn};
use parquet_file::storage::ParquetExecInput;
use queryable_buffer::QueryableBufferArgs;
use schema::Schema;
@@ -331,6 +332,9 @@ impl WriteBufferImpl {
projection: Option<&Vec<usize>>,
ctx: &dyn Session,
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
let span_ctx = ctx.span_ctx().map(|span| span.child("table_chunks"));
let mut recorder = SpanRecorder::new(span_ctx);
let mut chunks = self.buffer.get_table_chunks(
Arc::clone(&db_schema),
Arc::clone(&table_def),
@@ -338,10 +342,20 @@
projection,
ctx,
)?;
let num_chunks_from_buffer = chunks.len();
recorder.set_metadata(
"buffer_chunks",
MetaValue::Int(num_chunks_from_buffer as i64),
);
let parquet_files =
self.persisted_files
.get_files_filtered(db_schema.id, table_def.table_id, filter);
let num_parquet_files_needed = parquet_files.len();
recorder.set_metadata(
"parquet_files",
MetaValue::Int(num_parquet_files_needed as i64),
);
if parquet_files.len() > self.query_file_limit {
return Err(DataFusionError::External(
@@ -358,6 +372,30 @@
));
}
if let Some(parquet_cache) = &self.parquet_cache {
let num_files_already_in_cache = parquet_files
.iter()
.filter(|f| {
parquet_cache
.in_cache(&ObjPath::parse(&f.path).expect("obj path should be parseable"))
})
.count();
recorder.set_metadata(
"parquet_files_already_in_cache",
MetaValue::Int(num_files_already_in_cache as i64),
);
debug!(
num_chunks_from_buffer,
num_parquet_files_needed, num_files_already_in_cache, ">>> query chunks breakdown"
);
} else {
debug!(
num_chunks_from_buffer,
num_parquet_files_needed, ">>> query chunks breakdown (cache disabled)"
);
}
let mut chunk_order = chunks.len() as i64;
// Although this sends a cache request, it does not mean all these
// files will be cached. This depends on parquet cache's capacity