feat: expose DataFusion mem pool metrics (#8492)

Closes #8466.
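
This adds a `DataFusionMemoryPoolMetricsBridge` instrument that exports the configured limit and the current reservation of DataFusion's memory pool through the IOx `metric` registry, as a `datafusion_mem_pool_bytes` U64 gauge with `pool_id` and `state` (`limit`/`reserved`) attributes.

A minimal sketch of the wiring, assuming an existing `registry` and DataFusion `runtime` (names match the diff below):

    // register the bridge instrument once, then attach the runtime's pool to it
    let bridge = registry.register_instrument(
        "datafusion_pool",
        DataFusionMemoryPoolMetricsBridge::default,
    );
    bridge.register_pool(&runtime.memory_pool, mem_pool_size);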

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-08-15 17:19:11 +02:00 committed by GitHub
parent 0bfa0a7b38
commit 39a08fab69
2 changed files with 278 additions and 1 deletion


@@ -5,6 +5,7 @@ pub(crate) mod context;
pub mod field;
pub mod fieldlist;
pub mod gapfill;
mod metrics;
mod non_null_checker;
pub mod query_tracing;
mod schema_pivot;
@@ -34,8 +35,12 @@ use datafusion::{
pub use context::{IOxSessionConfig, IOxSessionContext, SessionContextIOxExt};
use schema_pivot::SchemaPivotNode;
use crate::exec::metrics::DataFusionMemoryPoolMetricsBridge;
use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode};
const TESTING_MEM_POOL_SIZE: usize = 1024 * 1024 * 1024; // 1GB
/// Configuration for an Executor
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
@@ -172,7 +177,7 @@ impl Executor {
target_query_partitions: NonZeroUsize::new(1).unwrap(),
object_stores: HashMap::default(),
metric_registry: Arc::new(Registry::default()),
-mem_pool_size: 1024 * 1024 * 1024, // 1GB
+mem_pool_size: TESTING_MEM_POOL_SIZE,
};
let executors = Arc::new(DedicatedExecutors::new_testing());
Self::new_with_config_and_executors(config, executors)
@@ -199,6 +204,14 @@ impl Executor {
register_iox_object_store(&runtime, id, Arc::clone(store));
}
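// Register the memory-pool metrics bridge and attach the runtime's pool to
// it, so the pool's limit and current reservation are exported as gauges.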
config
.metric_registry
.register_instrument(
"datafusion_pool",
DataFusionMemoryPoolMetricsBridge::default,
)
.register_pool(&runtime.memory_pool, config.mem_pool_size);
Self {
executors,
config,
@@ -367,9 +380,17 @@ mod tests {
};
use datafusion::{
datasource::{provider_as_source, MemTable},
error::DataFusionError,
logical_expr::LogicalPlanBuilder,
physical_expr::PhysicalSortExpr,
physical_plan::{
expressions::Column, sorts::sort::SortExec, DisplayAs, ExecutionPlan, RecordBatchStream,
},
};
use futures::{stream::BoxStream, Stream, StreamExt};
use metric::{Observation, RawReporter};
use stringset::StringSet;
use tokio::sync::Barrier;
use super::*;
use crate::exec::stringset::StringSetRef;
@@ -534,6 +555,54 @@ mod tests {
assert_eq!(results, to_set(&["f1", "f2"]));
}
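/// The bridge must report zero reserved bytes before the query starts, the
/// sort's reservation while `TestExec` is parked on the barrier, and zero
/// again once the query has completed.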
#[tokio::test]
async fn test_metrics_integration() {
let exec = Executor::new_testing();
// start w/o any reservation
assert_eq!(
PoolMetrics::read(&exec.config.metric_registry),
PoolMetrics {
reserved: 0,
limit: TESTING_MEM_POOL_SIZE as u64,
},
);
// run a query that blocks mid-execution while holding a reservation
let plan = Arc::new(TestExec::default());
let barrier = Arc::clone(&plan.barrier);
let schema = plan.schema();
let plan = Arc::new(SortExec::new(
vec![PhysicalSortExpr {
expr: Arc::new(Column::new_with_schema("c", &schema).unwrap()),
options: Default::default(),
}],
plan,
));
let ctx = exec.new_context(ExecutorType::Query);
let handle = tokio::spawn(async move {
ctx.collect(plan).await.unwrap();
});
barrier.wait().await;
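// the sort is now parked on the barrier inside `TestExec`, so its
// reservation for the buffered 100-row batch (896 bytes) is still live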
assert_eq!(
PoolMetrics::read(&exec.config.metric_registry),
PoolMetrics {
reserved: 896,
limit: TESTING_MEM_POOL_SIZE as u64,
},
);
// end w/o any reservation
handle.await.unwrap();
assert_eq!(
PoolMetrics::read(&exec.config.metric_registry),
PoolMetrics {
reserved: 0,
limit: TESTING_MEM_POOL_SIZE as u64,
},
);
}
/// return a set for testing
fn to_set(strs: &[&str]) -> StringSetRef {
StringSetRef::new(strs.iter().map(|s| s.to_string()).collect::<StringSet>())
@@ -559,4 +628,145 @@ mod tests {
.build()
.unwrap()
}
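/// [`ExecutionPlan`] that yields one 100-row Int64 batch, then blocks on a
/// barrier before emitting a final empty batch, keeping the downstream
/// sort's memory reservation alive mid-query.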
#[derive(Debug)]
struct TestExec {
schema: SchemaRef,
barrier: Arc<Barrier>,
}
impl Default for TestExec {
fn default() -> Self {
Self {
schema: Arc::new(arrow::datatypes::Schema::new(vec![Field::new(
"c",
DataType::Int64,
true,
)])),
barrier: Arc::new(Barrier::new(2)),
}
}
}
impl DisplayAs for TestExec {
fn fmt_as(
&self,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
write!(f, "TestExec")
}
}
impl ExecutionPlan for TestExec {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
fn execute(
&self,
_partition: usize,
_context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream>
{
let barrier = Arc::clone(&self.barrier);
let schema = Arc::clone(&self.schema);
let stream = futures::stream::iter([Ok(RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(Int64Array::from(vec![1i64; 100]))],
)
.unwrap())])
.chain(futures::stream::once(async move {
barrier.wait().await;
Ok(RecordBatch::new_empty(schema))
}));
let stream = BoxRecordBatchStream {
schema: Arc::clone(&self.schema),
inner: stream.boxed(),
};
Ok(Box::pin(stream))
}
fn statistics(&self) -> datafusion::physical_plan::Statistics {
Default::default()
}
}
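/// Adapter that pairs a boxed [`RecordBatch`] stream with its schema so it
/// can be returned as a DataFusion [`RecordBatchStream`].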
struct BoxRecordBatchStream {
schema: SchemaRef,
inner: BoxStream<'static, Result<RecordBatch, DataFusionError>>,
}
impl Stream for BoxRecordBatchStream {
type Item = Result<RecordBatch, DataFusionError>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = &mut *self;
this.inner.poll_next_unpin(cx)
}
}
impl RecordBatchStream for BoxRecordBatchStream {
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
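/// Snapshot of the `datafusion_mem_pool_bytes` gauges for pool 0, read back
/// from the registry via a [`RawReporter`].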
#[derive(Debug, PartialEq, Eq)]
struct PoolMetrics {
reserved: u64,
limit: u64,
}
impl PoolMetrics {
fn read(registry: &Registry) -> Self {
let mut reporter = RawReporter::default();
registry.report(&mut reporter);
let metric = reporter.metric("datafusion_mem_pool_bytes").unwrap();
let reserved = metric
.observation(&[("pool_id", "0"), ("state", "reserved")])
.unwrap();
let Observation::U64Gauge(reserved) = reserved else {
panic!("wrong metric type")
};
let limit = metric
.observation(&[("pool_id", "0"), ("state", "limit")])
.unwrap();
let Observation::U64Gauge(limit) = limit else {
panic!("wrong metric type")
};
Self {
reserved: *reserved,
limit: *limit,
}
}
}
}


@@ -0,0 +1,67 @@
use std::{
borrow::Cow,
sync::{Arc, Weak},
};
use datafusion::execution::memory_pool::MemoryPool;
use metric::{Attributes, Instrument, MetricKind, Observation, Reporter};
use parking_lot::Mutex;
/// Hooks DataFusion [`MemoryPool`] into our [`metric`] crate.
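///
/// Pools are tracked via [`Weak`] references so the bridge never keeps a
/// dropped pool alive; pools that have been dropped are skipped when
/// reporting.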
#[derive(Debug, Clone, Default)]
pub struct DataFusionMemoryPoolMetricsBridge {
pools: Arc<Mutex<Vec<Pool>>>,
}
impl DataFusionMemoryPoolMetricsBridge {
/// Register new pool.
pub fn register_pool(&self, pool: &Arc<dyn MemoryPool>, limit: usize) {
self.pools.lock().push(Pool {
pool: Arc::downgrade(pool),
limit,
});
}
}
impl Instrument for DataFusionMemoryPoolMetricsBridge {
fn report(&self, reporter: &mut dyn Reporter) {
reporter.start_metric(
"datafusion_mem_pool_bytes",
"Number of bytes within the datafusion memory pool",
MetricKind::U64Gauge,
);
let pools = self.pools.lock();
for (idx, pool) in pools.iter().enumerate() {
let Some(pool_arc) = pool.pool.upgrade() else {
continue;
};
reporter.report_observation(
&Attributes::from([
("pool_id", Cow::Owned(idx.to_string())),
("state", Cow::Borrowed("limit")),
]),
Observation::U64Gauge(pool.limit as u64),
);
reporter.report_observation(
&Attributes::from([
("pool_id", Cow::Owned(idx.to_string())),
("state", Cow::Borrowed("reserved")),
]),
Observation::U64Gauge(pool_arc.reserved() as u64),
);
}
reporter.finish_metric();
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
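/// A registered memory pool and its configured limit.
///
/// The limit is captured at registration time because [`MemoryPool`] does
/// not expose one.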
#[derive(Debug)]
struct Pool {
pool: Weak<dyn MemoryPool>,
limit: usize,
}