refactor: use less executors and threads during tests (#5086)
`Executor` is only used as a performance boundary, not as a correctness or data boundary so let's try to re-use it. This also simplifies profiling of tests since we don't end up with hundreds (or even thousands) of threads. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
54039e8ae5
commit
607831585c
|
|
@ -2414,6 +2414,7 @@ dependencies = [
|
|||
"mutable_batch_lp",
|
||||
"object_store",
|
||||
"observability_deps",
|
||||
"once_cell",
|
||||
"parquet_file",
|
||||
"schema",
|
||||
"sharder",
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ metric = { path = "../metric" }
|
|||
mutable_batch_lp = { path = "../mutable_batch_lp" }
|
||||
object_store = "0.3.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
once_cell = { version = "1.13.0", features = ["parking_lot"] }
|
||||
parquet_file = { path = "../parquet_file" }
|
||||
iox_query = { path = "../iox_query" }
|
||||
schema = { path = "../schema" }
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ use iox_time::{MockProvider, Time, TimeProvider};
|
|||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
use object_store::{memory::InMemory, DynObjectStore};
|
||||
use observability_deps::tracing::debug;
|
||||
use once_cell::sync::Lazy;
|
||||
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
|
||||
use schema::{
|
||||
selection::Selection,
|
||||
|
|
@ -29,6 +30,9 @@ use schema::{
|
|||
use std::sync::Arc;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Global executor used by all test catalogs.
|
||||
static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| Arc::new(Executor::new(1)));
|
||||
|
||||
/// Catalog for tests
|
||||
#[derive(Debug)]
|
||||
#[allow(missing_docs)]
|
||||
|
|
@ -42,12 +46,21 @@ pub struct TestCatalog {
|
|||
|
||||
impl TestCatalog {
|
||||
/// Initialize the catalog
|
||||
///
|
||||
/// All test catalogs use the same [`Executor`]. Use [`with_exec`](Self::with_exec) if you need a special or
|
||||
/// dedicated executor.
|
||||
pub fn new() -> Arc<Self> {
|
||||
let exec = Arc::clone(&GLOBAL_EXEC);
|
||||
|
||||
Self::with_exec(exec)
|
||||
}
|
||||
|
||||
/// Initialize with given executor.
|
||||
pub fn with_exec(exec: Arc<Executor>) -> Arc<Self> {
|
||||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
|
||||
let object_store = Arc::new(InMemory::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp(0, 0)));
|
||||
let exec = Arc::new(Executor::new(1));
|
||||
|
||||
Arc::new(Self {
|
||||
metric_registry,
|
||||
|
|
|
|||
|
|
@ -5,16 +5,12 @@ mod setup;
|
|||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_util::{display::pretty_format_batches, test_util::sort_record_batch};
|
||||
use iox_query::{
|
||||
exec::{Executor, ExecutorType},
|
||||
frontend::sql::SqlQueryPlanner,
|
||||
};
|
||||
use iox_query::frontend::sql::SqlQueryPlanner;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::{
|
||||
io::LineWriter,
|
||||
io::Write,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use self::{
|
||||
|
|
@ -22,7 +18,6 @@ use self::{
|
|||
setup::TestSetup,
|
||||
};
|
||||
use crate::scenarios::{DbScenario, DbSetup};
|
||||
use iox_query::exec::ExecutorConfig;
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
#[derive(Debug, Snafu)]
|
||||
|
|
@ -276,14 +271,7 @@ impl<W: Write> Runner<W> {
|
|||
writeln!(self.log, "Running scenario '{}'", scenario_name)?;
|
||||
writeln!(self.log, "SQL: '{:#?}'", sql)?;
|
||||
let planner = SqlQueryPlanner::default();
|
||||
let executor = Arc::new(Executor::new_with_config(ExecutorConfig {
|
||||
num_threads: 1,
|
||||
target_query_partitions: 4,
|
||||
}));
|
||||
let ctx = executor
|
||||
.new_execution_config(ExecutorType::Query)
|
||||
.with_default_catalog(db.as_catalog_provider_arc())
|
||||
.build();
|
||||
let ctx = db.new_query_context(None);
|
||||
|
||||
let physical_plan = planner
|
||||
.query(sql, &ctx)
|
||||
|
|
|
|||
|
|
@ -21,9 +21,11 @@ use ingester::{
|
|||
querier_handler::prepare_data_to_querier,
|
||||
};
|
||||
use iox_catalog::interface::get_schema_by_name;
|
||||
use iox_query::exec::{Executor, ExecutorConfig};
|
||||
use iox_tests::util::{TestCatalog, TestNamespace, TestSequencer};
|
||||
use itertools::Itertools;
|
||||
use mutable_batch_lp::LinesConverter;
|
||||
use once_cell::sync::Lazy;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use querier::{
|
||||
IngesterConnectionImpl, IngesterFlightClient, IngesterFlightClientError,
|
||||
|
|
@ -638,10 +640,20 @@ struct MockIngester {
|
|||
sequence_counter: i64,
|
||||
}
|
||||
|
||||
/// Query-test specific executor with static properties that may be relevant for the query optimizer and therefore may
|
||||
/// change `EXPLAIN` plans.
|
||||
static GLOBAL_EXEC: Lazy<Arc<Executor>> = Lazy::new(|| {
|
||||
Arc::new(Executor::new_with_config(ExecutorConfig {
|
||||
num_threads: 1,
|
||||
target_query_partitions: 4,
|
||||
}))
|
||||
});
|
||||
|
||||
impl MockIngester {
|
||||
/// Create new empty ingester.
|
||||
async fn new() -> Self {
|
||||
let catalog = TestCatalog::new();
|
||||
let exec = Arc::clone(&GLOBAL_EXEC);
|
||||
let catalog = TestCatalog::with_exec(exec);
|
||||
let ns = catalog.create_namespace("test_db").await;
|
||||
let sequencer = ns.create_sequencer(1).await;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue