From 39a08fab6974526a73f34683885d87f217d0ad71 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 15 Aug 2023 17:19:11 +0200 Subject: [PATCH] feat: expose DataFusion mem pool metrics (#8492) Closes #8466. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query/src/exec.rs | 212 +++++++++++++++++++++++++++++++++- iox_query/src/exec/metrics.rs | 67 +++++++++++ 2 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 iox_query/src/exec/metrics.rs diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs index b579afb968..4825507aff 100644 --- a/iox_query/src/exec.rs +++ b/iox_query/src/exec.rs @@ -5,6 +5,7 @@ pub(crate) mod context; pub mod field; pub mod fieldlist; pub mod gapfill; +mod metrics; mod non_null_checker; pub mod query_tracing; mod schema_pivot; @@ -34,8 +35,12 @@ use datafusion::{ pub use context::{IOxSessionConfig, IOxSessionContext, SessionContextIOxExt}; use schema_pivot::SchemaPivotNode; +use crate::exec::metrics::DataFusionMemoryPoolMetricsBridge; + use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode}; +const TESTING_MEM_POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB + /// Configuration for an Executor #[derive(Debug, Clone)] pub struct ExecutorConfig { @@ -172,7 +177,7 @@ impl Executor { target_query_partitions: NonZeroUsize::new(1).unwrap(), object_stores: HashMap::default(), metric_registry: Arc::new(Registry::default()), - mem_pool_size: 1024 * 1024 * 1024, // 1GB + mem_pool_size: TESTING_MEM_POOL_SIZE, }; let executors = Arc::new(DedicatedExecutors::new_testing()); Self::new_with_config_and_executors(config, executors) @@ -199,6 +204,14 @@ impl Executor { register_iox_object_store(&runtime, id, Arc::clone(store)); } + config + .metric_registry + .register_instrument( + "datafusion_pool", + DataFusionMemoryPoolMetricsBridge::default, + ) + .register_pool(&runtime.memory_pool, config.mem_pool_size); + Self { executors, config, @@ -367,9 +380,17 @@ mod tests { }; use datafusion::{ datasource::{provider_as_source, MemTable}, + error::DataFusionError, logical_expr::LogicalPlanBuilder, + physical_expr::PhysicalSortExpr, + physical_plan::{ + expressions::Column, sorts::sort::SortExec, DisplayAs, ExecutionPlan, RecordBatchStream, + }, }; + use futures::{stream::BoxStream, Stream, StreamExt}; + use metric::{Observation, RawReporter}; use stringset::StringSet; + use tokio::sync::Barrier; use super::*; use crate::exec::stringset::StringSetRef; @@ -534,6 +555,54 @@ mod tests { assert_eq!(results, to_set(&["f1", "f2"])); } + #[tokio::test] + async fn test_metrics_integration() { + let exec = Executor::new_testing(); + + // start w/o any reservation + assert_eq!( + PoolMetrics::read(&exec.config.metric_registry), + PoolMetrics { + reserved: 0, + limit: TESTING_MEM_POOL_SIZE as u64, + }, + ); + + // block some reservation + let plan = Arc::new(TestExec::default()); + let barrier = Arc::clone(&plan.barrier); + let schema = plan.schema(); + let plan = Arc::new(SortExec::new( + vec![PhysicalSortExpr { + expr: Arc::new(Column::new_with_schema("c", &schema).unwrap()), + options: Default::default(), + }], + plan, + )); + let ctx = exec.new_context(ExecutorType::Query); + let handle = tokio::spawn(async move { + ctx.collect(plan).await.unwrap(); + }); + barrier.wait().await; + assert_eq!( + PoolMetrics::read(&exec.config.metric_registry), + PoolMetrics { + reserved: 896, + limit: TESTING_MEM_POOL_SIZE as u64, + }, + ); + + // end w/o any reservation + handle.await.unwrap(); + assert_eq!( + PoolMetrics::read(&exec.config.metric_registry), + PoolMetrics { + reserved: 0, + limit: TESTING_MEM_POOL_SIZE as u64, + }, + ); + } + /// return a set for testing fn to_set(strs: &[&str]) -> StringSetRef { StringSetRef::new(strs.iter().map(|s| s.to_string()).collect::()) @@ -559,4 +628,145 @@ mod tests { .build() .unwrap() } + + #[derive(Debug)] + struct TestExec { + schema: SchemaRef, + barrier: Arc, + } + + impl Default for TestExec { + fn default() -> Self { + Self { + schema: Arc::new(arrow::datatypes::Schema::new(vec![Field::new( + "c", + DataType::Int64, + true, + )])), + barrier: Arc::new(Barrier::new(2)), + } + } + } + + impl DisplayAs for TestExec { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "TestExec") + } + } + + impl ExecutionPlan for TestExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning { + datafusion::physical_plan::Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> datafusion::error::Result> { + unimplemented!() + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::error::Result + { + let barrier = Arc::clone(&self.barrier); + let schema = Arc::clone(&self.schema); + let stream = futures::stream::iter([Ok(RecordBatch::try_new( + Arc::clone(&self.schema), + vec![Arc::new(Int64Array::from(vec![1i64; 100]))], + ) + .unwrap())]) + .chain(futures::stream::once(async move { + barrier.wait().await; + Ok(RecordBatch::new_empty(schema)) + })); + let stream = BoxRecordBatchStream { + schema: Arc::clone(&self.schema), + inner: stream.boxed(), + }; + Ok(Box::pin(stream)) + } + + fn statistics(&self) -> datafusion::physical_plan::Statistics { + Default::default() + } + } + + struct BoxRecordBatchStream { + schema: SchemaRef, + inner: BoxStream<'static, Result>, + } + + impl Stream for BoxRecordBatchStream { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = &mut *self; + this.inner.poll_next_unpin(cx) + } + } + + impl RecordBatchStream for BoxRecordBatchStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + } + + #[derive(Debug, PartialEq, Eq)] + struct PoolMetrics { + reserved: u64, + limit: u64, + } + + impl PoolMetrics { + fn read(registry: &Registry) -> Self { + let mut reporter = RawReporter::default(); + registry.report(&mut reporter); + let metric = reporter.metric("datafusion_mem_pool_bytes").unwrap(); + + let reserved = metric + .observation(&[("pool_id", "0"), ("state", "reserved")]) + .unwrap(); + let Observation::U64Gauge(reserved) = reserved else { + panic!("wrong metric type") + }; + let limit = metric + .observation(&[("pool_id", "0"), ("state", "limit")]) + .unwrap(); + let Observation::U64Gauge(limit) = limit else { + panic!("wrong metric type") + }; + + Self { + reserved: *reserved, + limit: *limit, + } + } + } } diff --git a/iox_query/src/exec/metrics.rs b/iox_query/src/exec/metrics.rs new file mode 100644 index 0000000000..45a17f7369 --- /dev/null +++ b/iox_query/src/exec/metrics.rs @@ -0,0 +1,67 @@ +use std::{ + borrow::Cow, + sync::{Arc, Weak}, +}; + +use datafusion::execution::memory_pool::MemoryPool; +use metric::{Attributes, Instrument, MetricKind, Observation, Reporter}; +use parking_lot::Mutex; + +/// Hooks DataFusion [`MemoryPool`] into our [`metric`] crate. +#[derive(Debug, Clone, Default)] +pub struct DataFusionMemoryPoolMetricsBridge { + pools: Arc>>, +} + +impl DataFusionMemoryPoolMetricsBridge { + /// Register new pool. + pub fn register_pool(&self, pool: &Arc, limit: usize) { + self.pools.lock().push(Pool { + pool: Arc::downgrade(pool), + limit, + }); + } +} + +impl Instrument for DataFusionMemoryPoolMetricsBridge { + fn report(&self, reporter: &mut dyn Reporter) { + reporter.start_metric( + "datafusion_mem_pool_bytes", + "Number of bytes within the datafusion memory pool", + MetricKind::U64Gauge, + ); + let pools = self.pools.lock(); + for (idx, pool) in pools.iter().enumerate() { + let Some(pool_arc) = pool.pool.upgrade() else { + continue; + }; + + reporter.report_observation( + &Attributes::from([ + ("pool_id", Cow::Owned(idx.to_string())), + ("state", Cow::Borrowed("limit")), + ]), + Observation::U64Gauge(pool.limit as u64), + ); + + reporter.report_observation( + &Attributes::from([ + ("pool_id", Cow::Owned(idx.to_string())), + ("state", Cow::Borrowed("reserved")), + ]), + Observation::U64Gauge(pool_arc.reserved() as u64), + ); + } + reporter.finish_metric(); + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +#[derive(Debug)] +struct Pool { + pool: Weak, + limit: usize, +}