refactor: n_threads and n_target_partitions are non-zero (#7047)

* refactor: n_threads and n_target_partitions are non-zero

  Zero values will just panic. Prevent that earlier.

* fix: typo

---------

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

parent 0282eb4750
commit 08578cded5
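The core idea, as a minimal standalone sketch (illustration only, not part of the diff): std::num::NonZeroUsize rejects zero at construction, so an invalid thread or partition count is caught where the value is created rather than panicking later inside the executor.

    use std::num::NonZeroUsize;

    fn main() {
        // Zero is rejected at construction time...
        assert!(NonZeroUsize::new(0).is_none());
        // ...while any other value carries the non-zero guarantee in its type.
        let n = NonZeroUsize::new(4).expect("4 is non-zero");
        assert_eq!(n.get(), 4);
    }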
@@ -2963,7 +2963,6 @@ dependencies = [
  "mutable_batch_lp",
  "object_store",
  "observability_deps",
- "once_cell",
  "parquet_file",
  "predicate",
  "schema",
@@ -58,7 +58,7 @@ pub struct Compactor2Config {
         env = "INFLUXDB_IOX_QUERY_EXEC_THREAD_COUNT",
         action
     )]
-    pub query_exec_thread_count: Option<usize>,
+    pub query_exec_thread_count: Option<NonZeroUsize>,

     /// Size of memory pool used during compaction plan execution, in
     /// bytes.
@@ -3,7 +3,9 @@ use crate::ingester_address::IngesterAddress;
 use data_types::{IngesterMapping, ShardIndex};
 use serde::Deserialize;
 use snafu::{ResultExt, Snafu};
-use std::{collections::HashMap, fs, io, path::PathBuf, str::FromStr, sync::Arc};
+use std::{
+    collections::HashMap, fs, io, num::NonZeroUsize, path::PathBuf, str::FromStr, sync::Arc,
+};

 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -55,7 +57,7 @@ pub struct QuerierConfig {
         env = "INFLUXDB_IOX_NUM_QUERY_THREADS",
         action
     )]
-    pub num_query_threads: Option<usize>,
+    pub num_query_threads: Option<NonZeroUsize>,

     /// Size of memory pool used during query exec, in bytes.
     ///
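NonZeroUsize implements FromStr, so clap can parse the flag directly and "--num-query-threads 0" now fails with a parse error at argument-parsing time instead of being accepted and panicking later. A condensed sketch of the pattern, assuming clap's derive API with FromStr-based value parsing (struct and flag trimmed down from the diff):

    use clap::Parser;
    use std::num::NonZeroUsize;

    #[derive(Debug, Parser)]
    struct Config {
        /// Number of query threads; omitted means "derive from CPU count".
        #[clap(long = "num-query-threads", action)]
        num_query_threads: Option<NonZeroUsize>,
    }

    fn main() {
        // "0" is rejected by the value parser, not deep inside the executor.
        assert!(Config::try_parse_from(["bin", "--num-query-threads", "0"]).is_err());
        let ok = Config::try_parse_from(["bin", "--num-query-threads", "4"]).unwrap();
        assert_eq!(ok.num_query_threads.map(NonZeroUsize::get), Some(4));
    }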
@@ -273,7 +275,7 @@ pub struct QuerierConfig {
 impl QuerierConfig {
     /// Get the querier config's num query threads.
     #[must_use]
-    pub fn num_query_threads(&self) -> Option<usize> {
+    pub fn num_query_threads(&self) -> Option<NonZeroUsize> {
         self.num_query_threads
     }

@@ -465,7 +467,10 @@ mod tests {
         let actual =
             QuerierConfig::try_parse_from(["my_binary", "--num-query-threads", "42"]).unwrap();

-        assert_eq!(actual.num_query_threads(), Some(42));
+        assert_eq!(
+            actual.num_query_threads(),
+            Some(NonZeroUsize::new(42).unwrap())
+        );
         assert!(matches!(
             actual.ingester_addresses().unwrap(),
             IngesterAddresses::None,
@@ -878,6 +878,8 @@ async fn update_catalog(

 #[cfg(test)]
 mod tests {
+    use std::num::NonZeroUsize;
+
     use crate::parquet_file::CompactorParquetFile;

     use super::*;
@@ -1066,7 +1068,7 @@ mod tests {
     /// Create data that is pre-sorted and deduplicated
     async fn new_for_sort() -> Self {
         // Ensure we have at least run at least partitions to test cross partition merging
-        let catalog = TestCatalog::with_target_query_partitions(2);
+        let catalog = TestCatalog::with_target_query_partitions(NonZeroUsize::new(2).unwrap());
         let ns = catalog.create_namespace_1hr_retention("ns").await;
         let shard = ns.create_shard(1).await;
         let table = ns.create_table("table").await;
@@ -17,6 +17,7 @@ use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use pin_project::{pin_project, pinned_drop};
 use std::{
+    num::NonZeroUsize,
     panic::AssertUnwindSafe,
     pin::Pin,
     sync::{
@@ -115,7 +116,7 @@ pub struct DedicatedExecutor {
     state: Arc<Mutex<State>>,

     /// Number of threads
-    num_threads: usize,
+    num_threads: NonZeroUsize,

     /// Used for testing.
     ///
@@ -173,7 +174,7 @@ impl std::fmt::Debug for DedicatedExecutor {

 /// [`DedicatedExecutor`] for testing purposes.
 static TESTING_EXECUTOR: Lazy<DedicatedExecutor> =
-    Lazy::new(|| DedicatedExecutor::new_inner("testing", 1, true));
+    Lazy::new(|| DedicatedExecutor::new_inner("testing", NonZeroUsize::new(1).unwrap(), true));

 impl DedicatedExecutor {
     /// Creates a new `DedicatedExecutor` with a dedicated tokio
@@ -192,11 +193,11 @@ impl DedicatedExecutor {
     /// drop a runtime in a context where blocking is not allowed. This
     /// happens when a runtime is dropped from within an asynchronous
     /// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
-    pub fn new(thread_name: &str, num_threads: usize) -> Self {
+    pub fn new(thread_name: &str, num_threads: NonZeroUsize) -> Self {
         Self::new_inner(thread_name, num_threads, false)
     }

-    fn new_inner(thread_name: &str, num_threads: usize, testing: bool) -> Self {
+    fn new_inner(thread_name: &str, num_threads: NonZeroUsize, testing: bool) -> Self {
         let thread_name = thread_name.to_string();
         let thread_counter = Arc::new(AtomicUsize::new(1));

@@ -215,7 +216,7 @@ impl DedicatedExecutor {
                     thread_counter.fetch_add(1, Ordering::SeqCst)
                 )
             })
-            .worker_threads(num_threads)
+            .worker_threads(num_threads.get())
             .on_thread_start(move || set_current_thread_priority(WORKER_PRIORITY))
             .build()
             .expect("Creating tokio runtime");
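The .get() is needed because tokio's runtime builder still takes a plain usize (and panics when it is zero); the typed value is lowered only at that boundary. A self-contained sketch of the same shape:

    use std::num::NonZeroUsize;

    fn build_runtime(num_threads: NonZeroUsize) -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            // worker_threads(0) would panic at runtime; NonZeroUsize makes
            // that state unrepresentable upstream of this call.
            .worker_threads(num_threads.get())
            .build()
            .expect("Creating tokio runtime")
    }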
@@ -267,7 +268,7 @@ impl DedicatedExecutor {
     }

     /// Number of threads that back this executor.
-    pub fn num_threads(&self) -> usize {
+    pub fn num_threads(&self) -> NonZeroUsize {
         self.num_threads
     }

@@ -409,7 +410,7 @@ mod tests {
     async fn basic() {
         let barrier = Arc::new(Barrier::new(2));

-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let dedicated_task = exec.spawn(do_work(42, Arc::clone(&barrier)));

         // Note the dedicated task will never complete if it runs on
@@ -427,7 +428,7 @@ mod tests {
     #[tokio::test]
     async fn basic_clone() {
         let barrier = Arc::new(Barrier::new(2));
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         // Run task on clone should work fine
         let dedicated_task = exec.clone().spawn(do_work(42, Arc::clone(&barrier)));
         barrier.wait();
@@ -439,7 +440,7 @@ mod tests {
     #[tokio::test]
     async fn drop_clone() {
         let barrier = Arc::new(Barrier::new(2));
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());

         drop(exec.clone());

@@ -461,7 +462,7 @@ mod tests {
         }
     }

-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let _s = S(exec);

         // this must not lead to a double-panic and SIGILL
@@ -473,7 +474,7 @@ mod tests {
         let barrier = Arc::new(Barrier::new(3));

         // make an executor with two threads
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap());
         let dedicated_task1 = exec.spawn(do_work(11, Arc::clone(&barrier)));
         let dedicated_task2 = exec.spawn(do_work(42, Arc::clone(&barrier)));

@@ -489,7 +490,7 @@ mod tests {

     #[tokio::test]
     async fn worker_priority() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap());

         let dedicated_task = exec.spawn(async move { get_current_thread_priority() });

@@ -500,7 +501,7 @@ mod tests {

     #[tokio::test]
     async fn tokio_spawn() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 2);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(2).unwrap());

         // spawn a task that spawns to other tasks and ensure they run on the dedicated
         // executor
@@ -528,7 +529,7 @@ mod tests {

     #[tokio::test]
     async fn panic_on_executor_str() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let dedicated_task = exec.spawn(async move {
             if true {
                 panic!("At the disco, on the dedicated task scheduler");
@@ -549,7 +550,7 @@ mod tests {

     #[tokio::test]
     async fn panic_on_executor_string() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let dedicated_task = exec.spawn(async move {
             if true {
                 panic!("{} {}", 1, 2);
@@ -567,7 +568,7 @@ mod tests {

     #[tokio::test]
     async fn panic_on_executor_other() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let dedicated_task = exec.spawn(async move {
             if true {
                 panic_any(1)
@@ -588,7 +589,7 @@ mod tests {
         let barrier = Arc::new(Barrier::new(2));
         let captured = Arc::clone(&barrier);

-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         let dedicated_task = exec.spawn(async move {
             tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
             do_work(42, captured).await
@@ -606,7 +607,7 @@ mod tests {

     #[tokio::test]
     async fn executor_submit_task_after_shutdown() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());

         // Simulate trying to submit tasks once executor has shutdown
         exec.shutdown();
@@ -624,7 +625,7 @@ mod tests {

     #[tokio::test]
     async fn executor_submit_task_after_clone_shutdown() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());

         // shutdown the clone (but not the exec)
         exec.clone().join().await;
@@ -644,14 +645,14 @@ mod tests {

     #[tokio::test]
     async fn executor_join() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         // test it doesn't hang
         exec.join().await;
     }

     #[tokio::test]
     async fn executor_join2() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         // test it doesn't hang
         exec.join().await;
         exec.join().await;
@@ -660,7 +661,7 @@ mod tests {
     #[tokio::test]
     #[allow(clippy::redundant_clone)]
     async fn executor_clone_join() {
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         // test it doesn't hang
         exec.clone().join().await;
         exec.clone().join().await;
@@ -670,7 +671,7 @@ mod tests {
     #[tokio::test]
     async fn drop_receiver() {
         // create empty executor
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         assert_eq!(exec.tasks(), 0);

         // create first blocked task
@@ -701,7 +702,7 @@ mod tests {
     #[tokio::test]
     async fn detach_receiver() {
         // create empty executor
-        let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
+        let exec = DedicatedExecutor::new("Test DedicatedExecutor", NonZeroUsize::new(1).unwrap());
         assert_eq!(exec.tasks(), 0);

         // create first task
@@ -10,7 +10,7 @@ use object_store::DynObjectStore;
 use object_store_metrics::ObjectStoreMetrics;
 use parquet_file::storage::{ParquetStorage, StorageId};
 use snafu::prelude::*;
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

 use crate::process_info::{setup_metric_registry, USIZE_MAX};

@@ -40,7 +40,7 @@ pub enum Command {
         default_value = "4",
         action
     )]
-    query_exec_thread_count: usize,
+    query_exec_thread_count: NonZeroUsize,

     /// Size of memory pool used during query exec, in bytes.
     #[clap(
@@ -470,7 +470,7 @@ impl Config {
             compaction_job_concurrency: NonZeroUsize::new(1).unwrap(),
             compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
             compaction_partition_minute_threshold: 10,
-            query_exec_thread_count: Some(1),
+            query_exec_thread_count: Some(NonZeroUsize::new(1).unwrap()),
             exec_mem_pool_bytes,
             max_desired_file_size_bytes: 30_000,
             percentage_max_file_size: 30,
@@ -585,7 +585,8 @@ pub async fn command(config: Config) -> Result<()> {

     // TODO: make num_threads a parameter (other modes have it
     // configured by a command line)
-    let num_threads = num_cpus::get();
+    let num_threads = NonZeroUsize::new(num_cpus::get())
+        .unwrap_or_else(|| NonZeroUsize::new(1).expect("1 is valid"));
     info!(%num_threads, "Creating shared query executor");

     let parquet_store_real = ParquetStorage::new(Arc::clone(&object_store), StorageId::from("iox"));
|
@ -7,6 +7,7 @@ use object_store_metrics::ObjectStoreMetrics;
|
|||
use observability_deps::tracing::*;
|
||||
use parquet_file::storage::{ParquetStorage, StorageId};
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
|
@@ -75,7 +76,7 @@ pub struct Config {
         default_value = "4",
         action
     )]
-    pub query_exec_thread_count: usize,
+    pub query_exec_thread_count: NonZeroUsize,

     /// Size of memory pool used during query exec, in bytes.
     #[clap(
|
@ -7,6 +7,7 @@ use object_store::DynObjectStore;
|
|||
use object_store_metrics::ObjectStoreMetrics;
|
||||
use observability_deps::tracing::*;
|
||||
use parquet_file::storage::{ParquetStorage, StorageId};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use thiserror::Error;
|
||||
|
||||
|
@@ -109,7 +110,10 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let num_threads = config
         .compactor_config
         .query_exec_thread_count
-        .unwrap_or_else(|| num_cpus::get() - 1_usize);
+        .unwrap_or_else(|| {
+            NonZeroUsize::new(num_cpus::get().saturating_sub(1))
+                .unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
+        });
     info!(%num_threads, "using specified number of threads");

     let exec = Arc::new(Executor::new_with_config(ExecutorConfig {
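On a single-core machine the old expression num_cpus::get() - 1_usize evaluated to zero and the executor panicked later; the new form degrades to one thread instead. A hypothetical helper (name invented here) isolating that arithmetic:

    use std::num::NonZeroUsize;

    fn threads_leaving_one_core(cpus: usize) -> NonZeroUsize {
        // saturating_sub guards the subtraction; unwrap_or_else supplies
        // the floor of one thread when cpus <= 1.
        NonZeroUsize::new(cpus.saturating_sub(1))
            .unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
    }

    fn main() {
        assert_eq!(threads_leaving_one_core(1).get(), 1); // previously 0, then panic
        assert_eq!(threads_leaving_one_core(8).get(), 7);
    }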
|
@ -12,7 +12,7 @@ use ioxd_common::{
|
|||
};
|
||||
use ioxd_ingest_replica::create_ingest_replica_server_type;
|
||||
use observability_deps::tracing::*;
|
||||
use std::sync::Arc;
|
||||
use std::{num::NonZeroUsize, sync::Arc};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
@@ -64,7 +64,7 @@ pub struct Config {
         default_value = "4",
         action
     )]
-    pub exec_thread_count: usize,
+    pub exec_thread_count: NonZeroUsize,

     /// Size of memory pool used during query exec, in bytes.
     #[clap(
@@ -13,6 +13,7 @@ use ioxd_ingester::create_ingester_server_type;
 use object_store::DynObjectStore;
 use object_store_metrics::ObjectStoreMetrics;
 use observability_deps::tracing::*;
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 use thiserror::Error;

@@ -74,7 +75,7 @@ pub struct Config {
         default_value = "4",
         action
     )]
-    pub query_exec_thread_count: usize,
+    pub query_exec_thread_count: NonZeroUsize,

     /// Size of memory pool used during query exec, in bytes.
     #[clap(
@@ -18,7 +18,7 @@ use object_store_metrics::ObjectStoreMetrics;
 use observability_deps::tracing::*;
 use panic_logging::make_panics_fatal;
 use parquet_file::storage::{ParquetStorage, StorageId};
-use std::sync::Arc;
+use std::{num::NonZeroUsize, sync::Arc};
 use thiserror::Error;

 #[derive(Debug, Error)]
@@ -73,7 +73,7 @@ pub struct Config {
         default_value = "4",
         action
     )]
-    pub exec_thread_count: usize,
+    pub exec_thread_count: NonZeroUsize,

     /// Size of memory pool used during query exec, in bytes.
     #[clap(
@@ -17,7 +17,7 @@ use ioxd_querier::{create_querier_server_type, QuerierServerTypeArgs};
 use object_store::DynObjectStore;
 use object_store_metrics::ObjectStoreMetrics;
 use observability_deps::tracing::*;
-use std::sync::Arc;
+use std::{num::NonZeroUsize, sync::Arc};
 use thiserror::Error;

 #[derive(Debug, Error)]
@@ -93,7 +93,9 @@ pub async fn command(config: Config) -> Result<(), Error> {
     let time_provider = Arc::new(SystemProvider::new());

     let num_query_threads = config.querier_config.num_query_threads();
-    let num_threads = num_query_threads.unwrap_or_else(num_cpus::get);
+    let num_threads = num_query_threads.unwrap_or_else(|| {
+        NonZeroUsize::new(num_cpus::get()).unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
+    });
     info!(%num_threads, "using specified number of threads per thread pool");

     let rpc_write = std::env::var("INFLUXDB_IOX_RPC_MODE").is_ok();
@@ -17,7 +17,7 @@ use parquet_file::storage::StorageId;
 use trace::span::{SpanExt, SpanRecorder};
 mod cross_rt_stream;

-use std::{collections::HashMap, fmt::Display, sync::Arc};
+use std::{collections::HashMap, fmt::Display, num::NonZeroUsize, sync::Arc};

 use datafusion::{
     self,
@@ -40,10 +40,10 @@ use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode};
 #[derive(Debug, Clone)]
 pub struct ExecutorConfig {
     /// Number of threads per thread pool
-    pub num_threads: usize,
+    pub num_threads: NonZeroUsize,

     /// Target parallelism for query execution
-    pub target_query_partitions: usize,
+    pub target_query_partitions: NonZeroUsize,

     /// Object stores
     pub object_stores: HashMap<StorageId, Arc<DynObjectStore>>,
@@ -72,11 +72,11 @@ pub struct DedicatedExecutors {
     reorg_exec: DedicatedExecutor,

     /// Number of threads per thread pool
-    num_threads: usize,
+    num_threads: NonZeroUsize,
 }

 impl DedicatedExecutors {
-    pub fn new(num_threads: usize) -> Self {
+    pub fn new(num_threads: NonZeroUsize) -> Self {
         let query_exec = DedicatedExecutor::new("IOx Query", num_threads);
         let reorg_exec = DedicatedExecutor::new("IOx Reorg", num_threads);

@@ -88,14 +88,18 @@ impl DedicatedExecutors {
     }

     pub fn new_testing() -> Self {
+        let query_exec = DedicatedExecutor::new_testing();
+        let reorg_exec = DedicatedExecutor::new_testing();
+        assert_eq!(query_exec.num_threads(), reorg_exec.num_threads());
+        let num_threads = query_exec.num_threads();
         Self {
-            query_exec: DedicatedExecutor::new_testing(),
-            reorg_exec: DedicatedExecutor::new_testing(),
-            num_threads: 1,
+            query_exec,
+            reorg_exec,
+            num_threads,
         }
     }

-    pub fn num_threads(&self) -> usize {
+    pub fn num_threads(&self) -> NonZeroUsize {
         self.num_threads
     }
 }
@@ -133,7 +137,7 @@ pub enum ExecutorType {
 impl Executor {
     /// Creates a new executor with a two dedicated thread pools, each
     /// with num_threads
-    pub fn new(num_threads: usize, mem_pool_size: usize) -> Self {
+    pub fn new(num_threads: NonZeroUsize, mem_pool_size: usize) -> Self {
         Self::new_with_config(ExecutorConfig {
             num_threads,
             target_query_partitions: num_threads,
@@ -152,8 +156,8 @@ impl Executor {
     /// to preserve resources.
     pub fn new_testing() -> Self {
         let config = ExecutorConfig {
-            num_threads: 1,
-            target_query_partitions: 1,
+            num_threads: NonZeroUsize::new(1).unwrap(),
+            target_query_partitions: NonZeroUsize::new(1).unwrap(),
             object_stores: HashMap::default(),
             mem_pool_size: 1024 * 1024 * 1024, // 1GB
         };
@@ -54,7 +54,7 @@ use executor::DedicatedExecutor;
 use futures::{Stream, StreamExt, TryStreamExt};
 use observability_deps::tracing::debug;
 use query_functions::{register_scalar_functions, selectors::register_selector_aggregates};
-use std::{convert::TryInto, fmt, sync::Arc};
+use std::{convert::TryInto, fmt, num::NonZeroUsize, sync::Arc};
 use trace::{
     ctx::SpanContext,
     span::{MetaValue, Span, SpanExt, SpanRecorder},
@@ -200,10 +200,10 @@ impl IOxSessionConfig {
     }

     /// Set execution concurrency
-    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
+    pub fn with_target_partitions(mut self, target_partitions: NonZeroUsize) -> Self {
         self.session_config = self
             .session_config
-            .with_target_partitions(target_partitions);
+            .with_target_partitions(target_partitions.get());
         self
     }

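Same boundary pattern as the tokio builder earlier: DataFusion's SessionConfig::with_target_partitions takes a plain usize, so the non-zero guarantee travels in the type and is dropped with .get() only at the edge. A minimal sketch, assuming the datafusion prelude re-exports SessionConfig:

    use datafusion::prelude::SessionConfig;
    use std::num::NonZeroUsize;

    fn session_config(target_partitions: NonZeroUsize) -> SessionConfig {
        // .get() only at the DataFusion boundary; callers cannot pass zero.
        SessionConfig::new().with_target_partitions(target_partitions.get())
    }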
@@ -18,7 +18,6 @@ metric = { path = "../metric" }
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 object_store = "0.5.4"
 observability_deps = { path = "../observability_deps" }
-once_cell = { version = "1.17", features = ["parking_lot"] }
 parquet_file = { path = "../parquet_file" }
 predicate = { path = "../predicate" }
 iox_query = { path = "../iox_query" }
@@ -27,7 +27,6 @@ use iox_time::{MockProvider, Time, TimeProvider};
 use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
 use object_store::{memory::InMemory, DynObjectStore};
 use observability_deps::tracing::debug;
-use once_cell::sync::Lazy;
 use parquet_file::{
     chunk::ParquetChunk,
     metadata::IoxMetadata,
@@ -37,13 +36,9 @@ use schema::{
     sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
     Projection, Schema,
 };
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};
 use uuid::Uuid;

-/// Global executor used by all test catalogs.
-static GLOBAL_EXEC: Lazy<Arc<DedicatedExecutors>> =
-    Lazy::new(|| Arc::new(DedicatedExecutors::new(1)));
-
 /// Common retention period used throughout tests
 pub const TEST_RETENTION_PERIOD_NS: Option<i64> = Some(3_600 * 1_000_000_000);

@@ -65,19 +60,21 @@ impl TestCatalog {
     /// All test catalogs use the same [`Executor`]. Use [`with_execs`](Self::with_execs) if you need a special or
     /// dedicated executor.
     pub fn new() -> Arc<Self> {
-        let exec = Arc::clone(&GLOBAL_EXEC);
-
-        Self::with_execs(exec, 1)
+        let exec = Arc::new(DedicatedExecutors::new_testing());
+        Self::with_execs(exec, NonZeroUsize::new(1).unwrap())
     }

     /// Initialize with partitions
-    pub fn with_target_query_partitions(target_query_partitions: usize) -> Arc<Self> {
-        let exec = Arc::clone(&GLOBAL_EXEC);
+    pub fn with_target_query_partitions(target_query_partitions: NonZeroUsize) -> Arc<Self> {
+        let exec = Arc::new(DedicatedExecutors::new_testing());
         Self::with_execs(exec, target_query_partitions)
     }

     /// Initialize with given executors and partitions
-    pub fn with_execs(exec: Arc<DedicatedExecutors>, target_query_partitions: usize) -> Arc<Self> {
+    pub fn with_execs(
+        exec: Arc<DedicatedExecutors>,
+        target_query_partitions: NonZeroUsize,
+    ) -> Arc<Self> {
         let metric_registry = Arc::new(metric::Registry::new());
         let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
         let object_store = Arc::new(InMemory::new());