From 37347f2389c92a1c73b7105e4a3fd8bf318fada7 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 27 May 2022 10:52:10 -0400 Subject: [PATCH] feat: Add an Extra type to Cacher Loader to specify extra information for loading entries --- cache_system/src/driver.rs | 70 ++++++++++++----------- cache_system/src/loader/metrics.rs | 25 ++++---- cache_system/src/loader/mod.rs | 27 +++++---- querier/src/cache/namespace.rs | 51 +++++++++-------- querier/src/cache/parquet_file.rs | 6 +- querier/src/cache/partition.rs | 50 ++++++++-------- querier/src/cache/processed_tombstones.rs | 6 +- querier/src/cache/read_buffer.rs | 6 +- querier/src/cache/table.rs | 11 ++-- querier/src/cache/tombstones.rs | 6 +- 10 files changed, 141 insertions(+), 117 deletions(-) diff --git a/cache_system/src/driver.rs b/cache_system/src/driver.rs index 740af806a4..1a2844c9c5 100644 --- a/cache_system/src/driver.rs +++ b/cache_system/src/driver.rs @@ -31,23 +31,25 @@ use tokio::{ /// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. /// The data will NOT be cached. #[derive(Debug)] -pub struct Cache +pub struct Cache where K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static, V: Clone + std::fmt::Debug + Send + 'static, + Extra: std::fmt::Debug + Send + 'static, { state: Arc>>, - loader: Arc>, + loader: Arc>, } -impl Cache +impl Cache where K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static, V: Clone + std::fmt::Debug + Send + 'static, + Extra: std::fmt::Debug + Send + 'static, { /// Create new, empty cache with given loader function. pub fn new( - loader: Arc>, + loader: Arc>, backend: Box>, ) -> Self { Self { @@ -61,13 +63,13 @@ where } /// Get value from cache. - pub async fn get(&self, k: K) -> V { + pub async fn get(&self, k: K, extra: Extra) -> V { // place state locking into its own scope so it doesn't leak into the generator (async // function) let receiver = { let mut state = self.state.lock(); - // check if the already cached this entry + // check if the entry has already been cached if let Some(v) = state.cached_entries.get(&k) { return v; } @@ -102,7 +104,7 @@ where // execute the loader // If we panic here then `tx` will be dropped and the receivers will be // notified. - let v = loader.load(k_for_loader).await; + let v = loader.load(k_for_loader, extra).await; // remove "running" state and store result let was_running = { @@ -221,10 +223,11 @@ where } } -impl Drop for Cache +impl Drop for Cache where K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static, V: Clone + std::fmt::Debug + Send + 'static, + Extra: std::fmt::Debug + Send + 'static, { fn drop(&mut self) { for (_k, running_query) in self.state.lock().running_queries.drain() { @@ -297,19 +300,19 @@ mod tests { async fn test_answers_are_correct() { let (cache, _loader) = setup(); - assert_eq!(cache.get(1).await, String::from("1")); - assert_eq!(cache.get(2).await, String::from("2")); + assert_eq!(cache.get(1, ()).await, String::from("1")); + assert_eq!(cache.get(2, ()).await, String::from("2")); } #[tokio::test] async fn test_linear_memory() { let (cache, loader) = setup(); - assert_eq!(cache.get(1).await, String::from("1")); - assert_eq!(cache.get(1).await, String::from("1")); - assert_eq!(cache.get(2).await, String::from("2")); - assert_eq!(cache.get(2).await, String::from("2")); - assert_eq!(cache.get(1).await, String::from("1")); + assert_eq!(cache.get(1, ()).await, String::from("1")); + assert_eq!(cache.get(1, ()).await, String::from("1")); + assert_eq!(cache.get(2, ()).await, String::from("2")); + assert_eq!(cache.get(2, ()).await, String::from("2")); + assert_eq!(cache.get(1, ()).await, String::from("1")); assert_eq!(loader.loaded(), vec![1, 2]); } @@ -321,8 +324,8 @@ mod tests { loader.block(); let cache_captured = Arc::clone(&cache); - let handle_1 = tokio::spawn(async move { cache_captured.get(1).await }); - let handle_2 = tokio::spawn(async move { cache.get(1).await }); + let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await }); + let handle_2 = tokio::spawn(async move { cache.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; // Shouldn't issue concurrent load requests for the same key @@ -342,10 +345,10 @@ mod tests { loader.block(); let cache_captured = Arc::clone(&cache); - let handle_1 = tokio::spawn(async move { cache_captured.get(1).await }); + let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await }); let cache_captured = Arc::clone(&cache); - let handle_2 = tokio::spawn(async move { cache_captured.get(1).await }); - let handle_3 = tokio::spawn(async move { cache.get(2).await }); + let handle_2 = tokio::spawn(async move { cache_captured.get(1, ()).await }); + let handle_3 = tokio::spawn(async move { cache.get(2, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; @@ -366,9 +369,9 @@ mod tests { loader.block(); let cache_captured = Arc::clone(&cache); - let handle_1 = tokio::spawn(async move { cache_captured.get(1).await }); + let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; - let handle_2 = tokio::spawn(async move { cache.get(1).await }); + let handle_2 = tokio::spawn(async move { cache.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; @@ -392,11 +395,11 @@ mod tests { loader.block(); let cache_captured = Arc::clone(&cache); - let handle_1 = tokio::spawn(async move { cache_captured.get(1).await }); + let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; let cache_captured = Arc::clone(&cache); - let handle_2 = tokio::spawn(async move { cache_captured.get(1).await }); - let handle_3 = tokio::spawn(async move { cache.get(2).await }); + let handle_2 = tokio::spawn(async move { cache_captured.get(1, ()).await }); + let handle_3 = tokio::spawn(async move { cache.get(2, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; @@ -422,7 +425,7 @@ mod tests { loader.block(); - let handle = tokio::spawn(async move { cache.get(1).await }); + let handle = tokio::spawn(async move { cache.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; @@ -442,7 +445,7 @@ mod tests { cache.set(1, String::from("foo")).await; // blocked loader is not used - let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1)) + let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, ())) .await .unwrap(); assert_eq!(res, String::from("foo")); @@ -456,7 +459,7 @@ mod tests { loader.block(); let cache_captured = Arc::clone(&cache); - let handle = tokio::spawn(async move { cache_captured.get(1).await }); + let handle = tokio::spawn(async move { cache_captured.get(1, ()).await }); tokio::time::sleep(Duration::from_millis(10)).await; cache.set(1, String::from("foo")).await; @@ -470,14 +473,14 @@ mod tests { assert_eq!(loader.loaded(), vec![1]); // still cached - let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1)) + let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, ())) .await .unwrap(); assert_eq!(res, String::from("foo")); assert_eq!(loader.loaded(), vec![1]); } - fn setup() -> (Arc>, Arc) { + fn setup() -> (Arc>, Arc) { let loader = Arc::new(TestLoader::default()); let cache = Arc::new(Cache::new( Arc::clone(&loader) as _, @@ -536,8 +539,9 @@ mod tests { impl Loader for TestLoader { type K = u8; type V = String; + type Extra = (); - async fn load(&self, k: u8) -> String { + async fn load(&self, k: u8, _extra: ()) -> String { self.loaded.lock().push(k); // need to capture the cloned notify handle, otherwise the lock guard leaks into the @@ -569,7 +573,7 @@ mod tests { #[test] fn test_bounds() { - assert_send::>(); - assert_sync::>(); + assert_send::>(); + assert_sync::>(); } } diff --git a/cache_system/src/loader/metrics.rs b/cache_system/src/loader/metrics.rs index ea072282b5..2726dfe048 100644 --- a/cache_system/src/loader/metrics.rs +++ b/cache_system/src/loader/metrics.rs @@ -9,25 +9,27 @@ use metric::{DurationHistogram, U64Counter}; use super::Loader; /// Wraps a [`Loader`] and adds metrics. -pub struct MetricsLoader +pub struct MetricsLoader where K: Send + 'static, V: Send + 'static, + Extra: Send + 'static, { - inner: Box>, + inner: Box>, time_provider: Arc, metric_calls: U64Counter, metric_duration: DurationHistogram, } -impl MetricsLoader +impl MetricsLoader where K: Send + 'static, V: Send + 'static, + Extra: Send + 'static, { /// Create new wrapper. pub fn new( - inner: Box>, + inner: Box>, name: &'static str, time_provider: Arc, metric_registry: &metric::Registry, @@ -54,10 +56,11 @@ where } } -impl std::fmt::Debug for MetricsLoader +impl std::fmt::Debug for MetricsLoader where K: Send + 'static, V: Send + 'static, + Extra: Send + 'static, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MetricsLoader").finish_non_exhaustive() @@ -65,19 +68,21 @@ where } #[async_trait] -impl Loader for MetricsLoader +impl Loader for MetricsLoader where K: Send + 'static, V: Send + 'static, + Extra: Send + 'static, { type K = K; type V = V; + type Extra = Extra; - async fn load(&self, k: Self::K) -> Self::V { + async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V { self.metric_calls.inc(1); let t_start = self.time_provider.now(); - let v = self.inner.load(k).await; + let v = self.inner.load(k, extra).await; let t_end = self.time_provider.now(); self.metric_duration.record(t_end - t_start); @@ -103,7 +108,7 @@ mod tests { let metric_registry = Arc::new(metric::Registry::new()); let time_provider_captured = Arc::clone(&time_provider); - let inner_loader = Box::new(FunctionLoader::new(move |x: u64| { + let inner_loader = Box::new(FunctionLoader::new(move |x: u64, _extra: ()| { let time_provider_captured = Arc::clone(&time_provider_captured); async move { time_provider_captured.inc(Duration::from_secs(10)); @@ -134,7 +139,7 @@ mod tests { panic!("Wrong observation type"); } - assert_eq!(loader.load(42).await, String::from("42")); + assert_eq!(loader.load(42, ()).await, String::from("42")); let mut reporter = RawReporter::default(); metric_registry.report(&mut reporter); diff --git a/cache_system/src/loader/mod.rs b/cache_system/src/loader/mod.rs index ffd1308c36..fbca807766 100644 --- a/cache_system/src/loader/mod.rs +++ b/cache_system/src/loader/mod.rs @@ -11,46 +11,51 @@ pub trait Loader: std::fmt::Debug + Send + Sync + 'static { /// Cache key. type K: Send + 'static; + /// Extra data needed when loading a missing entry. Specify `()` if not needed. + type Extra: Send + 'static; + /// Cache value. type V: Send + 'static; - /// Load value for given key. - async fn load(&self, k: Self::K) -> Self::V; + /// Load value for given key, using the extra data if needed. + async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V; } /// Simple-to-use wrapper for async functions to act as a [`Loader`]. -pub struct FunctionLoader { - loader: Box BoxFuture<'static, V>) + Send + Sync>, +pub struct FunctionLoader { + loader: Box BoxFuture<'static, V>) + Send + Sync>, } -impl FunctionLoader { +impl FunctionLoader { /// Create loader from function. pub fn new(loader: T) -> Self where - T: Fn(K) -> F + Send + Sync + 'static, + T: Fn(K, Extra) -> F + Send + Sync + 'static, F: Future + Send + 'static, { - let loader = Box::new(move |k| loader(k).boxed()); + let loader = Box::new(move |k, extra| loader(k, extra).boxed()); Self { loader } } } -impl std::fmt::Debug for FunctionLoader { +impl std::fmt::Debug for FunctionLoader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FunctionLoader").finish_non_exhaustive() } } #[async_trait] -impl Loader for FunctionLoader +impl Loader for FunctionLoader where K: Send + 'static, V: Send + 'static, + Extra: Send + 'static, { type K = K; type V = V; + type Extra = Extra; - async fn load(&self, k: Self::K) -> Self::V { - (self.loader)(k).await + async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V { + (self.loader)(k, extra).await } } diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index aa4cdf79e2..5a3af71bb9 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -25,7 +25,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(60); const CACHE_ID: &str = "namespace"; -type CacheT = Cache, Option>>; +type CacheT = Cache, Option>, ()>; /// Cache for namespace-related attributes. #[derive(Debug)] @@ -42,30 +42,32 @@ impl NamespaceCache { metric_registry: &metric::Registry, ram_pool: Arc>, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |namespace_name: Arc| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + let loader = Box::new(FunctionLoader::new( + move |namespace_name: Arc, _extra: ()| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); - async move { - let schema = Backoff::new(&backoff_config) - .retry_all_errors("get namespace schema", || async { - let mut repos = catalog.repositories().await; - match get_schema_by_name(&namespace_name, repos.as_mut()).await { - Ok(schema) => Ok(Some(schema)), - Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => { - Ok(None) + async move { + let schema = Backoff::new(&backoff_config) + .retry_all_errors("get namespace schema", || async { + let mut repos = catalog.repositories().await; + match get_schema_by_name(&namespace_name, repos.as_mut()).await { + Ok(schema) => Ok(Some(schema)), + Err(iox_catalog::interface::Error::NamespaceNotFound { + .. + }) => Ok(None), + Err(e) => Err(e), } - Err(e) => Err(e), - } - }) - .await - .expect("retry forever")?; + }) + .await + .expect("retry forever")?; - Some(Arc::new(CachedNamespace { - schema: Arc::new(schema), - })) - } - })); + Some(Arc::new(CachedNamespace { + schema: Arc::new(schema), + })) + } + }, + )); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, @@ -106,7 +108,10 @@ impl NamespaceCache { /// Get namespace schema by name. pub async fn schema(&self, name: Arc) -> Option> { - self.cache.get(name).await.map(|n| Arc::clone(&n.schema)) + self.cache + .get(name, ()) + .await + .map(|n| Arc::clone(&n.schema)) } } diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs index ab72613737..4b66025a37 100644 --- a/querier/src/cache/parquet_file.rs +++ b/querier/src/cache/parquet_file.rs @@ -98,7 +98,7 @@ impl CachedParquetFiles { /// DOES NOT CACHE the actual parquet bytes from object store #[derive(Debug)] pub struct ParquetFileCache { - cache: Cache>, + cache: Cache, ()>, /// Handle that allows clearing entries for existing cache entries backend: SharedBackend>, @@ -113,7 +113,7 @@ impl ParquetFileCache { metric_registry: &metric::Registry, ram_pool: Arc>, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |table_id: TableId| { + let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -177,7 +177,7 @@ impl ParquetFileCache { /// Get list of cached parquet files, by table id pub async fn get(&self, table_id: TableId) -> Arc { - self.cache.get(table_id).await + self.cache.get(table_id, ()).await } /// Mark the entry for table_id as expired (and needs a refresh) diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 4dca8f8eb4..78579262e0 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -27,7 +27,7 @@ const CACHE_ID: &str = "partition"; /// Cache for partition-related attributes. #[derive(Debug)] pub struct PartitionCache { - cache: Cache, + cache: Cache, backend: SharedBackend, } @@ -40,30 +40,32 @@ impl PartitionCache { metric_registry: &metric::Registry, ram_pool: Arc>, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |partition_id| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + let loader = Box::new(FunctionLoader::new( + move |partition_id: PartitionId, _extra: ()| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); - async move { - let partition = Backoff::new(&backoff_config) - .retry_all_errors("get partition_key", || async { - catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await - }) - .await - .expect("retry forever") - .expect("partition gone from catalog?!"); + async move { + let partition = Backoff::new(&backoff_config) + .retry_all_errors("get partition_key", || async { + catalog + .repositories() + .await + .partitions() + .get_by_id(partition_id) + .await + }) + .await + .expect("retry forever") + .expect("partition gone from catalog?!"); - CachedPartition { - sequencer_id: partition.sequencer_id, - sort_key: Arc::new(partition.sort_key()), + CachedPartition { + sequencer_id: partition.sequencer_id, + sort_key: Arc::new(partition.sort_key()), + } } - } - })); + }, + )); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, @@ -90,12 +92,12 @@ impl PartitionCache { /// Get sequencer ID. pub async fn sequencer_id(&self, partition_id: PartitionId) -> SequencerId { - self.cache.get(partition_id).await.sequencer_id + self.cache.get(partition_id, ()).await.sequencer_id } /// Get sort key pub async fn sort_key(&self, partition_id: PartitionId) -> Arc> { - self.cache.get(partition_id).await.sort_key + self.cache.get(partition_id, ()).await.sort_key } /// Expire partition if the cached sort key does NOT cover the given set of columns. diff --git a/querier/src/cache/processed_tombstones.rs b/querier/src/cache/processed_tombstones.rs index 671ee23bcf..4a56e7d2ad 100644 --- a/querier/src/cache/processed_tombstones.rs +++ b/querier/src/cache/processed_tombstones.rs @@ -28,7 +28,7 @@ const CACHE_ID: &str = "processed_tombstones"; /// Cache for processed tombstones. #[derive(Debug)] pub struct ProcessedTombstonesCache { - cache: Cache<(ParquetFileId, TombstoneId), bool>, + cache: Cache<(ParquetFileId, TombstoneId), bool, ()>, } impl ProcessedTombstonesCache { @@ -41,7 +41,7 @@ impl ProcessedTombstonesCache { ram_pool: Arc>, ) -> Self { let loader = Box::new(FunctionLoader::new( - move |(parquet_file_id, tombstone_id)| { + move |(parquet_file_id, tombstone_id), _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -89,7 +89,7 @@ impl ProcessedTombstonesCache { /// Check if the specified tombstone is mark as "processed" for the given parquet file. pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool { - self.cache.get((parquet_file_id, tombstone_id)).await + self.cache.get((parquet_file_id, tombstone_id), ()).await } } diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs index 25db8cdefa..59918fa4e9 100644 --- a/querier/src/cache/read_buffer.rs +++ b/querier/src/cache/read_buffer.rs @@ -25,7 +25,7 @@ const CACHE_ID: &str = "read_buffer"; /// Cache for parquet file data decoded into read buffer chunks #[derive(Debug)] pub struct ReadBufferCache { - cache: Cache>, + cache: Cache, ()>, /// Handle that allows clearing entries for existing cache entries _backend: SharedBackend>, @@ -39,7 +39,7 @@ impl ReadBufferCache { ram_pool: Arc>, ) -> Self { let loader = Box::new(FunctionLoader::new( - move |parquet_file_id: ParquetFileId| { + move |parquet_file_id: ParquetFileId, _extra| { let backoff_config = BackoffConfig::default(); async move { @@ -89,7 +89,7 @@ impl ReadBufferCache { pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc { let parquet_file = &decoded_parquet_file.parquet_file; - self.cache.get(parquet_file.id).await + self.cache.get(parquet_file.id, ()).await } } diff --git a/querier/src/cache/table.rs b/querier/src/cache/table.rs index 83cdb818b6..7fde8fc6c6 100644 --- a/querier/src/cache/table.rs +++ b/querier/src/cache/table.rs @@ -22,7 +22,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(10); const CACHE_ID: &str = "table"; -type CacheT = Cache>>; +type CacheT = Cache>, ()>; /// Cache for table-related queries. #[derive(Debug)] @@ -39,7 +39,7 @@ impl TableCache { metric_registry: &metric::Registry, ram_pool: Arc>, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |table_id| { + let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -95,14 +95,17 @@ impl TableCache { /// /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. pub async fn name(&self, table_id: TableId) -> Option> { - self.cache.get(table_id).await.map(|t| Arc::clone(&t.name)) + self.cache + .get(table_id, ()) + .await + .map(|t| Arc::clone(&t.name)) } /// Get the table namespace ID for the given table ID. /// /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. pub async fn namespace_id(&self, table_id: TableId) -> Option { - self.cache.get(table_id).await.map(|t| t.namespace_id) + self.cache.get(table_id, ()).await.map(|t| t.namespace_id) } } diff --git a/querier/src/cache/tombstones.rs b/querier/src/cache/tombstones.rs index cd71eec917..cbc36a6559 100644 --- a/querier/src/cache/tombstones.rs +++ b/querier/src/cache/tombstones.rs @@ -81,7 +81,7 @@ impl CachedTombstones { /// Cache for tombstones for a particular table #[derive(Debug)] pub struct TombstoneCache { - cache: Cache, + cache: Cache, /// Handle that allows clearing entries for existing cache entries backend: SharedBackend, } @@ -95,7 +95,7 @@ impl TombstoneCache { metric_registry: &metric::Registry, ram_pool: Arc>, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |table_id: TableId| { + let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -146,7 +146,7 @@ impl TombstoneCache { /// Get list of cached tombstones, by table id pub async fn get(&self, table_id: TableId) -> CachedTombstones { - self.cache.get(table_id).await + self.cache.get(table_id, ()).await } /// Mark the entry for table_id as expired / needs a refresh