diff --git a/Cargo.lock b/Cargo.lock index bc680f8469..608c3b8d7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2414,6 +2414,7 @@ dependencies = [ "mutable_batch_lp", "object_store", "observability_deps", + "once_cell", "parquet_file", "schema", "sharder", diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index a5114965ed..7a055e50d8 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -16,6 +16,7 @@ metric = { path = "../metric" } mutable_batch_lp = { path = "../mutable_batch_lp" } object_store = "0.3.0" observability_deps = { path = "../observability_deps" } +once_cell = { version = "1.13.0", features = ["parking_lot"] } parquet_file = { path = "../parquet_file" } iox_query = { path = "../iox_query" } schema = { path = "../schema" } diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index e0e696c30d..9e5f61b644 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -20,6 +20,7 @@ use iox_time::{MockProvider, Time, TimeProvider}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use object_store::{memory::InMemory, DynObjectStore}; use observability_deps::tracing::debug; +use once_cell::sync::Lazy; use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; use schema::{ selection::Selection, @@ -29,6 +30,9 @@ use schema::{ use std::sync::Arc; use uuid::Uuid; +/// Global executor used by all test catalogs. +static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| Arc::new(Executor::new(1))); +/// Catalog for tests #[derive(Debug)] #[allow(missing_docs)] pub struct TestCatalog { @@ -42,12 +46,21 @@ impl TestCatalog { /// Initialize the catalog + /// + /// All test catalogs use the same [`Executor`]. Use [`with_exec`](Self::with_exec) if you need a special or + /// dedicated executor. pub fn new() -> Arc<Self> { + let exec = Arc::clone(&GLOBAL_EXEC); + + Self::with_exec(exec) + } + + /// Initialize with given executor.
+ pub fn with_exec(exec: Arc<Executor>) -> Arc<Self> { let metric_registry = Arc::new(metric::Registry::new()); let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))); let object_store = Arc::new(InMemory::new()); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0))); - let exec = Arc::new(Executor::new(1)); Arc::new(Self { metric_registry, diff --git a/query_tests/src/runner.rs b/query_tests/src/runner.rs index 3195a23ac1..672a337b51 100644 --- a/query_tests/src/runner.rs +++ b/query_tests/src/runner.rs @@ -5,16 +5,12 @@ mod setup; use arrow::record_batch::RecordBatch; use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch}; -use iox_query::{ - exec::{Executor, ExecutorType}, - frontend::sql::SqlQueryPlanner, -}; +use iox_query::frontend::sql::SqlQueryPlanner; use snafu::{OptionExt, ResultExt, Snafu}; use std::{ io::LineWriter, io::Write, path::{Path, PathBuf}, - sync::Arc, }; use self::{ @@ -22,7 +18,6 @@ use self::{ setup::TestSetup, }; use crate::scenarios::{DbScenario, DbSetup}; -use iox_query::exec::ExecutorConfig; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -276,14 +271,7 @@ impl Runner { writeln!(self.log, "Running scenario '{}'", scenario_name)?; writeln!(self.log, "SQL: '{:#?}'", sql)?; let planner = SqlQueryPlanner::default(); - let executor = Arc::new(Executor::new_with_config(ExecutorConfig { - num_threads: 1, - target_query_partitions: 4, - })); - let ctx = executor - .new_execution_config(ExecutorType::Query) - .with_default_catalog(db.as_catalog_provider_arc()) - .build(); + let ctx = db.new_query_context(None); let physical_plan = planner .query(sql, &ctx) diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index d5160551a1..10309c67b9 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -21,9 +21,11 @@ use ingester::{ querier_handler::prepare_data_to_querier, }; use iox_catalog::interface::get_schema_by_name;
+use iox_query::exec::{Executor, ExecutorConfig}; use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer}; use itertools::Itertools; use mutable_batch_lp::LinesConverter; +use once_cell::sync::Lazy; use parquet_file::storage::ParquetStorage; use querier::{ IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError, @@ -638,10 +640,20 @@ struct MockIngester { sequence_counter: i64, } +/// Query-test specific executor with static properties that may be relevant for the query optimizer and therefore may +/// change `EXPLAIN` plans. +static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| { + Arc::new(Executor::new_with_config(ExecutorConfig { + num_threads: 1, + target_query_partitions: 4, + })) +}); + impl MockIngester { /// Create new empty ingester. async fn new() -> Self { - let catalog = TestCatalog::new(); + let exec = Arc::clone(&GLOBAL_EXEC); + let catalog = TestCatalog::with_exec(exec); let ns = catalog.create_namespace("test_db").await; let sequencer = ns.create_sequencer(1).await;