refactor: remove `TaskContext` param from `chunks_to_physical_nodes` (#7019)

This makes the function easier to use from optimizer passes.

Ref #6098.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-02-22 10:19:59 +01:00 committed by GitHub
parent 929ac9081e
commit e9ec213b72
3 changed files with 24 additions and 48 deletions
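
The motivation above is easiest to see against DataFusion's `PhysicalOptimizerRule` trait: a pass receives plain `ConfigOptions`, not an `Arc<TaskContext>`, so a bare `usize` parameter is something it can actually supply. A minimal hypothetical sketch (the rule name and body are illustrative only, not part of this commit):

use std::sync::Arc;

use datafusion::{
    config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule,
    physical_plan::ExecutionPlan,
};

/// Hypothetical pass; it only illustrates the call shape that the new
/// `usize` parameter enables.
struct RewriteChunkScans;

impl PhysicalOptimizerRule for RewriteChunkScans {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A pass gets `ConfigOptions`, not a `TaskContext`, so the
        // partition count is available directly...
        let _target_partitions = config.execution.target_partitions;
        // ...and could be forwarded to `chunks_to_physical_nodes` when the
        // pass rebuilds a scan node (rewrite logic elided).
        Ok(plan)
    }

    fn name(&self) -> &str {
        "rewrite_chunk_scans"
    }

    fn schema_check(&self) -> bool {
        true
    }
}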


@@ -101,9 +101,8 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
     use data_types::ChunkId;
     use datafusion::{
-        execution::context::TaskContext,
         physical_plan::filter::FilterExec,
-        prelude::{col, lit, SessionConfig, SessionContext},
+        prelude::{col, lit},
     };
     use predicate::Predicate;
@@ -186,7 +185,7 @@ mod tests {
             None,
             vec![Arc::new(chunk1)],
             Predicate::default(),
-            task_ctx(),
+            2,
         );
         let plan = FilterExec::try_new(
             df_physical_expr(plan.as_ref(), col("tag1").eq(lit("foo"))).unwrap(),
@@ -198,24 +197,12 @@ mod tests {
     #[track_caller]
     fn assert_roundtrip(schema: Schema, chunks: Vec<Arc<dyn QueryChunk>>) {
-        let plan = chunks_to_physical_nodes(
-            &schema,
-            None,
-            chunks.clone(),
-            Predicate::default(),
-            task_ctx(),
-        );
+        let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2);
         let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found");
         assert_eq!(schema, schema2);
         assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2));
     }

-    fn task_ctx() -> Arc<TaskContext> {
-        let session_ctx =
-            SessionContext::with_config(SessionConfig::default().with_target_partitions(2));
-        Arc::new(TaskContext::from(&session_ctx))
-    }
-
     fn chunk_ids(chunks: &[Arc<dyn QueryChunk>]) -> Vec<ChunkId> {
         let mut ids = chunks.iter().map(|c| c.id()).collect::<Vec<_>>();
         ids.sort();
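
The deleted `task_ctx()` helper makes the point: the tests spun up a whole `SessionContext` and `TaskContext` only to carry the number 2 into `chunks_to_physical_nodes`. A standalone sketch of that equivalence, reusing the exact calls from the removed helper:

use std::sync::Arc;

use datafusion::{
    execution::context::TaskContext,
    prelude::{SessionConfig, SessionContext},
};

fn main() {
    // Same construction as the removed `task_ctx()` helper above.
    let session_ctx =
        SessionContext::with_config(SessionConfig::default().with_target_partitions(2));
    let task_ctx: Arc<TaskContext> = Arc::new(TaskContext::from(&session_ctx));

    // The only thing `chunks_to_physical_nodes` ever read from it:
    assert_eq!(task_ctx.session_config().target_partitions(), 2);
}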


@@ -1093,7 +1093,7 @@ impl Deduplicater {
             output_sort_key,
             vec![Arc::clone(&chunk)],
             predicate,
-            ctx.inner().task_ctx(),
+            ctx.inner().task_ctx().session_config().target_partitions(),
         );

         // Add Filter operator, FilterExec, if the chunk has delete predicates
@@ -1271,7 +1271,7 @@ impl Deduplicater {
             output_sort_key,
             chunks.into_no_duplicates(deduplication),
             predicate,
-            ctx.inner().task_ctx(),
+            ctx.inner().task_ctx().session_config().target_partitions(),
         ));
         return Ok(plans);
     }
@@ -1456,7 +1456,11 @@ mod test {
             None,
             vec![Arc::clone(&chunk)],
             Predicate::default(),
-            IOxSessionContext::with_testing().inner().task_ctx(),
+            IOxSessionContext::with_testing()
+                .inner()
+                .task_ctx()
+                .session_config()
+                .target_partitions(),
         );

         // plan should not have sort operator
@@ -1540,7 +1544,11 @@ mod test {
             None,
             vec![Arc::clone(&chunk)],
             Predicate::default(),
-            IOxSessionContext::with_testing().inner().task_ctx(),
+            IOxSessionContext::with_testing()
+                .inner()
+                .task_ctx()
+                .session_config()
+                .target_partitions(),
         );
         let batch = test_collect(Arc::clone(&input)).await;
         // data in its original non-sorted form


@ -6,7 +6,6 @@ use crate::{
};
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
execution::context::TaskContext,
physical_plan::{
empty::EmptyExec,
file_format::{FileScanConfig, ParquetExec},
@ -124,7 +123,7 @@ pub fn chunks_to_physical_nodes(
output_sort_key: Option<&SortKey>,
chunks: Vec<Arc<dyn QueryChunk>>,
predicate: Predicate,
context: Arc<TaskContext>,
target_partitions: usize,
) -> Arc<dyn ExecutionPlan> {
if chunks.is_empty() {
return Arc::new(EmptyExec::new(false, iox_schema.as_arrow()));
@ -167,7 +166,6 @@ pub fn chunks_to_physical_nodes(
}
let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect();
parquet_chunks.sort_by_key(|(url_str, _)| url_str.clone());
let target_partitions = context.session_config().target_partitions();
for (_url_str, chunk_list) in parquet_chunks {
let ParquetChunkList {
object_store_url,
@ -235,7 +233,6 @@ where
#[cfg(test)]
mod tests {
use datafusion::prelude::{SessionConfig, SessionContext};
use schema::sort::SortKeyBuilder;
use crate::{
@ -305,7 +302,7 @@ mod tests {
#[test]
fn test_chunks_to_physical_nodes_empty() {
let schema = TestChunk::new("table").schema().clone();
let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), task_ctx());
let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -319,13 +316,8 @@ mod tests {
fn test_chunks_to_physical_nodes_recordbatch() {
let chunk = TestChunk::new("table");
let schema = chunk.schema().clone();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk)],
Predicate::new(),
task_ctx(),
);
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -340,13 +332,8 @@ mod tests {
fn test_chunks_to_physical_nodes_parquet_one_file() {
let chunk = TestChunk::new("table").with_dummy_parquet_file();
let schema = chunk.schema().clone();
let plan = chunks_to_physical_nodes(
&schema,
None,
vec![Arc::new(chunk)],
Predicate::new(),
task_ctx(),
);
let plan =
chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@r###"
@ -368,7 +355,7 @@ mod tests {
None,
vec![Arc::new(chunk1), Arc::new(chunk2), Arc::new(chunk3)],
Predicate::new(),
task_ctx(),
2,
);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@ -394,7 +381,7 @@ mod tests {
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
task_ctx(),
2,
);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@ -417,7 +404,7 @@ mod tests {
None,
vec![Arc::new(chunk1), Arc::new(chunk2)],
Predicate::new(),
task_ctx(),
2,
);
insta::assert_yaml_snapshot!(
format_execution_plan(&plan),
@ -429,10 +416,4 @@ mod tests {
"###
);
}
fn task_ctx() -> Arc<TaskContext> {
let session_ctx =
SessionContext::with_config(SessionConfig::default().with_target_partitions(2));
Arc::new(TaskContext::from(&session_ctx))
}
}
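
On the production side (the `Deduplicater` call sites above), the replacement chain `ctx.inner().task_ctx().session_config().target_partitions()` is longer than what it replaced. If it spread to more call sites, it could be hoisted into a one-line helper; a hypothetical sketch, with the `IOxSessionContext` import path assumed:

// Hypothetical helper, not part of this commit; `IOxSessionContext` is the
// IOx session wrapper seen in the diff (import path assumed).
use iox_query::exec::IOxSessionContext;

fn target_partitions(ctx: &IOxSessionContext) -> usize {
    ctx.inner().task_ctx().session_config().target_partitions()
}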