refactor: cancel executor jobs on drop

Our executor is not meant as a fire-and-forget system. Instead the
submitter should always poll the result. Dropping the receiver side (aka
the job handle) should cancel the job.
pull/24376/head
Marco Neumann 2021-10-11 10:03:19 +02:00
parent 470de36f3b
commit 24ae269b3a
3 changed files with 89 additions and 7 deletions

2
Cargo.lock generated
View File

@ -3152,6 +3152,7 @@ dependencies = [
"libc",
"observability_deps",
"parking_lot",
"pin-project",
"predicate",
"regex",
"schema",
@ -3159,6 +3160,7 @@ dependencies = [
"test_helpers",
"tokio",
"tokio-stream",
"tokio-util",
"trace",
]

View File

@ -26,11 +26,13 @@ futures = "0.3"
hashbrown = "0.11"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"
pin-project = "1.0"
regex = "1"
schema = { path = "../schema" }
snafu = "0.6.9"
tokio = { version = "1.11", features = ["macros"] }
tokio-stream = "0.1.2"
tokio-util = { version = "0.6.3" }
trace = { path = "../trace" }
predicate = { path = "../predicate" }

View File

@ -2,20 +2,68 @@
//! intensive" workloads such as DataFusion plans
use parking_lot::Mutex;
use pin_project::{pin_project, pinned_drop};
use std::{pin::Pin, sync::Arc};
use tokio::sync::oneshot::Receiver;
use tokio_util::sync::CancellationToken;
use futures::Future;
use observability_deps::tracing::warn;
/// The type of thing that the dedicated executor runs
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Task that can be added to the executor-internal queue.
///
/// Every task within the executor is represented by a [`Job`] that can be polled by the API user.
struct Task {
fut: Pin<Box<dyn Future<Output = ()> + Send>>,
cancel: CancellationToken,
}
impl Task {
/// Run task.
///
/// This runs the payload or cancels if the linked [`Job`] is dropped.
async fn run(self) {
tokio::select! {
_ = self.cancel.cancelled() => (),
_ = self.fut => (),
}
}
}
/// The type of error that is returned from tasks in this module
#[allow(dead_code)]
pub type Error = tokio::sync::oneshot::error::RecvError;
/// Job within the executor.
///
/// Dropping the job will cancel its linked task.
#[pin_project(PinnedDrop)]
pub struct Job<T> {
cancel: CancellationToken,
#[pin]
rx: Receiver<T>,
}
impl<T> Future for Job<T> {
type Output = Result<T, Error>;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
this.rx.poll(cx)
}
}
#[pinned_drop]
impl<T> PinnedDrop for Job<T> {
fn drop(self: Pin<&mut Self>) {
self.cancel.cancel();
}
}
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by
/// them) on a separate tokio Executor
#[derive(Clone)]
@ -86,7 +134,7 @@ impl DedicatedExecutor {
let handle = join.read_owned().await;
tokio::task::spawn(async move {
task.await;
task.run().await;
std::mem::drop(handle);
});
}
@ -111,30 +159,35 @@ impl DedicatedExecutor {
///
/// Currently all tasks are added to the tokio executor
/// immediately and compete for the threadpool's resources.
pub fn spawn<T>(&self, task: T) -> Receiver<T::Output>
pub fn spawn<T>(&self, task: T) -> Job<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
let job = Box::pin(async move {
let fut = Box::pin(async move {
let task_output = task.await;
if tx.send(task_output).is_err() {
warn!("Spawned task output ignored: receiver dropped")
}
});
let cancel = CancellationToken::new();
let task = Task {
fut,
cancel: cancel.clone(),
};
let mut state = self.state.lock();
if let Some(requests) = &mut state.requests {
// would fail if someone has started shutdown
requests.send(job).ok();
requests.send(task).ok();
} else {
warn!("tried to schedule task on an executor that was shutdown");
}
rx
Job { rx, cancel }
}
/// signals shutdown of this executor and any Clones
@ -185,6 +238,7 @@ fn set_current_thread_priority(prio: i32) {
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use tokio::sync::Barrier as AsyncBarrier;
#[cfg(unix)]
fn get_current_thread_priority() -> i32 {
@ -352,9 +406,33 @@ mod tests {
exec.join();
}
#[tokio::test]
async fn drop_receiver() {
let barrier1 = Arc::new(AsyncBarrier::new(2));
let barrier2 = Arc::new(AsyncBarrier::new(2));
let exec = DedicatedExecutor::new("Test DedicatedExecutor", 1);
let dedicated_task1 = exec.spawn(do_work_async(11, Arc::clone(&barrier1)));
let dedicated_task2 = exec.spawn(do_work_async(22, Arc::clone(&barrier2)));
drop(dedicated_task1);
barrier2.wait().await;
assert_eq!(dedicated_task2.await.unwrap(), 22);
exec.join()
}
/// Wait for the barrier and then return `result`
async fn do_work(result: usize, barrier: Arc<Barrier>) -> usize {
barrier.wait();
result
}
/// Wait for the barrier and then return `result`
async fn do_work_async(result: usize, barrier: Arc<AsyncBarrier>) -> usize {
barrier.wait().await;
result
}
}