diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index d891c78f19..ccf4bdb910 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -101,9 +101,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use data_types::ChunkId; use datafusion::{ - execution::context::TaskContext, physical_plan::filter::FilterExec, - prelude::{col, lit, SessionConfig, SessionContext}, + prelude::{col, lit}, }; use predicate::Predicate; @@ -186,7 +185,7 @@ mod tests { None, vec![Arc::new(chunk1)], Predicate::default(), - task_ctx(), + 2, ); let plan = FilterExec::try_new( df_physical_expr(plan.as_ref(), col("tag1").eq(lit("foo"))).unwrap(), @@ -198,24 +197,12 @@ mod tests { #[track_caller] fn assert_roundtrip(schema: Schema, chunks: Vec>) { - let plan = chunks_to_physical_nodes( - &schema, - None, - chunks.clone(), - Predicate::default(), - task_ctx(), - ); + let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2); let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found"); assert_eq!(schema, schema2); assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2)); } - fn task_ctx() -> Arc { - let session_ctx = - SessionContext::with_config(SessionConfig::default().with_target_partitions(2)); - Arc::new(TaskContext::from(&session_ctx)) - } - fn chunk_ids(chunks: &[Arc]) -> Vec { let mut ids = chunks.iter().map(|c| c.id()).collect::>(); ids.sort(); diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index b4063f171e..0f2b95b860 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -1093,7 +1093,7 @@ impl Deduplicater { output_sort_key, vec![Arc::clone(&chunk)], predicate, - ctx.inner().task_ctx(), + ctx.inner().task_ctx().session_config().target_partitions(), ); // Add Filter operator, FilterExec, if the chunk has delete predicates @@ -1271,7 +1271,7 @@ impl Deduplicater { output_sort_key, chunks.into_no_duplicates(deduplication), predicate, - ctx.inner().task_ctx(), + ctx.inner().task_ctx().session_config().target_partitions(), )); return Ok(plans); } @@ -1456,7 +1456,11 @@ mod test { None, vec![Arc::clone(&chunk)], Predicate::default(), - IOxSessionContext::with_testing().inner().task_ctx(), + IOxSessionContext::with_testing() + .inner() + .task_ctx() + .session_config() + .target_partitions(), ); // plan should not have sort operator @@ -1540,7 +1544,11 @@ mod test { None, vec![Arc::clone(&chunk)], Predicate::default(), - IOxSessionContext::with_testing().inner().task_ctx(), + IOxSessionContext::with_testing() + .inner() + .task_ctx() + .session_config() + .target_partitions(), ); let batch = test_collect(Arc::clone(&input)).await; // data in its original non-sorted form diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index 62cd4a83ff..21daba6628 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -6,7 +6,6 @@ use crate::{ }; use datafusion::{ datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}, - execution::context::TaskContext, physical_plan::{ empty::EmptyExec, file_format::{FileScanConfig, ParquetExec}, @@ -124,7 +123,7 @@ pub fn chunks_to_physical_nodes( output_sort_key: Option<&SortKey>, chunks: Vec>, predicate: Predicate, - context: Arc, + target_partitions: usize, ) -> Arc { if chunks.is_empty() { return Arc::new(EmptyExec::new(false, iox_schema.as_arrow())); @@ -167,7 +166,6 @@ pub fn chunks_to_physical_nodes( } let mut parquet_chunks: Vec<_> = parquet_chunks.into_iter().collect(); parquet_chunks.sort_by_key(|(url_str, _)| url_str.clone()); - let target_partitions = context.session_config().target_partitions(); for (_url_str, chunk_list) in parquet_chunks { let ParquetChunkList { object_store_url, @@ -235,7 +233,6 @@ where #[cfg(test)] mod tests { - use datafusion::prelude::{SessionConfig, SessionContext}; use schema::sort::SortKeyBuilder; use crate::{ @@ -305,7 +302,7 @@ mod tests { #[test] fn test_chunks_to_physical_nodes_empty() { let schema = TestChunk::new("table").schema().clone(); - let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), task_ctx()); + let plan = chunks_to_physical_nodes(&schema, None, vec![], Predicate::new(), 2); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @r###" @@ -319,13 +316,8 @@ mod tests { fn test_chunks_to_physical_nodes_recordbatch() { let chunk = TestChunk::new("table"); let schema = chunk.schema().clone(); - let plan = chunks_to_physical_nodes( - &schema, - None, - vec![Arc::new(chunk)], - Predicate::new(), - task_ctx(), - ); + let plan = + chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @r###" @@ -340,13 +332,8 @@ mod tests { fn test_chunks_to_physical_nodes_parquet_one_file() { let chunk = TestChunk::new("table").with_dummy_parquet_file(); let schema = chunk.schema().clone(); - let plan = chunks_to_physical_nodes( - &schema, - None, - vec![Arc::new(chunk)], - Predicate::new(), - task_ctx(), - ); + let plan = + chunks_to_physical_nodes(&schema, None, vec![Arc::new(chunk)], Predicate::new(), 2); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @r###" @@ -368,7 +355,7 @@ mod tests { None, vec![Arc::new(chunk1), Arc::new(chunk2), Arc::new(chunk3)], Predicate::new(), - task_ctx(), + 2, ); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @@ -394,7 +381,7 @@ mod tests { None, vec![Arc::new(chunk1), Arc::new(chunk2)], Predicate::new(), - task_ctx(), + 2, ); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @@ -417,7 +404,7 @@ mod tests { None, vec![Arc::new(chunk1), Arc::new(chunk2)], Predicate::new(), - task_ctx(), + 2, ); insta::assert_yaml_snapshot!( format_execution_plan(&plan), @@ -429,10 +416,4 @@ mod tests { "### ); } - - fn task_ctx() -> Arc { - let session_ctx = - SessionContext::with_config(SessionConfig::default().with_target_partitions(2)); - Arc::new(TaskContext::from(&session_ctx)) - } }