diff --git a/cache_system/src/backend/policy/lru.rs b/cache_system/src/backend/policy/lru.rs index 3082eeb382..4f5c9abfad 100644 --- a/cache_system/src/backend/policy/lru.rs +++ b/cache_system/src/backend/policy/lru.rs @@ -419,7 +419,8 @@ where /// Notification when the background worker is idle, so tests know that the state has converged and that they can /// continue working. #[allow(dead_code)] - notify_idle_test_side: tokio::sync::mpsc::UnboundedSender<tokio::sync::oneshot::Sender<()>>, + notify_idle_test_side: + tokio::sync::mpsc::UnboundedSender<futures::channel::oneshot::Sender<()>>, } impl<S> ResourcePool<S> @@ -519,7 +520,7 @@ where /// # Panic /// Panics if the background worker is not idle within 5s or if the worker died. pub async fn wait_converged(&self) { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = futures::channel::oneshot::channel(); self.notify_idle_test_side .send(tx) .expect("background worker alive"); @@ -885,7 +886,7 @@ mod it { async fn clean_up_loop<S>( shared: Arc<SharedState<S>>, mut notify_idle_worker_side: tokio::sync::mpsc::UnboundedReceiver< - tokio::sync::oneshot::Sender<()>, + futures::channel::oneshot::Sender<()>, >, ) where S: Resource, diff --git a/cache_system/src/cache/driver.rs b/cache_system/src/cache/driver.rs index 57de0e344b..2b5e17ba06 100644 --- a/cache_system/src/cache/driver.rs +++ b/cache_system/src/cache/driver.rs @@ -7,13 +7,13 @@ use crate::{ }; use async_trait::async_trait; use futures::{ + channel::oneshot::{channel, Canceled, Sender}, future::{BoxFuture, Shared}, FutureExt, TryFutureExt, }; use observability_deps::tracing::debug; use parking_lot::Mutex; use std::{collections::HashMap, fmt::Debug, future::Future, sync::Arc}; -use tokio::sync::oneshot::{error::RecvError, Sender}; use super::{Cache, CacheGetStatus, CachePeekStatus}; @@ -55,13 +55,13 @@ where CancellationSafeFuture<impl Future<Output = ()>>, SharedReceiver<B::V>, ) { - let (tx_main, rx_main) = tokio::sync::oneshot::channel(); + let (tx_main, rx_main) = channel(); let receiver = rx_main .map_ok(|v| Arc::new(Mutex::new(v))) .map_err(Arc::new) .boxed() .shared(); - let (tx_set, rx_set) = tokio::sync::oneshot::channel(); + let (tx_set, rx_set) = channel(); // generate unique tag let tag = state.tag_counter; @@ -352,7 +352,7 @@ where /// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>, /// Arc<RecvError>>` results in a kinda messy type and we wanna erase that. /// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places. -type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>; +type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<Canceled>>>>; /// Retrieve data from shared receiver. async fn retrieve_from_shared<V>(receiver: SharedReceiver<V>) -> V diff --git a/cache_system/src/loader/batch.rs b/cache_system/src/loader/batch.rs index 6915711714..4d30196a0d 100644 --- a/cache_system/src/loader/batch.rs +++ b/cache_system/src/loader/batch.rs @@ -12,10 +12,12 @@ use std::{ }; use async_trait::async_trait; -use futures::FutureExt; +use futures::{ + channel::oneshot::{channel, Sender}, + FutureExt, +}; use observability_deps::tracing::trace; use parking_lot::Mutex; -use tokio::sync::oneshot::{channel, Sender}; use crate::cancellation_safe_future::{CancellationSafeFuture, CancellationSafeFutureReceiver};