Merge branch 'main' into dom/nesting

pull/24376/head
Dom, 2023-03-03 16:21:55 +00:00, committed by GitHub
commit 5b7a8beff8
2 changed files with 103 additions and 25 deletions

File 1 of 2

@@ -15,7 +15,7 @@ use arrow::{compute::SortOptions, datatypes::SchemaRef};
 use datafusion::{
     common::DFSchemaRef,
     error::{DataFusionError, Result},
-    execution::context::TaskContext,
+    execution::{context::TaskContext, memory_pool::MemoryConsumer},
     logical_expr::{LogicalPlan, UserDefinedLogicalNode},
     physical_expr::{create_physical_expr, execution_props::ExecutionProps, PhysicalSortExpr},
     physical_plan::{
@@ -412,22 +412,22 @@ impl ExecutionPlan for GapFillExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        if self.output_partitioning().partition_count() <= partition {
+        if partition != 0 {
             return Err(DataFusionError::Internal(format!(
-                "GapFillExec invalid partition {partition}"
+                "GapFillExec invalid partition {partition}, there can be only one partition"
             )));
         }

         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
         let output_batch_size = context.session_config().batch_size();
+        let reservation = MemoryConsumer::new(format!("GapFillExec[{partition}]"))
+            .register(context.memory_pool());
         let input_stream = self.input.execute(partition, context)?;
         Ok(Box::pin(GapFillStream::try_new(
-            self.schema(),
-            &self.sort_expr,
-            &self.aggr_expr,
-            &self.params,
+            self,
             output_batch_size,
             input_stream,
+            reservation,
             baseline_metrics,
         )?))
     }
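For context, the accounting API used in `execute` above is DataFusion's `MemoryConsumer`/`MemoryReservation` pair: an operator registers a named consumer against the task's pool and gets back a reservation it grows and shrinks as it buffers data. A minimal standalone sketch of that behavior (the pool type, pool size, and consumer name here are illustrative assumptions, not part of this change):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    // A pool that hands out at most 1 MiB in total.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));

    // Each operator registers a named consumer and receives a reservation,
    // just as GapFillExec does with context.memory_pool().
    let mut reservation = MemoryConsumer::new("GapFillExec[0]").register(&pool);

    // try_grow reserves bytes, failing with ResourcesExhausted once the pool is spent...
    reservation.try_grow(512 * 1024)?;
    assert_eq!(512 * 1024, pool.reserved());

    // ...and shrink returns bytes to the pool.
    reservation.shrink(256 * 1024);
    assert_eq!(256 * 1024, pool.reserved());
    Ok(())
}
```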
@@ -480,6 +480,7 @@ mod test {
     use datafusion::{
         datasource::empty::EmptyTable,
         error::Result,
+        execution::runtime_env::{RuntimeConfig, RuntimeEnv},
         logical_expr::{logical_plan, Extension},
         physical_plan::{collect, expressions::lit as phys_lit, memory::MemoryExec},
         prelude::{col, lit, lit_timestamp_nano, SessionConfig, SessionContext},
@@ -696,7 +697,10 @@ mod test {
             output_batch_size,
             params,
         };
-        let batches = tc.run().await?;
+        // For this simple test case, also test that
+        // memory is tracked correctly, which is done by
+        // TestCase when running with a memory limit.
+        let batches = tc.run_with_memory_limit(16384).await?;
         let expected = [
             "+----+--------------------------+----+",
             "| g0 | time                     | a0 |",
@@ -1230,6 +1234,28 @@
         }
     }

+    #[tokio::test]
+    async fn test_gapfill_oom() {
+        // Show that a graceful error is produced if memory limit is exceeded
+        test_helpers::maybe_start_logging();
+        let input_batch_size = 128;
+        let output_batch_size = 128;
+        let batch = TestRecords {
+            group_cols: vec![vec![Some("a"), Some("a")]],
+            time_col: vec![Some(1_000), Some(1_100)],
+            agg_cols: vec![vec![Some(10), Some(11)]],
+            input_batch_size,
+        };
+        let params = get_params_ms(&batch, 25, Some(975), 1_125);
+        let tc = TestCase {
+            test_records: batch,
+            output_batch_size,
+            params,
+        };
+        let result = tc.run_with_memory_limit(1).await;
+        assert_error!(result, DataFusionError::ResourcesExhausted(_));
+    }
+
     fn assert_batch_count(actual_batches: &[RecordBatch], batch_size: usize) {
         let num_rows = actual_batches.iter().map(|b| b.num_rows()).sum::<usize>();
         let expected_batch_count = f64::ceil(num_rows as f64 / batch_size as f64) as usize;
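The 1-byte limit in `test_gapfill_oom` is enough to trip the operator's very first `try_grow`, since any non-empty `RecordBatch` occupies more than one byte, and the error surfaces gracefully rather than aborting the process. A sketch of that failure mode in isolation (constructing the pool directly is an assumption about what the runtime does under the hood, not code from this change):

```rust
use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A pool with a single byte, mirroring run_with_memory_limit(1).
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1));
    let mut reservation = MemoryConsumer::new("GapFillExec[0]").register(&pool);

    // The first attempt to account for a real batch fails with a typed,
    // recoverable error rather than an abort.
    let err = reservation.try_grow(4096).unwrap_err();
    assert!(matches!(err, DataFusionError::ResourcesExhausted(_)));
}
```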
@@ -1341,6 +1367,32 @@
     impl TestCase {
         async fn run(self) -> Result<Vec<RecordBatch>> {
+            let session_ctx = SessionContext::with_config(
+                SessionConfig::default().with_batch_size(self.output_batch_size),
+            )
+            .into();
+            Self::execute_with_config(&session_ctx, self.plan()?).await
+        }
+
+        async fn run_with_memory_limit(self, limit: usize) -> Result<Vec<RecordBatch>> {
+            let session_ctx = SessionContext::with_config_rt(
+                SessionConfig::default().with_batch_size(self.output_batch_size),
+                RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(limit, 1.0))?.into(),
+            )
+            .into();
+            let result = Self::execute_with_config(&session_ctx, self.plan()?).await;
+            if result.is_ok() {
+                // Verify that the operator reports usage in a
+                // symmetrical way.
+                let pool = &session_ctx.runtime_env().memory_pool;
+                assert_eq!(0, pool.reserved());
+            }
+            result
+        }
+
+        fn plan(self) -> Result<Arc<GapFillExec>> {
             let schema = self.test_records.schema();
             let (group_expr, aggr_expr) = self.test_records.exprs()?;
@@ -1359,18 +1411,20 @@
                 self.output_batch_size
             );
             let input = Arc::new(MemoryExec::try_new(&[batches], schema, None)?);
             let plan = Arc::new(GapFillExec::try_new(
                 input,
                 group_expr,
                 aggr_expr,
-                self.params,
+                self.params.clone(),
             )?);
+            Ok(plan)
+        }
-            let session_ctx = SessionContext::with_config(
-                SessionConfig::default().with_batch_size(self.output_batch_size),
-            );
-            let task_ctx = Arc::new(TaskContext::from(&session_ctx));
+        async fn execute_with_config(
+            session_ctx: &Arc<SessionContext>,
+            plan: Arc<GapFillExec>,
+        ) -> Result<Vec<RecordBatch>> {
+            let task_ctx = Arc::new(TaskContext::from(session_ctx.as_ref()));
             collect(plan, task_ctx).await
         }
     }

File 2 of 2

@@ -13,16 +13,16 @@ use arrow::{
 use arrow_util::optimize::optimize_dictionaries;
 use datafusion::{
     error::{DataFusionError, Result},
-    physical_expr::PhysicalSortExpr,
+    execution::memory_pool::MemoryReservation,
     physical_plan::{
         expressions::Column,
         metrics::{BaselineMetrics, RecordOutput},
-        PhysicalExpr, RecordBatchStream, SendableRecordBatchStream,
+        ExecutionPlan, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream,
     },
 };
 use futures::{ready, Stream, StreamExt};

-use super::{algo::GapFiller, params::GapFillParams, GapFillExecParams};
+use super::{algo::GapFiller, params::GapFillParams, GapFillExec};

 /// An implementation of a gap-filling operator that uses the [Stream] trait.
 #[allow(dead_code)]
@@ -48,6 +48,8 @@ pub(super) struct GapFillStream {
     gap_filler: GapFiller,
     /// This is true as long as there are more input record batches to read from `input`.
     more_input: bool,
+    /// For tracking memory.
+    reservation: MemoryReservation,
     /// Baseline metrics.
     baseline_metrics: BaselineMetrics,
 }
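Storing the reservation as a struct field ties the accounted bytes to the stream's lifetime: a `MemoryReservation` returns whatever it still holds to the pool when dropped, so even an early-terminated stream cannot leak accounting. A sketch of that drop behavior (the struct and names here are illustrative, not from this change):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
};

// Stand-in for a stream that buffers input, like GapFillStream.
#[allow(dead_code)]
struct BufferingStream {
    reservation: MemoryReservation,
}

fn main() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut reservation = MemoryConsumer::new("BufferingStream").register(&pool);
    reservation.try_grow(1024)?;

    let stream = BufferingStream { reservation };
    drop(stream); // the reservation's Drop impl frees its bytes

    assert_eq!(0, pool.reserved());
    Ok(())
}
```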
@@ -55,14 +57,20 @@
 impl GapFillStream {
     /// Creates a new GapFillStream.
     pub fn try_new(
-        schema: SchemaRef,
-        sort_expr: &[PhysicalSortExpr],
-        aggr_expr: &[Arc<dyn PhysicalExpr>],
-        exec_params: &GapFillExecParams,
+        exec: &GapFillExec,
         batch_size: usize,
         input: SendableRecordBatchStream,
+        reservation: MemoryReservation,
         metrics: BaselineMetrics,
     ) -> Result<Self> {
+        let schema = exec.schema();
+        let GapFillExec {
+            sort_expr,
+            aggr_expr,
+            params,
+            ..
+        } = exec;
         if sort_expr.is_empty() {
             return Err(DataFusionError::Internal(
                 "empty sort_expr vector for gap filling; should have at least a time expression"
@@ -75,10 +83,10 @@ impl GapFillStream {
             .collect::<Vec<_>>();
         let aggr_expr = aggr_expr.to_owned();
         let time_expr = group_expr.split_off(group_expr.len() - 1).pop().unwrap();
-        let params = GapFillParams::try_new(Arc::clone(&schema), exec_params)?;
+        let params = GapFillParams::try_new(Arc::clone(&schema), params)?;
         let gap_filler = GapFiller::new(params);
         Ok(Self {
-            schema: Arc::clone(&schema),
+            schema,
             time_expr,
             group_expr,
             aggr_expr,
@@ -87,6 +95,7 @@
             buffered_input_batches: vec![],
             gap_filler,
             more_input: true,
+            reservation,
             baseline_metrics: metrics,
         })
     }
@@ -122,7 +131,10 @@ impl Stream for GapFillStream {
     ) -> Poll<Option<Result<RecordBatch>>> {
         while self.more_input && self.buffered_input_row_count() < self.batch_size + 2 {
             match ready!(self.input.poll_next_unpin(cx)) {
-                Some(Ok(batch)) => self.buffered_input_batches.push(batch),
+                Some(Ok(batch)) => {
+                    self.reservation.try_grow(batch.get_array_memory_size())?;
+                    self.buffered_input_batches.push(batch);
+                }
                 Some(Err(e)) => {
                     return Poll::Ready(Some(Err(e)));
                 }
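The pattern in this hunk is grow-before-buffer: reserve for a batch's actual array memory, and only keep the batch if the reservation succeeds. The same step as a sketch, with a hypothetical `buffer_batch` helper (not a function from this change):

```rust
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryReservation;

// Reserve for a batch before buffering it; on failure the batch is dropped
// and the ResourcesExhausted error propagates to the caller.
fn buffer_batch(
    reservation: &mut MemoryReservation,
    buffered: &mut Vec<RecordBatch>,
    batch: RecordBatch,
) -> Result<()> {
    // get_array_memory_size() measures the bytes of the buffers backing the
    // batch, so the reservation tracks heap usage rather than row counts.
    reservation.try_grow(batch.get_array_memory_size())?;
    buffered.push(batch);
    Ok(())
}
```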
@@ -139,6 +151,7 @@
         if self.gap_filler.done(input_batch.num_rows()) {
             // leave the input batch taken so that its reference
             // count goes to zero.
+            self.reservation.shrink(input_batch.get_array_memory_size());
             return Poll::Ready(None);
         }
@@ -151,6 +164,9 @@
             Ok((output_batch, remaining_input_batch)) => {
                 self.buffered_input_batches.push(remaining_input_batch);
                 assert_eq!(1, self.buffered_input_batches.len());
+                self.reservation
+                    .shrink(output_batch.get_array_memory_size());
                 Poll::Ready(Some(Ok(output_batch)))
             }
             Err(e) => Poll::Ready(Some(Err(e))),
@@ -176,9 +192,11 @@ impl GapFillStream {
         let mut v = vec![];
         std::mem::swap(&mut v, &mut self.buffered_input_batches);
+        let old_size = v.iter().map(|rb| rb.get_array_memory_size()).sum();

         let mut batch = arrow::compute::concat_batches(&self.schema, &v)
             .map_err(DataFusionError::ArrowError)?;
+        self.reservation.try_grow(batch.get_array_memory_size())?;

         if v.len() > 1 {
             // Optimize the dictionaries. The output of this operator uses the take kernel to produce
@@ -186,6 +204,8 @@
             // be less work to optimize here vs optimizing the output.
             batch = optimize_dictionaries(&batch).map_err(DataFusionError::ArrowError)?;
         }

+        self.reservation.shrink(old_size);
         Ok(Some(batch))
     }
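Note the ordering across these two hunks: the reservation grows for the concatenated batch before shrinking by `old_size`, so the pool briefly accounts for both copies, which is exactly when both genuinely coexist. The same idea as a self-contained sketch (the helper name is hypothetical):

```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::memory_pool::MemoryReservation;

// Concatenate buffered batches while keeping the reservation honest about
// the transient peak where old and new batches are both alive.
fn concat_accounted(
    schema: &SchemaRef,
    parts: Vec<RecordBatch>,
    reservation: &mut MemoryReservation,
) -> Result<RecordBatch> {
    let old_size: usize = parts.iter().map(|rb| rb.get_array_memory_size()).sum();
    let batch =
        arrow::compute::concat_batches(schema, &parts).map_err(DataFusionError::ArrowError)?;

    // Reserve for the new batch first, then release the parts it replaces.
    reservation.try_grow(batch.get_array_memory_size())?;
    drop(parts);
    reservation.shrink(old_size);
    Ok(batch)
}
```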
@@ -223,7 +243,11 @@
             .record_output(&self.baseline_metrics)?;
         timer.done();

+        self.reservation
+            .try_grow(output_batch.get_array_memory_size())?;

-        // Slice the input so array data that is no longer referenced can be dropped.
+        // Slice the input to just what is needed moving forward, with one context
+        // row before the next input offset.
         input_batch = self.gap_filler.slice_input_batch(input_batch)?;

         Ok((output_batch, input_batch))