diff --git a/Cargo.lock b/Cargo.lock index a60d04718e..208c89800d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,7 +2963,6 @@ dependencies = [ "mutable_batch_lp", "object_store", "observability_deps", - "once_cell", "parquet_file", "predicate", "schema", diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs index c5c179acc6..49796e99a3 100644 --- a/clap_blocks/src/compactor2.rs +++ b/clap_blocks/src/compactor2.rs @@ -58,7 +58,7 @@ pub struct Compactor2Config { env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT", action )] - pub query_exec_thread_count: Option, + pub query_exec_thread_count: Option, /// Size of memory pool used during compaction plan execution, in /// bytes. diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs index b2b17f33ea..e91d235375 100644 --- a/clap_blocks/src/querier.rs +++ b/clap_blocks/src/querier.rs @@ -3,7 +3,9 @@ use crate::ingester_address::IngesterAddress; use data_types::{IngesterMapping, ShardIndex}; use serde::Deserialize; use snafu::{ResultExt, Snafu}; -use std::{collections::HashMap, fs, io, path::PathBuf, str::FromStr, sync::Arc}; +use std::{ + collections::HashMap, fs, io, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc, +}; #[derive(Debug, Snafu)] #[allow(missing_docs)] @@ -55,7 +57,7 @@ pub struct QuerierConfig { env = "INFLUXDB_IOX_NUM_QUERY_THREADS", action )] - pub num_query_threads: Option, + pub num_query_threads: Option, /// Size of memory pool used during query exec, in bytes. /// @@ -273,7 +275,7 @@ pub struct QuerierConfig { impl QuerierConfig { /// Get the querier config's num query threads. #[must_use] - pub fn num_query_threads(&self) -> Option { + pub fn num_query_threads(&self) -> Option { self.num_query_threads } @@ -465,7 +467,10 @@ mod tests { let actual = QuerierConfig::try_parse_from(["my_binary", "--num-query-threads", "42"]).unwrap(); - assert_eq!(actual.num_query_threads(), Some(42)); + assert_eq!( + actual.num_query_threads(), + Some(NonZeroUsize::new(42).unwrap()) + ); assert!(matches!( actual.ingester_addresses().unwrap(), IngesterAddresses::None, diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index b713b98e68..5a0e40f358 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -878,6 +878,8 @@ async fn update_catalog( #[cfg(test)] mod tests { + use std::num::NonZeroUsize; + use crate::parquet_file::CompactorParquetFile; use super::*; @@ -1066,7 +1068,7 @@ mod tests { /// Create data that is pre-sorted and deduplicated async fn new_for_sort() -> Self { // Ensure we have at least run at least partitions to test cross partition merging - let catalog = TestCatalog::with_target_query_partitions(2); + let catalog = TestCatalog::with_target_query_partitions(NonZeroUsize::new(2).unwrap()); let ns = catalog.create_namespace_1hr_retention("ns").await; let shard = ns.create_shard(1).await; let table = ns.create_table("table").await; diff --git a/executor/src/lib.rs b/executor/src/lib.rs index 2d238a8824..a0e2ab372e 100644 --- a/executor/src/lib.rs +++ b/executor/src/lib.rs @@ -17,6 +17,7 @@ use once_cell::sync::Lazy; use parking_lot::Mutex; use pin_project::{pin_project, pinned_drop}; use std::{ + num::NonZeroUsize, panic::AssertUnwindSafe, pin::Pin, sync::{ @@ -115,7 +116,7 @@ pub struct DedicatedExecutor { state: Arc>, /// Number of threads - num_threads: usize, + num_threads: NonZeroUsize, /// Used for testing. /// @@ -173,7 +174,7 @@ impl std::fmt::Debug for DedicatedExecutor { /// [`DedicatedExecutor`] for testing purposes. static TESTING_EXECUTOR: Lazy = - Lazy::new(|| DedicatedExecutor::new_inner("testing", 1, true)); + Lazy::new(|| DedicatedExecutor::new_inner("testing", NonZeroUsize::new(1).unwrap(), true)); impl DedicatedExecutor { /// Creates a new `DedicatedExecutor` with a dedicated tokio @@ -192,11 +193,11 @@ impl DedicatedExecutor { /// drop a runtime in a context where blocking is not allowed. This /// happens when a runtime is dropped from within an asynchronous /// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21 - pub fn new(thread_name: &str, num_threads: usize) -> Self { + pub fn new(thread_name: &str, num_threads: NonZeroUsize) -> Self { Self::new_inner(thread_name, num_threads, false) } - fn new_inner(thread_name: &str, num_threads: usize, testing: bool) -> Self { + fn new_inner(thread_name: &str, num_threads: NonZeroUsize, testing: bool) -> Self { let thread_name = thread_name.to_string(); let thread_counter = Arc::new(AtomicUsize::new(1)); @@ -215,7 +216,7 @@ impl DedicatedExecutor { thread_counter.fetch_add(1, Ordering::SeqCst) ) }) - .worker_threads(num_threads) + .worker_threads(num_threads.get()) .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY)) .build() .expect("Creating tokio runtime"); @@ -267,7 +268,7 @@ impl DedicatedExecutor { } /// Number of threads that back this executor. - pub fn num_threads(&self) -> usize { + pub fn num_threads(&self) -> NonZeroUsize { self.num_threads } @@ -409,7 +410,7 @@ mod tests { async fn basic() { let barrier = Arc::new(Barrier::new(2)); - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier))); // Note the dedicated task will never complete if it runs on @@ -427,7 +428,7 @@ mod tests { #[tokio::test] async fn basic_clone() { let barrier = Arc::new(Barrier::new(2)); - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // Run task on clone should work fine let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier))); barrier.wait(); @@ -439,7 +440,7 @@ mod tests { #[tokio::test] async fn drop_clone() { let barrier = Arc::new(Barrier::new(2)); - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); drop(exec.clone()); @@ -461,7 +462,7 @@ mod tests { } } - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let _s = S(exec); // this must not lead to a double-panic and SIGILL @@ -473,7 +474,7 @@ mod tests { let barrier = Arc::new(Barrier::new(3)); // make an executor with two threads - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap()); let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier))); let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier))); @@ -489,7 +490,7 @@ mod tests { #[tokio::test] async fn worker_priority() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap()); let dedicated_task = exec.spawn(async move { get_current_thread_priority() }); @@ -500,7 +501,7 @@ mod tests { #[tokio::test] async fn tokio_spawn() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap()); // spawn a task that spawns to other tasks and ensure they run on the dedicated // executor @@ -528,7 +529,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_str() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let dedicated_task = exec.spawn(async move { if true { panic!("At the disco, on the dedicated task scheduler"); @@ -549,7 +550,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_string() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let dedicated_task = exec.spawn(async move { if true { panic!("{} {}", 1, 2); @@ -567,7 +568,7 @@ mod tests { #[tokio::test] async fn panic_on_executor_other() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let dedicated_task = exec.spawn(async move { if true { panic_any(1) @@ -588,7 +589,7 @@ mod tests { let barrier = Arc::new(Barrier::new(2)); let captured = Arc::clone(&barrier); - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); let dedicated_task = exec.spawn(async move { tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; do_work(42, captured).await @@ -606,7 +607,7 @@ mod tests { #[tokio::test] async fn executor_submit_task_after_shutdown() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // Simulate trying to submit tasks once executor has shutdown exec.shutdown(); @@ -624,7 +625,7 @@ mod tests { #[tokio::test] async fn executor_submit_task_after_clone_shutdown() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // shutdown the clone (but not the exec) exec.clone().join().await; @@ -644,14 +645,14 @@ mod tests { #[tokio::test] async fn executor_join() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // test it doesn't hang exec.join().await; } #[tokio::test] async fn executor_join2() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // test it doesn't hang exec.join().await; exec.join().await; @@ -660,7 +661,7 @@ mod tests { #[tokio::test] #[allow(clippy::redundant_clone)] async fn executor_clone_join() { - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); // test it doesn't hang exec.clone().join().await; exec.clone().join().await; @@ -670,7 +671,7 @@ mod tests { #[tokio::test] async fn drop_receiver() { // create empty executor - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); assert_eq!(exec.tasks(), 0); // create first blocked task @@ -701,7 +702,7 @@ mod tests { #[tokio::test] async fn detach_receiver() { // create empty executor - let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1); + let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap()); assert_eq!(exec.tasks(), 0); // create first task diff --git a/influxdb_iox/src/commands/compactor.rs b/influxdb_iox/src/commands/compactor.rs index 9a28a7d9e4..ecb899333e 100644 --- a/influxdb_iox/src/commands/compactor.rs +++ b/influxdb_iox/src/commands/compactor.rs @@ -10,7 +10,7 @@ use object_store::DynObjectStore; use object_store_metrics::ObjectStoreMetrics; use parquet_file::storage::{ParquetStorage, StorageId}; use snafu::prelude::*; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, num::NonZeroUsize, sync::Arc}; use crate::process_info::{setup_metric_registry, USIZE_MAX}; @@ -40,7 +40,7 @@ pub enum Command { default_value = "4", action )] - query_exec_thread_count: usize, + query_exec_thread_count: NonZeroUsize, /// Size of memory pool used during query exec, in bytes. #[clap( diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index e967c94681..2259b3576f 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -470,7 +470,7 @@ impl Config { compaction_job_concurrency: NonZeroUsize::new(1).unwrap(), compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(), compaction_partition_minute_threshold: 10, - query_exec_thread_count: Some(1), + query_exec_thread_count: Some(NonZeroUsize::new(1).unwrap()), exec_mem_pool_bytes, max_desired_file_size_bytes: 30_000, percentage_max_file_size: 30, @@ -585,7 +585,8 @@ pub async fn command(config: Config) -> Result<()> { // TODO: make num_threads a parameter (other modes have it // configured by a command line) - let num_threads = num_cpus::get(); + let num_threads = NonZeroUsize::new(num_cpus::get()) + .unwrap_or_else(|| NonZeroUsize::new(1).expect("1 is valid")); info!(%num_threads, "Creating shared query executor"); let parquet_store_real = ParquetStorage::new(Arc::clone(&object_store), StorageId::from("iox")); diff --git a/influxdb_iox/src/commands/run/compactor.rs b/influxdb_iox/src/commands/run/compactor.rs index d0ad94cb77..462f180892 100644 --- a/influxdb_iox/src/commands/run/compactor.rs +++ b/influxdb_iox/src/commands/run/compactor.rs @@ -7,6 +7,7 @@ use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; use parquet_file::storage::{ParquetStorage, StorageId}; use std::collections::HashMap; +use std::num::NonZeroUsize; use std::sync::Arc; use thiserror::Error; @@ -75,7 +76,7 @@ pub struct Config { default_value = "4", action )] - pub query_exec_thread_count: usize, + pub query_exec_thread_count: NonZeroUsize, /// Size of memory pool used during query exec, in bytes. #[clap( diff --git a/influxdb_iox/src/commands/run/compactor2.rs b/influxdb_iox/src/commands/run/compactor2.rs index 26940ee7ed..eb77c3ba4b 100644 --- a/influxdb_iox/src/commands/run/compactor2.rs +++ b/influxdb_iox/src/commands/run/compactor2.rs @@ -7,6 +7,7 @@ use object_store::DynObjectStore; use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; use parquet_file::storage::{ParquetStorage, StorageId}; +use std::num::NonZeroUsize; use std::sync::Arc; use thiserror::Error; @@ -109,7 +110,10 @@ pub async fn command(config: Config) -> Result<(), Error> { let num_threads = config .compactor_config .query_exec_thread_count - .unwrap_or_else(|| num_cpus::get() - 1_usize); + .unwrap_or_else(|| { + NonZeroUsize::new(num_cpus::get().saturating_sub(1)) + .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()) + }); info!(%num_threads, "using specified number of threads"); let exec = Arc::new(Executor::new_with_config(ExecutorConfig { diff --git a/influxdb_iox/src/commands/run/ingest_replica.rs b/influxdb_iox/src/commands/run/ingest_replica.rs index 1785e0b9f5..459c5fd5f1 100644 --- a/influxdb_iox/src/commands/run/ingest_replica.rs +++ b/influxdb_iox/src/commands/run/ingest_replica.rs @@ -12,7 +12,7 @@ use ioxd_common::{ }; use ioxd_ingest_replica::create_ingest_replica_server_type; use observability_deps::tracing::*; -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; #[derive(Debug, Error)] @@ -64,7 +64,7 @@ pub struct Config { default_value = "4", action )] - pub exec_thread_count: usize, + pub exec_thread_count: NonZeroUsize, /// Size of memory pool used during query exec, in bytes. #[clap( diff --git a/influxdb_iox/src/commands/run/ingester.rs b/influxdb_iox/src/commands/run/ingester.rs index 19e9c03410..47603bce40 100644 --- a/influxdb_iox/src/commands/run/ingester.rs +++ b/influxdb_iox/src/commands/run/ingester.rs @@ -13,6 +13,7 @@ use ioxd_ingester::create_ingester_server_type; use object_store::DynObjectStore; use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; +use std::num::NonZeroUsize; use std::sync::Arc; use thiserror::Error; @@ -74,7 +75,7 @@ pub struct Config { default_value = "4", action )] - pub query_exec_thread_count: usize, + pub query_exec_thread_count: NonZeroUsize, /// Size of memory pool used during query exec, in bytes. #[clap( diff --git a/influxdb_iox/src/commands/run/ingester2.rs b/influxdb_iox/src/commands/run/ingester2.rs index 6e6105669d..ac8e793144 100644 --- a/influxdb_iox/src/commands/run/ingester2.rs +++ b/influxdb_iox/src/commands/run/ingester2.rs @@ -18,7 +18,7 @@ use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; use panic_logging::make_panics_fatal; use parquet_file::storage::{ParquetStorage, StorageId}; -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; #[derive(Debug, Error)] @@ -73,7 +73,7 @@ pub struct Config { default_value = "4", action )] - pub exec_thread_count: usize, + pub exec_thread_count: NonZeroUsize, /// Size of memory pool used during query exec, in bytes. #[clap( diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 55d1b847c0..2586593bfd 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -17,7 +17,7 @@ use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs}; use object_store::DynObjectStore; use object_store_metrics::ObjectStoreMetrics; use observability_deps::tracing::*; -use std::sync::Arc; +use std::{num::NonZeroUsize, sync::Arc}; use thiserror::Error; #[derive(Debug, Error)] @@ -93,7 +93,9 @@ pub async fn command(config: Config) -> Result<(), Error> { let time_provider = Arc::new(SystemProvider::new()); let num_query_threads = config.querier_config.num_query_threads(); - let num_threads = num_query_threads.unwrap_or_else(num_cpus::get); + let num_threads = num_query_threads.unwrap_or_else(|| { + NonZeroUsize::new(num_cpus::get()).unwrap_or_else(|| NonZeroUsize::new(1).unwrap()) + }); info!(%num_threads, "using specified number of threads per thread pool"); let rpc_write = std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok(); diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs index 269732d3b6..8ea54d6d8e 100644 --- a/iox_query/src/exec.rs +++ b/iox_query/src/exec.rs @@ -17,7 +17,7 @@ use parquet_file::storage::StorageId; use trace::span::{SpanExt, SpanRecorder}; mod cross_rt_stream; -use std::{collections::HashMap, fmt::Display, sync::Arc}; +use std::{collections::HashMap, fmt::Display, num::NonZeroUsize, sync::Arc}; use datafusion::{ self, @@ -40,10 +40,10 @@ use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode}; #[derive(Debug, Clone)] pub struct ExecutorConfig { /// Number of threads per thread pool - pub num_threads: usize, + pub num_threads: NonZeroUsize, /// Target parallelism for query execution - pub target_query_partitions: usize, + pub target_query_partitions: NonZeroUsize, /// Object stores pub object_stores: HashMap>, @@ -72,11 +72,11 @@ pub struct DedicatedExecutors { reorg_exec: DedicatedExecutor, /// Number of threads per thread pool - num_threads: usize, + num_threads: NonZeroUsize, } impl DedicatedExecutors { - pub fn new(num_threads: usize) -> Self { + pub fn new(num_threads: NonZeroUsize) -> Self { let query_exec = DedicatedExecutor::new("IOx Query", num_threads); let reorg_exec = DedicatedExecutor::new("IOx Reorg", num_threads); @@ -88,14 +88,18 @@ impl DedicatedExecutors { } pub fn new_testing() -> Self { + let query_exec = DedicatedExecutor::new_testing(); + let reorg_exec = DedicatedExecutor::new_testing(); + assert_eq!(query_exec.num_threads(), reorg_exec.num_threads()); + let num_threads = query_exec.num_threads(); Self { - query_exec: DedicatedExecutor::new_testing(), - reorg_exec: DedicatedExecutor::new_testing(), - num_threads: 1, + query_exec, + reorg_exec, + num_threads, } } - pub fn num_threads(&self) -> usize { + pub fn num_threads(&self) -> NonZeroUsize { self.num_threads } } @@ -133,7 +137,7 @@ pub enum ExecutorType { impl Executor { /// Creates a new executor with a two dedicated thread pools, each /// with num_threads - pub fn new(num_threads: usize, mem_pool_size: usize) -> Self { + pub fn new(num_threads: NonZeroUsize, mem_pool_size: usize) -> Self { Self::new_with_config(ExecutorConfig { num_threads, target_query_partitions: num_threads, @@ -152,8 +156,8 @@ impl Executor { /// to preserve resources. pub fn new_testing() -> Self { let config = ExecutorConfig { - num_threads: 1, - target_query_partitions: 1, + num_threads: NonZeroUsize::new(1).unwrap(), + target_query_partitions: NonZeroUsize::new(1).unwrap(), object_stores: HashMap::default(), mem_pool_size: 1024 * 1024 * 1024, // 1GB }; diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 51b69b1362..7793113465 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -54,7 +54,7 @@ use executor::DedicatedExecutor; use futures::{Stream, StreamExt, TryStreamExt}; use observability_deps::tracing::debug; use query_functions::{register_scalar_functions, selectors::register_selector_aggregates}; -use std::{convert::TryInto, fmt, sync::Arc}; +use std::{convert::TryInto, fmt, num::NonZeroUsize, sync::Arc}; use trace::{ ctx::SpanContext, span::{MetaValue, Span, SpanExt, SpanRecorder}, @@ -200,10 +200,10 @@ impl IOxSessionConfig { } /// Set execution concurrency - pub fn with_target_partitions(mut self, target_partitions: usize) -> Self { + pub fn with_target_partitions(mut self, target_partitions: NonZeroUsize) -> Self { self.session_config = self .session_config - .with_target_partitions(target_partitions); + .with_target_partitions(target_partitions.get()); self } diff --git a/iox_tests/Cargo.toml b/iox_tests/Cargo.toml index 2f38b30eb7..64a6b2f54d 100644 --- a/iox_tests/Cargo.toml +++ b/iox_tests/Cargo.toml @@ -18,7 +18,6 @@ metric = { path = "../metric" } mutable_batch_lp = { path = "../mutable_batch_lp" } object_store = "0.5.4" observability_deps = { path = "../observability_deps" } -once_cell = { version = "1.17", features = ["parking_lot"] } parquet_file = { path = "../parquet_file" } predicate = { path = "../predicate" } iox_query = { path = "../iox_query" } diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index b649eab82d..7e6c660f4f 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -27,7 +27,6 @@ use iox_time::{MockProvider, Time, TimeProvider}; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use object_store::{memory::InMemory, DynObjectStore}; use observability_deps::tracing::debug; -use once_cell::sync::Lazy; use parquet_file::{ chunk::ParquetChunk, metadata::IoxMetadata, @@ -37,13 +36,9 @@ use schema::{ sort::{adjust_sort_key_columns, compute_sort_key, SortKey}, Projection, Schema, }; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, num::NonZeroUsize, sync::Arc}; use uuid::Uuid; -/// Global executor used by all test catalogs. -static GLOBAL_EXEC: Lazy> = - Lazy::new(|| Arc::new(DedicatedExecutors::new(1))); - /// Common retention period used throughout tests pub const TEST_RETENTION_PERIOD_NS: Option = Some(3_600 * 1_000_000_000); @@ -65,19 +60,21 @@ impl TestCatalog { /// All test catalogs use the same [`Executor`]. Use [`with_execs`](Self::with_execs) if you need a special or /// dedicated executor. pub fn new() -> Arc { - let exec = Arc::clone(&GLOBAL_EXEC); - - Self::with_execs(exec, 1) + let exec = Arc::new(DedicatedExecutors::new_testing()); + Self::with_execs(exec, NonZeroUsize::new(1).unwrap()) } /// Initialize with partitions - pub fn with_target_query_partitions(target_query_partitions: usize) -> Arc { - let exec = Arc::clone(&GLOBAL_EXEC); + pub fn with_target_query_partitions(target_query_partitions: NonZeroUsize) -> Arc { + let exec = Arc::new(DedicatedExecutors::new_testing()); Self::with_execs(exec, target_query_partitions) } /// Initialize with given executors and partitions - pub fn with_execs(exec: Arc, target_query_partitions: usize) -> Arc { + pub fn with_execs( + exec: Arc, + target_query_partitions: NonZeroUsize, + ) -> Arc { let metric_registry = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metric_registry))); let object_store = Arc::new(InMemory::new());