refactor: clarify async/runtime requirement in cache system (#5483)

Instead of using a "fake async" function to ensure that we have a
running tokio runtime, use an explicit handle.
pull/24376/head
Marco Neumann 2022-08-29 09:51:25 +00:00 committed by GitHub
parent 05e599228b
commit 1b230d9291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 85 deletions

View File

@ -4,7 +4,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use iox_time::{MockProvider, Time};
use parking_lot::Mutex;
use tokio::sync::Notify;
use tokio::{runtime::Handle, sync::Notify};
use crate::{
backend::{policy::refresh::test_util::NotifyExt, CacheBackend},
@ -31,7 +31,7 @@ async fn test_refresh_can_prevent_expiration() {
loader,
notify_idle,
..
} = TestStateTtlAndRefresh::new().await;
} = TestStateTtlAndRefresh::new();
loader.mock_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(1, String::from("a"), Some(Duration::from_secs(1)));
@ -65,7 +65,7 @@ async fn test_refresh_sets_new_expiration_after_it_finishes() {
loader,
notify_idle,
..
} = TestStateTtlAndRefresh::new().await;
} = TestStateTtlAndRefresh::new();
let barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(1, String::from("a"), Some(Duration::from_secs(1)));
@ -104,7 +104,7 @@ async fn test_if_refresh_to_slow_then_expire() {
loader,
notify_idle,
..
} = TestStateTtlAndRefresh::new().await;
} = TestStateTtlAndRefresh::new();
let _barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(1, String::from("a"), Some(Duration::from_secs(1)));
@ -134,7 +134,7 @@ async fn test_refresh_can_trigger_lru_eviction() {
notify_idle,
pool,
..
} = TestStateLRUAndRefresh::new().await;
} = TestStateLRUAndRefresh::new();
assert_eq!(pool.limit(), TestSize(10));
@ -219,7 +219,7 @@ struct TestStateTtlAndRefresh {
}
impl TestStateTtlAndRefresh {
async fn new() -> Self {
fn new() -> Self {
let refresh_duration_provider = Arc::new(TestRefreshDurationProvider::new());
let ttl_provider = Arc::new(TestTtlProvider::new());
let time_provider = Arc::new(MockProvider::new(Time::MIN));
@ -228,17 +228,15 @@ impl TestStateTtlAndRefresh {
let notify_idle = Arc::new(Notify::new());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
backend.add_policy(
RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
)
.await,
);
backend.add_policy(RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
&Handle::current(),
));
backend.add_policy(TtlPolicy::new(
Arc::clone(&ttl_provider) as _,
Arc::clone(&time_provider) as _,
@ -269,7 +267,7 @@ struct TestStateLRUAndRefresh {
}
impl TestStateLRUAndRefresh {
async fn new() -> Self {
fn new() -> Self {
let refresh_duration_provider = Arc::new(TestRefreshDurationProvider::new());
let size_estimator = Arc::new(TestSizeEstimator::default());
let time_provider = Arc::new(MockProvider::new(Time::MIN));
@ -278,17 +276,15 @@ impl TestStateLRUAndRefresh {
let notify_idle = Arc::new(Notify::new());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
backend.add_policy(
RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
)
.await,
);
backend.add_policy(RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
&Handle::current(),
));
let pool = Arc::new(ResourcePool::new(
"my_pool",
TestSize(10),

View File

@ -8,6 +8,7 @@ use iox_time::{Time, TimeProvider};
use metric::U64Counter;
use parking_lot::Mutex;
use tokio::{
runtime::Handle,
sync::{mpsc::UnboundedSender, Notify},
task::JoinHandle,
};
@ -139,15 +140,14 @@ where
V: Clone + Debug + Send + 'static,
{
/// Create new refresh policy.
///
/// Even though this does NOT perform any IO, we require this to be `async` so that we can start a tokio background process.
#[allow(clippy::new_ret_no_self)]
pub async fn new(
pub fn new(
refresh_duration_provider: Arc<dyn RefreshDurationProvider<K = K, V = V>>,
time_provider: Arc<dyn TimeProvider>,
loader: Arc<dyn Loader<K = K, V = V, Extra = ()>>,
name: &'static str,
metric_registry: &metric::Registry,
handle: &Handle,
) -> impl FnOnce(CallbackHandle<K, V>) -> Self {
let idle_notify = Arc::new(Notify::new());
Self::new_inner(
@ -157,31 +157,35 @@ where
name,
metric_registry,
idle_notify,
handle,
)
.await
}
#[allow(clippy::new_ret_no_self)]
pub(crate) async fn new_inner(
pub(crate) fn new_inner(
refresh_duration_provider: Arc<dyn RefreshDurationProvider<K = K, V = V>>,
time_provider: Arc<dyn TimeProvider>,
loader: Arc<dyn Loader<K = K, V = V, Extra = ()>>,
name: &'static str,
metric_registry: &metric::Registry,
idle_notify: Arc<Notify>,
handle: &Handle,
) -> impl FnOnce(CallbackHandle<K, V>) -> Self {
let metric_refreshed = metric_registry
.register_metric::<U64Counter>("cache_refresh", "Number of cache refresh operations.")
.recorder(&[("name", name)]);
|mut callback_handle| {
// clone handle for callback
let handle = handle.clone();
move |mut callback_handle| {
callback_handle.execute_requests(vec![ChangeRequest::ensure_empty()]);
let (tx_refresh_tasks, mut rx_refresh_tasks) = tokio::sync::mpsc::unbounded_channel();
let timings: Arc<Mutex<HashMap<K, RefreshState>>> = Default::default();
let callback_handle = Arc::new(Mutex::new(callback_handle));
let background_worker = tokio::task::spawn(async move {
let background_worker = handle.spawn(async move {
let mut refresh_tasks = FuturesUnordered::<BoxFuture<'static, ()>>::new();
// We MUST NOT poll the empty task set because this would finish immediately. This will hot-loop
@ -693,8 +697,8 @@ mod tests {
loader,
"my_cache",
&metric_registry,
)
.await;
&Handle::current(),
);
backend.add_policy(|mut handle| {
handle.execute_requests(vec![ChangeRequest::set(1, String::from("foo"))]);
policy_constructor(handle)
@ -710,16 +714,14 @@ mod tests {
let time_provider = Arc::new(MockProvider::new(Time::MAX - Duration::from_secs(1)));
let loader = Arc::new(TestLoader::default());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
backend.add_policy(
RefreshPolicy::new(
refresh_duration_provider,
Arc::clone(&time_provider) as _,
loader,
"my_cache",
&metric_registry,
)
.await,
);
backend.add_policy(RefreshPolicy::new(
refresh_duration_provider,
Arc::clone(&time_provider) as _,
loader,
"my_cache",
&metric_registry,
&Handle::current(),
));
backend.set(1, String::from("a"));
@ -738,7 +740,7 @@ mod tests {
metric_registry,
notify_idle,
..
} = TestState::new().await;
} = TestState::new();
loader.mock_next(1, String::from("foo"));
loader.mock_next(1, String::from("bar"));
@ -799,7 +801,7 @@ mod tests {
loader,
notify_idle,
..
} = TestState::new().await;
} = TestState::new();
let barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(
@ -830,7 +832,7 @@ mod tests {
loader,
notify_idle,
..
} = TestState::new().await;
} = TestState::new();
let barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(
@ -866,7 +868,7 @@ mod tests {
loader,
notify_idle,
..
} = TestState::new().await;
} = TestState::new();
let barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(
@ -896,7 +898,7 @@ mod tests {
loader,
notify_idle,
..
} = TestState::new().await;
} = TestState::new();
let barrier = loader.block_next(1, String::from("foo"));
refresh_duration_provider.set_refresh_in(
@ -923,30 +925,26 @@ mod tests {
assert_eq!(backend.get(&1), Some(String::from("b")));
}
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn test_generic_backend() {
use crate::backend::test_util::test_generic;
// need some async foo because `test_generic` is not async (because backends are usually fully sync)
tokio::task::block_in_place(move || {
test_generic(|| {
let ttl_provider = Arc::new(NeverRefreshProvider::default());
let time_provider = Arc::new(MockProvider::new(Time::MIN));
let metric_registry = metric::Registry::new();
let loader = Arc::new(TestLoader::default());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
test_generic(|| {
let ttl_provider = Arc::new(NeverRefreshProvider::default());
let time_provider = Arc::new(MockProvider::new(Time::MIN));
let metric_registry = metric::Registry::new();
let loader = Arc::new(TestLoader::default());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
let policy_constructor =
tokio::runtime::Handle::current().block_on(RefreshPolicy::new(
Arc::clone(&ttl_provider) as _,
Arc::clone(&time_provider) as _,
loader,
"my_cache",
&metric_registry,
));
backend.add_policy(policy_constructor);
backend
});
backend.add_policy(RefreshPolicy::new(
Arc::clone(&ttl_provider) as _,
Arc::clone(&time_provider) as _,
loader,
"my_cache",
&metric_registry,
&Handle::current(),
));
backend
});
}
@ -960,7 +958,7 @@ mod tests {
}
impl TestState {
async fn new() -> Self {
fn new() -> Self {
let refresh_duration_provider = Arc::new(TestRefreshDurationProvider::new());
let time_provider = Arc::new(MockProvider::new(Time::MIN));
let metric_registry = metric::Registry::new();
@ -968,17 +966,15 @@ mod tests {
let notify_idle = Arc::new(Notify::new());
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
backend.add_policy(
RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
)
.await,
);
backend.add_policy(RefreshPolicy::new_inner(
Arc::clone(&refresh_duration_provider) as _,
Arc::clone(&time_provider) as _,
Arc::clone(&loader) as _,
"my_cache",
&metric_registry,
Arc::clone(&notify_idle),
&Handle::current(),
));
Self {
backend,