refactor: replace tokio `oneshot` w/ futures `oneshot` (#8713)

Tokio oneshots have A LOT of overhead:

61042b4d90/tokio/src/sync/oneshot.rs (L1091-L1097)

For a particular case that I've debugged (https://github.com/influxdata/EAR/issues/4505),
that change alone decreases the "cold" query time from 16s to 11s.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-09-12 10:29:18 +02:00 committed by GitHub
parent ce34d4ffa3
commit d31f54754f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 9 deletions

View File

@ -419,7 +419,8 @@ where
/// Notification when the background worker is idle, so tests know that the state has converged and that they can
/// continue working.
#[allow(dead_code)]
notify_idle_test_side: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>,
notify_idle_test_side:
tokio::sync::mpsc::UnboundedSender<futures::channel::oneshot::Sender<()>>,
}
impl<S> ResourcePool<S>
@ -519,7 +520,7 @@ where
/// # Panic
/// Panics if the background worker is not idle within 5s or if the worker died.
pub async fn wait_converged(&self) {
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = futures::channel::oneshot::channel();
self.notify_idle_test_side
.send(tx)
.expect("background worker alive");
@ -885,7 +886,7 @@ mod it {
async fn clean_up_loop<S>(
shared: Arc<SharedState<S>>,
mut notify_idle_worker_side: tokio::sync::mpsc::UnboundedReceiver<
tokio::sync::oneshot::Sender<()>,
futures::channel::oneshot::Sender<()>,
>,
) where
S: Resource,

View File

@ -7,13 +7,13 @@ use crate::{
};
use async_trait::async_trait;
use futures::{
channel::oneshot::{channel, Canceled, Sender},
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use std::{collections::HashMap, fmt::Debug, future::Future, sync::Arc};
use tokio::sync::oneshot::{error::RecvError, Sender};
use super::{Cache, CacheGetStatus, CachePeekStatus};
@ -55,13 +55,13 @@ where
CancellationSafeFuture<impl Future<Output = ()>>,
SharedReceiver<B::V>,
) {
let (tx_main, rx_main) = tokio::sync::oneshot::channel();
let (tx_main, rx_main) = channel();
let receiver = rx_main
.map_ok(|v| Arc::new(Mutex::new(v)))
.map_err(Arc::new)
.boxed()
.shared();
let (tx_set, rx_set) = tokio::sync::oneshot::channel();
let (tx_set, rx_set) = channel();
// generate unique tag
let tag = state.tag_counter;
@ -352,7 +352,7 @@ where
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>,
/// Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<Canceled>>>>;
/// Retrieve data from shared receiver.
async fn retrieve_from_shared<V>(receiver: SharedReceiver<V>) -> V

View File

@ -12,10 +12,12 @@ use std::{
};
use async_trait::async_trait;
use futures::FutureExt;
use futures::{
channel::oneshot::{channel, Sender},
FutureExt,
};
use observability_deps::tracing::trace;
use parking_lot::Mutex;
use tokio::sync::oneshot::{channel, Sender};
use crate::cancellation_safe_future::{CancellationSafeFuture, CancellationSafeFutureReceiver};