feat: shut down executor when `Executor` is dropped

pull/24376/head
Marco Neumann 2021-08-24 14:38:00 +02:00
parent 3fdc0e9a6a
commit 4f23d3b60b
3 changed files with 40 additions and 12 deletions

View File

@ -81,13 +81,17 @@ impl Executor {
}
}
/// Return a new execution config, suitable for executing a new query or system task
/// Return a new execution config, suitable for executing a new query or system task.
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_execution_config(&self, executor_type: ExecutorType) -> IOxExecutionConfig {
let exec = self.executor(executor_type).clone();
IOxExecutionConfig::new(exec).with_concurrency(self.config.concurrency)
}
/// Create a new execution context, suitable for executing a new query or system task
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext {
self.new_execution_config(executor_type).build()
}
@ -99,6 +103,23 @@ impl Executor {
ExecutorType::Reorg => &self.reorg_exec,
}
}
/// Stops all subsequent task executions, and waits for the worker
/// thread to complete. Note this will shutdown all created contexts.
///
/// Only the first all to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
pub fn join(&self) {
self.query_exec.join();
self.reorg_exec.join();
}
}
impl Drop for Executor {
fn drop(&mut self) {
self.join();
}
}
/// Create a SchemaPivot node which an arbitrary input like
@ -176,7 +197,8 @@ mod tests {
let expected_strings = to_set(&["Foo", "Bar"]);
let plan = StringSetPlan::Known(Arc::clone(&expected_strings));
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let result_strings = ctx.to_string_set(plan).await.unwrap();
assert_eq!(result_strings, expected_strings);
}
@ -188,7 +210,8 @@ mod tests {
let scan = make_plan(schema, vec![]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, StringSetRef::new(StringSet::new()));
@ -203,7 +226,8 @@ mod tests {
let scan = make_plan(batch.schema(), vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -222,7 +246,8 @@ mod tests {
let scan = make_plan(schema, vec![batch1, batch2]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -245,7 +270,8 @@ mod tests {
let plan: StringSetPlan = vec![scan1, scan2].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.unwrap();
assert_eq!(results, to_set(&["foo", "bar", "baz"]));
@ -265,7 +291,8 @@ mod tests {
let scan = make_plan(schema, vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await;
let actual_error = match results {
@ -290,7 +317,8 @@ mod tests {
let scan = make_plan(batch.schema(), vec![batch]);
let plan: StringSetPlan = vec![scan].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await;
let actual_error = match results {
@ -321,7 +349,8 @@ mod tests {
let pivot = make_schema_pivot(scan);
let plan = vec![pivot].into();
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let results = ctx.to_string_set(plan).await.expect("Executed plan");
assert_eq!(results, to_set(&["f1", "f2"]));

View File

@ -141,7 +141,6 @@ impl DedicatedExecutor {
}
/// signals shutdown of this executor and any Clones
#[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372
pub fn shutdown(&self) {
// hang up the channel which will cause the dedicated thread
// to quit
@ -156,7 +155,6 @@ impl DedicatedExecutor {
/// Only the first all to `join` will actually wait for the
/// executing thread to complete. All other calls to join will
/// complete immediately.
#[allow(dead_code)] // https://github.com/influxdata/influxdb_iox/issues/2372
pub fn join(&self) {
self.shutdown();

View File

@ -218,7 +218,8 @@ mod tests {
let expected_ss = to_string_set(&["foo", "bar", "baz", "from_a_plan"]).into();
assert!(matches!(plan, StringSetPlan::Plan(_)));
let ctx = Executor::new(1).new_context(ExecutorType::Query);
let exec = Executor::new(1);
let ctx = exec.new_context(ExecutorType::Query);
let ss = ctx.to_string_set(plan).await.unwrap();
assert_eq!(ss, expected_ss);
}