diff --git a/ingester2/src/query/mod.rs b/ingester2/src/query/mod.rs index 34262518cd..cb3f79b9ec 100644 --- a/ingester2/src/query/mod.rs +++ b/ingester2/src/query/mod.rs @@ -9,6 +9,7 @@ pub(crate) mod response; // Instrumentation pub(crate) mod exec_instrumentation; +pub(crate) mod result_instrumentation; pub(crate) mod tracing; #[cfg(test)] diff --git a/ingester2/src/query/result_instrumentation.rs b/ingester2/src/query/result_instrumentation.rs new file mode 100644 index 0000000000..fba3ad98f4 --- /dev/null +++ b/ingester2/src/query/result_instrumentation.rs @@ -0,0 +1,1147 @@ +//! Instrumentation of query results streamed from a [`QueryExec`] +//! implementation. +//! +//! The top-level [`QueryResultInstrumentation`] decorator implements the +//! [`QueryExec`] trait, wrapping the response of the inner implementation with +//! instrumentation. +//! +//! ```text +//! ┌ QueryResultInstrumentation ───┐ +//! │ │ +//! │ ╔═══════════════════╗ │ +//! ┌─────────│ ║ Inner QueryExec ║ │ +//! │ │ ╚═══════════════════╝ │ +//! │ └───────────────║───────────────┘ +//! Injects ║ +//! │ ║ +//! │ ║ +//! ▼ ┌ Observe ─ ▼ ─ ─ ─ ─ ─ ┐ +//! ┌──────────────────┐ ╔═══════════════════╗ +//! │QueryMetricContext│─ ─ ┤ ║ QueryResponse ║ │ +//! └──────────────────┘ ╚═══════════════════╝ +//! │ └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘ +//! │ +//! │ │ +//! ┼ +//! │ ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ +//! ╔═══════════════════╗ +//! └ ─ ─ ─ ─ ─ ─ ▶ ║ PartitionResponse ║ │──────────────┐ +//! ╚═══════════════════╝ Injects +//! └ ─ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ─ ┘ │ +//! │ ▼ +//! │ ┌───────────────────┐ +//! ┼ │BatchStreamRecorder│ +//! ┌ Observe ─╱┴╲─ ─ ─ ─ ─ ┐ └───────────────────┘ +//! ╔═══════════════════╗ │ +//! │ ║ RecordBatchStream ║ ├ ─ ─ ─ ─ ─ ─ ─ +//! ╚═══════════════════╝ +//! └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ +//! ``` +//! +//! The [`QueryMetricContext`] is injected into the [`QueryResponse`], recording +//! the lifetime of the [`QueryResponse`] itself, and further injecting +//! instances of [`BatchStreamRecorder`] into each [`PartitionResponse`] to +//! observe the per-partition stream of [`RecordBatch`] that are yielded from +//! it. + +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use data_types::{NamespaceId, TableId}; +use datafusion::{ + error::DataFusionError, + physical_plan::{RecordBatchStream, SendableRecordBatchStream}, +}; +use futures::Stream; +use iox_time::{SystemProvider, Time, TimeProvider}; +use metric::{DurationHistogram, Metric, U64Histogram, U64HistogramOptions}; +use observability_deps::tracing::debug; +use pin_project::{pin_project, pinned_drop}; +use trace::span::Span; + +use crate::query::{ + partition_response::PartitionResponse, + response::{PartitionStream, QueryResponse}, + QueryError, QueryExec, +}; + +/// A [`QueryExec`] decorator adding instrumentation to the [`QueryResponse`] +/// returned by the inner implementation. +/// +/// The wall-clock duration of time taken for the caller to consume or drop the +/// query results is recorded, faceted by success/error and completion state +/// (fully consumed all [`RecordBatch`], or dropped before the stream ended). +/// +/// Additionally the distribution of row, partition and [`RecordBatch`] counts +/// are recorded. +#[derive(Debug, Clone)] +pub(crate) struct QueryResultInstrumentation { + inner: T, + time_provider: P, + + /// A histogram to capture the consume time for a stream that was entirely + /// consumed (yielded [`Poll::Ready(None)`]) without ever observing an + /// [`Err`]. + completed_ok: DurationHistogram, + + /// As above but the stream returned at least one [`Err`] item; the stream + /// was still consumed to completion. + completed_err: DurationHistogram, + + /// Like [`Self::completed_ok`], but for a stream that was not consumed to + /// completion (dropped before returning [`Poll::Ready(None)`])]). + aborted_ok: DurationHistogram, + aborted_err: DurationHistogram, + + // Histograms to capture the distribution of row/batch/partition + // counts per query at the end of the query. + row_hist: U64Histogram, + record_batch_hist: U64Histogram, + partition_hist: U64Histogram, +} + +impl QueryResultInstrumentation { + pub(crate) fn new(inner: T, metrics: &metric::Registry) -> Self { + let duration: Metric = metrics.register_metric( + "ingester_query_stream_duration", + "duration of time RPC clients take to stream results for a single query", + ); + + // A wide range of bucket values to capture the highly variable row + // count. + let row_hist: U64Histogram = metrics + .register_metric_with_options::( + "ingester_query_result_row", + "distribution of query result row count", + || { + U64HistogramOptions::new([ + 1 << 5, // 32 + 1 << 6, // 64 + 1 << 7, // 128 + 1 << 8, // 256 + 1 << 9, // 512 + 1 << 10, // 1,024 + 1 << 11, // 2,048 + 1 << 12, // 4,096 + 1 << 13, // 8,192 + 1 << 14, // 16,384 + 1 << 15, // 32,768 + 1 << 16, // 65,536 + 1 << 17, // 131,072 + 1 << 18, // 262,144 + ]) + }, + ) + .recorder(&[]); + + let record_batch_hist: U64Histogram = metrics + .register_metric_with_options::( + "ingester_query_result_record_batch", + "distribution of query result record batch count", + || { + U64HistogramOptions::new([ + 1 << 1, // 2 + 1 << 2, // 4 + 1 << 3, // 8 + 1 << 4, // 16 + 1 << 5, // 32 + 1 << 6, // 64 + 1 << 7, // 128 + 1 << 8, // 256 + ]) + }, + ) + .recorder(&[]); + + // And finally, the number of partitions + let partition_hist: U64Histogram = metrics + .register_metric_with_options::( + "ingester_query_result_partition", + "distribution of query result partition count", + || U64HistogramOptions::new([1, 2, 3, 4, 5]), + ) + .recorder(&[]); + + Self { + inner, + time_provider: Default::default(), + completed_ok: duration.recorder(&[("request", "complete"), ("has_error", "false")]), + completed_err: duration.recorder(&[("request", "complete"), ("has_error", "true")]), + aborted_ok: duration.recorder(&[("request", "incomplete"), ("has_error", "false")]), + aborted_err: duration.recorder(&[("request", "incomplete"), ("has_error", "true")]), + row_hist, + record_batch_hist, + partition_hist, + } + } +} + +impl QueryResultInstrumentation { + #[cfg(test)] + fn with_time_provider(self, time_provider: U) -> QueryResultInstrumentation + where + U: TimeProvider, + { + QueryResultInstrumentation { + inner: self.inner, + time_provider, + completed_ok: self.completed_ok, + completed_err: self.completed_err, + aborted_ok: self.aborted_ok, + aborted_err: self.aborted_err, + row_hist: self.row_hist, + record_batch_hist: self.record_batch_hist, + partition_hist: self.partition_hist, + } + } +} + +#[async_trait] +impl QueryExec for QueryResultInstrumentation +where + T: QueryExec, + P: TimeProvider + Clone, +{ + type Response = QueryResponse; + + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + let started_at = self.time_provider.now(); + + let stream = self + .inner + .query_exec(namespace_id, table_id, columns, span) + .await?; + + let stream = QueryMetricContext::new( + stream.into_partition_stream(), + started_at, + self.time_provider.clone(), + self.completed_ok.clone(), + self.completed_err.clone(), + self.aborted_ok.clone(), + self.aborted_err.clone(), + self.row_hist.clone(), + self.record_batch_hist.clone(), + self.partition_hist.clone(), + ); + + Ok(QueryResponse::new(PartitionStream::new(stream))) + } +} + +/// A metric context for the lifetime of a [`QueryResponse`]. +/// +/// Once the last [`PartitionResponse`] is consumed to completion, this type is +/// dropped and the metrics it has gathered are emitted at drop time. +/// +/// This type is responsible for decorating all [`PartitionResponse`] yielded +/// from the result stream with [`BatchStreamRecorder`] instances, in turn +/// capturing the statistics of each [`RecordBatch`] in the +/// [`PartitionResponse`]. +#[pin_project(PinnedDrop)] +#[derive(Debug)] +struct QueryMetricContext +where + P: TimeProvider, +{ + time_provider: P, + + /// The instrumented stream. + #[pin] + inner: S, + + /// The metric state shared with child [`BatchStreamRecorder`] instances. + state: Arc, + + /// The timestamp at which the read request began, inclusive of the work + /// required to acquire the inner stream (which may involve fetching all the + /// data if the result is only pretending to be a stream). + started_at: Time, + /// The timestamp at which the stream completed (yielding + /// [`Poll::Ready(None)`]). + /// + /// [`None`] if the stream has not yet completed. + completed_at: Option