Merge pull request #4754 from influxdata/cn/extra-cache-system

feat: Add an Extra type to Cacher Loader to specify extra information…

commit b714269b13
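
In short: the `Loader` trait gains an `Extra` associated type and `Cache` gains a matching type parameter, so `Cache::get(k, extra)` can hand side-channel data to the loader whenever an entry is missing. Note that `extra` is not part of the cache key: an already-cached entry is returned as-is no matter which `extra` is passed (see `test_linear_memory` below). A minimal sketch of the resulting trait shape, assuming the `async-trait` and `tokio` crates; the `SuffixLoader` type and the `main` harness are illustrative only, but the trait body mirrors the diff below:

// Hedged sketch of the post-change Loader trait. SuffixLoader mimics the
// diff's TestLoader, which returns format!("{k}_{extra}").
use async_trait::async_trait;

#[async_trait]
pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
    /// Cache key.
    type K: Send + 'static;

    /// Extra data needed when loading a missing entry. Specify `()` if not needed.
    type Extra: Send + 'static;

    /// Cache value.
    type V: Send + 'static;

    /// Load value for given key, using the extra data if needed.
    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
}

#[derive(Debug)]
struct SuffixLoader;

#[async_trait]
impl Loader for SuffixLoader {
    type K = u8;
    type Extra = bool; // side-channel data, deliberately NOT part of the key
    type V = String;

    async fn load(&self, k: u8, extra: bool) -> String {
        format!("{k}_{extra}")
    }
}

#[tokio::main]
async fn main() {
    assert_eq!(SuffixLoader.load(1, true).await, "1_true");
}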
@@ -31,23 +31,25 @@ use tokio::{
 /// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
 /// The data will NOT be cached.
 #[derive(Debug)]
-pub struct Cache<K, V>
+pub struct Cache<K, V, Extra>
 where
     K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
     V: Clone + std::fmt::Debug + Send + 'static,
+    Extra: std::fmt::Debug + Send + 'static,
 {
     state: Arc<Mutex<CacheState<K, V>>>,
-    loader: Arc<dyn Loader<K = K, V = V>>,
+    loader: Arc<dyn Loader<K = K, V = V, Extra = Extra>>,
 }

-impl<K, V> Cache<K, V>
+impl<K, V, Extra> Cache<K, V, Extra>
 where
     K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
     V: Clone + std::fmt::Debug + Send + 'static,
+    Extra: std::fmt::Debug + Send + 'static,
 {
     /// Create new, empty cache with given loader function.
     pub fn new(
-        loader: Arc<dyn Loader<K = K, V = V>>,
+        loader: Arc<dyn Loader<K = K, V = V, Extra = Extra>>,
         backend: Box<dyn CacheBackend<K = K, V = V>>,
     ) -> Self {
         Self {
@@ -61,13 +63,13 @@ where
     }

     /// Get value from cache.
-    pub async fn get(&self, k: K) -> V {
+    pub async fn get(&self, k: K, extra: Extra) -> V {
         // place state locking into its own scope so it doesn't leak into the generator (async
         // function)
         let receiver = {
             let mut state = self.state.lock();

-            // check if the already cached this entry
+            // check if the entry has already been cached
             if let Some(v) = state.cached_entries.get(&k) {
                 return v;
             }
@@ -102,7 +104,7 @@ where
                 // execute the loader
                 // If we panic here then `tx` will be dropped and the receivers will be
                 // notified.
-                let v = loader.load(k_for_loader).await;
+                let v = loader.load(k_for_loader, extra).await;

                 // remove "running" state and store result
                 let was_running = {
@@ -221,10 +223,11 @@ where
     }
 }

-impl<K, V> Drop for Cache<K, V>
+impl<K, V, Extra> Drop for Cache<K, V, Extra>
 where
     K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
     V: Clone + std::fmt::Debug + Send + 'static,
+    Extra: std::fmt::Debug + Send + 'static,
 {
     fn drop(&mut self) {
         for (_k, running_query) in self.state.lock().running_queries.drain() {
@@ -297,19 +300,19 @@ mod tests {
     async fn test_answers_are_correct() {
         let (cache, _loader) = setup();

-        assert_eq!(cache.get(1).await, String::from("1"));
-        assert_eq!(cache.get(2).await, String::from("2"));
+        assert_eq!(cache.get(1, true).await, String::from("1_true"));
+        assert_eq!(cache.get(2, false).await, String::from("2_false"));
     }

     #[tokio::test]
     async fn test_linear_memory() {
         let (cache, loader) = setup();

-        assert_eq!(cache.get(1).await, String::from("1"));
-        assert_eq!(cache.get(1).await, String::from("1"));
-        assert_eq!(cache.get(2).await, String::from("2"));
-        assert_eq!(cache.get(2).await, String::from("2"));
-        assert_eq!(cache.get(1).await, String::from("1"));
+        assert_eq!(cache.get(1, true).await, String::from("1_true"));
+        assert_eq!(cache.get(1, false).await, String::from("1_true"));
+        assert_eq!(cache.get(2, false).await, String::from("2_false"));
+        assert_eq!(cache.get(2, false).await, String::from("2_false"));
+        assert_eq!(cache.get(1, true).await, String::from("1_true"));

         assert_eq!(loader.loaded(), vec![1, 2]);
     }
@@ -321,16 +324,16 @@ mod tests {
         loader.block();

         let cache_captured = Arc::clone(&cache);
-        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
-        let handle_2 = tokio::spawn(async move { cache.get(1).await });
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1, true).await });
+        let handle_2 = tokio::spawn(async move { cache.get(1, true).await });

         tokio::time::sleep(Duration::from_millis(10)).await;
         // Shouldn't issue concurrent load requests for the same key
         let n_blocked = loader.unblock();
         assert_eq!(n_blocked, 1);

-        assert_eq!(handle_1.await.unwrap(), String::from("1"));
-        assert_eq!(handle_2.await.unwrap(), String::from("1"));
+        assert_eq!(handle_1.await.unwrap(), String::from("1_true"));
+        assert_eq!(handle_2.await.unwrap(), String::from("1_true"));

         assert_eq!(loader.loaded(), vec![1]);
     }
@@ -342,19 +345,19 @@ mod tests {
         loader.block();

         let cache_captured = Arc::clone(&cache);
-        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1, true).await });
         let cache_captured = Arc::clone(&cache);
-        let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
-        let handle_3 = tokio::spawn(async move { cache.get(2).await });
+        let handle_2 = tokio::spawn(async move { cache_captured.get(1, true).await });
+        let handle_3 = tokio::spawn(async move { cache.get(2, false).await });

         tokio::time::sleep(Duration::from_millis(10)).await;

         let n_blocked = loader.unblock();
         assert_eq!(n_blocked, 2);

-        assert_eq!(handle_1.await.unwrap(), String::from("1"));
-        assert_eq!(handle_2.await.unwrap(), String::from("1"));
-        assert_eq!(handle_3.await.unwrap(), String::from("2"));
+        assert_eq!(handle_1.await.unwrap(), String::from("1_true"));
+        assert_eq!(handle_2.await.unwrap(), String::from("1_true"));
+        assert_eq!(handle_3.await.unwrap(), String::from("2_false"));

         assert_eq!(loader.loaded(), vec![1, 2]);
     }
@@ -366,9 +369,9 @@ mod tests {
         loader.block();

         let cache_captured = Arc::clone(&cache);
-        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1, true).await });
         tokio::time::sleep(Duration::from_millis(10)).await;
-        let handle_2 = tokio::spawn(async move { cache.get(1).await });
+        let handle_2 = tokio::spawn(async move { cache.get(1, false).await });

         tokio::time::sleep(Duration::from_millis(10)).await;

@@ -379,7 +382,7 @@ mod tests {
         let n_blocked = loader.unblock();
         assert_eq!(n_blocked, 1);

-        assert_eq!(handle_2.await.unwrap(), String::from("1"));
+        assert_eq!(handle_2.await.unwrap(), String::from("1_true"));

         assert_eq!(loader.loaded(), vec![1]);
     }
@@ -392,11 +395,11 @@ mod tests {
         loader.block();

         let cache_captured = Arc::clone(&cache);
-        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1, true).await });
         tokio::time::sleep(Duration::from_millis(10)).await;
         let cache_captured = Arc::clone(&cache);
-        let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
-        let handle_3 = tokio::spawn(async move { cache.get(2).await });
+        let handle_2 = tokio::spawn(async move { cache_captured.get(1, false).await });
+        let handle_3 = tokio::spawn(async move { cache.get(2, false).await });

         tokio::time::sleep(Duration::from_millis(10)).await;

@@ -411,7 +414,7 @@ mod tests {
         handle_2.await.unwrap_err();

         // third handle should just work
-        assert_eq!(handle_3.await.unwrap(), String::from("2"));
+        assert_eq!(handle_3.await.unwrap(), String::from("2_false"));

         assert_eq!(loader.loaded(), vec![1, 2]);
     }
@@ -422,7 +425,7 @@ mod tests {

         loader.block();

-        let handle = tokio::spawn(async move { cache.get(1).await });
+        let handle = tokio::spawn(async move { cache.get(1, true).await });

         tokio::time::sleep(Duration::from_millis(10)).await;

@@ -442,7 +445,7 @@ mod tests {
         cache.set(1, String::from("foo")).await;

         // blocked loader is not used
-        let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1))
+        let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, false))
             .await
             .unwrap();
         assert_eq!(res, String::from("foo"));
@@ -456,7 +459,7 @@ mod tests {
         loader.block();

         let cache_captured = Arc::clone(&cache);
-        let handle = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle = tokio::spawn(async move { cache_captured.get(1, true).await });
         tokio::time::sleep(Duration::from_millis(10)).await;

         cache.set(1, String::from("foo")).await;
@@ -470,14 +473,14 @@ mod tests {
         assert_eq!(loader.loaded(), vec![1]);

         // still cached
-        let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1))
+        let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, false))
             .await
             .unwrap();
         assert_eq!(res, String::from("foo"));
         assert_eq!(loader.loaded(), vec![1]);
     }

-    fn setup() -> (Arc<Cache<u8, String>>, Arc<TestLoader>) {
+    fn setup() -> (Arc<Cache<u8, String, bool>>, Arc<TestLoader>) {
         let loader = Arc::new(TestLoader::default());
         let cache = Arc::new(Cache::new(
             Arc::clone(&loader) as _,
@@ -536,8 +539,9 @@ mod tests {
     impl Loader for TestLoader {
         type K = u8;
         type V = String;
+        type Extra = bool;

-        async fn load(&self, k: u8) -> String {
+        async fn load(&self, k: u8, extra: bool) -> String {
             self.loaded.lock().push(k);

             // need to capture the cloned notify handle, otherwise the lock guard leaks into the
@@ -552,7 +556,7 @@ mod tests {
                 panic!("test");
             }

-            k.to_string()
+            format!("{k}_{extra}")
         }
     }

@@ -569,7 +573,7 @@ mod tests {

     #[test]
     fn test_bounds() {
-        assert_send::<Cache<u8, u8>>();
-        assert_sync::<Cache<u8, u8>>();
+        assert_send::<Cache<u8, u8, ()>>();
+        assert_sync::<Cache<u8, u8, ()>>();
     }
 }

@@ -9,25 +9,27 @@ use metric::{DurationHistogram, U64Counter};
 use super::Loader;

 /// Wraps a [`Loader`] and adds metrics.
-pub struct MetricsLoader<K, V>
+pub struct MetricsLoader<K, V, Extra>
 where
     K: Send + 'static,
     V: Send + 'static,
+    Extra: Send + 'static,
 {
-    inner: Box<dyn Loader<K = K, V = V>>,
+    inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
     time_provider: Arc<dyn TimeProvider>,
     metric_calls: U64Counter,
     metric_duration: DurationHistogram,
 }

-impl<K, V> MetricsLoader<K, V>
+impl<K, V, Extra> MetricsLoader<K, V, Extra>
 where
     K: Send + 'static,
     V: Send + 'static,
+    Extra: Send + 'static,
 {
     /// Create new wrapper.
     pub fn new(
-        inner: Box<dyn Loader<K = K, V = V>>,
+        inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
         name: &'static str,
         time_provider: Arc<dyn TimeProvider>,
         metric_registry: &metric::Registry,
@@ -54,10 +56,11 @@ where
     }
 }

-impl<K, V> std::fmt::Debug for MetricsLoader<K, V>
+impl<K, V, Extra> std::fmt::Debug for MetricsLoader<K, V, Extra>
 where
     K: Send + 'static,
     V: Send + 'static,
+    Extra: Send + 'static,
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("MetricsLoader").finish_non_exhaustive()
@@ -65,19 +68,21 @@ where
 }

 #[async_trait]
-impl<K, V> Loader for MetricsLoader<K, V>
+impl<K, V, Extra> Loader for MetricsLoader<K, V, Extra>
 where
     K: Send + 'static,
     V: Send + 'static,
+    Extra: Send + 'static,
 {
     type K = K;
     type V = V;
+    type Extra = Extra;

-    async fn load(&self, k: Self::K) -> Self::V {
+    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
         self.metric_calls.inc(1);

         let t_start = self.time_provider.now();
-        let v = self.inner.load(k).await;
+        let v = self.inner.load(k, extra).await;
         let t_end = self.time_provider.now();

         self.metric_duration.record(t_end - t_start);
@@ -103,7 +108,7 @@ mod tests {
         let metric_registry = Arc::new(metric::Registry::new());

         let time_provider_captured = Arc::clone(&time_provider);
-        let inner_loader = Box::new(FunctionLoader::new(move |x: u64| {
+        let inner_loader = Box::new(FunctionLoader::new(move |x: u64, _extra: ()| {
             let time_provider_captured = Arc::clone(&time_provider_captured);
             async move {
                 time_provider_captured.inc(Duration::from_secs(10));
@@ -134,7 +139,7 @@ mod tests {
             panic!("Wrong observation type");
         }

-        assert_eq!(loader.load(42).await, String::from("42"));
+        assert_eq!(loader.load(42, ()).await, String::from("42"));

         let mut reporter = RawReporter::default();
         metric_registry.report(&mut reporter);

@@ -11,46 +11,51 @@ pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
     /// Cache key.
     type K: Send + 'static;

+    /// Extra data needed when loading a missing entry. Specify `()` if not needed.
+    type Extra: Send + 'static;
+
     /// Cache value.
     type V: Send + 'static;

-    /// Load value for given key.
-    async fn load(&self, k: Self::K) -> Self::V;
+    /// Load value for given key, using the extra data if needed.
+    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
 }

 /// Simple-to-use wrapper for async functions to act as a [`Loader`].
-pub struct FunctionLoader<K, V> {
-    loader: Box<dyn (Fn(K) -> BoxFuture<'static, V>) + Send + Sync>,
+pub struct FunctionLoader<K, V, Extra> {
+    loader: Box<dyn (Fn(K, Extra) -> BoxFuture<'static, V>) + Send + Sync>,
 }

-impl<K, V> FunctionLoader<K, V> {
+impl<K, V, Extra> FunctionLoader<K, V, Extra> {
     /// Create loader from function.
     pub fn new<T, F>(loader: T) -> Self
     where
-        T: Fn(K) -> F + Send + Sync + 'static,
+        T: Fn(K, Extra) -> F + Send + Sync + 'static,
         F: Future<Output = V> + Send + 'static,
     {
-        let loader = Box::new(move |k| loader(k).boxed());
+        let loader = Box::new(move |k, extra| loader(k, extra).boxed());
         Self { loader }
     }
 }

-impl<K, V> std::fmt::Debug for FunctionLoader<K, V> {
+impl<K, V, Extra> std::fmt::Debug for FunctionLoader<K, V, Extra> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("FunctionLoader").finish_non_exhaustive()
     }
 }

 #[async_trait]
-impl<K, V> Loader for FunctionLoader<K, V>
+impl<K, V, Extra> Loader for FunctionLoader<K, V, Extra>
 where
     K: Send + 'static,
     V: Send + 'static,
+    Extra: Send + 'static,
 {
     type K = K;
     type V = V;
+    type Extra = Extra;

-    async fn load(&self, k: Self::K) -> Self::V {
-        (self.loader)(k).await
+    async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
+        (self.loader)(k, extra).await
     }
 }

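Because `FunctionLoader::new` now takes a two-argument closure, every call site changes arity even when it ignores the extra data, which is why the querier caches below all gain `_extra: ()` parameters. A hedged usage sketch: the `FunctionLoader` definition is copied from the diff above (plus the `futures` and `tokio` crates), while the tenant-id scenario is invented for illustration:

use futures::future::{BoxFuture, FutureExt};
use std::future::Future;

/// Simple-to-use wrapper for async functions to act as a loader.
pub struct FunctionLoader<K, V, Extra> {
    loader: Box<dyn (Fn(K, Extra) -> BoxFuture<'static, V>) + Send + Sync>,
}

impl<K, V, Extra> FunctionLoader<K, V, Extra> {
    /// Create loader from function.
    pub fn new<T, F>(loader: T) -> Self
    where
        T: Fn(K, Extra) -> F + Send + Sync + 'static,
        F: Future<Output = V> + Send + 'static,
    {
        let loader = Box::new(move |k, extra| loader(k, extra).boxed());
        Self { loader }
    }
}

#[tokio::main]
async fn main() {
    // Hypothetical: the key names a table, while `extra` carries a tenant id
    // that the loader needs without it becoming part of the cache key.
    let loader = FunctionLoader::new(|table: &'static str, tenant_id: u32| async move {
        format!("{table}@tenant{tenant_id}")
    });
    assert_eq!((loader.loader)("cpu", 42).await, "cpu@tenant42");
}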
@@ -25,7 +25,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(60);

 const CACHE_ID: &str = "namespace";

-type CacheT = Cache<Arc<str>, Option<Arc<CachedNamespace>>>;
+type CacheT = Cache<Arc<str>, Option<Arc<CachedNamespace>>, ()>;

 /// Cache for namespace-related attributes.
 #[derive(Debug)]
@@ -42,30 +42,32 @@ impl NamespaceCache {
         metric_registry: &metric::Registry,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
-        let loader = Box::new(FunctionLoader::new(move |namespace_name: Arc<str>| {
-            let catalog = Arc::clone(&catalog);
-            let backoff_config = backoff_config.clone();
-
-            async move {
-                let schema = Backoff::new(&backoff_config)
-                    .retry_all_errors("get namespace schema", || async {
-                        let mut repos = catalog.repositories().await;
-                        match get_schema_by_name(&namespace_name, repos.as_mut()).await {
-                            Ok(schema) => Ok(Some(schema)),
-                            Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => {
-                                Ok(None)
-                            }
-                            Err(e) => Err(e),
-                        }
-                    })
-                    .await
-                    .expect("retry forever")?;
-
-                Some(Arc::new(CachedNamespace {
-                    schema: Arc::new(schema),
-                }))
-            }
-        }));
+        let loader = Box::new(FunctionLoader::new(
+            move |namespace_name: Arc<str>, _extra: ()| {
+                let catalog = Arc::clone(&catalog);
+                let backoff_config = backoff_config.clone();
+
+                async move {
+                    let schema = Backoff::new(&backoff_config)
+                        .retry_all_errors("get namespace schema", || async {
+                            let mut repos = catalog.repositories().await;
+                            match get_schema_by_name(&namespace_name, repos.as_mut()).await {
+                                Ok(schema) => Ok(Some(schema)),
+                                Err(iox_catalog::interface::Error::NamespaceNotFound {
+                                    ..
+                                }) => Ok(None),
+                                Err(e) => Err(e),
+                            }
+                        })
+                        .await
+                        .expect("retry forever")?;
+
+                    Some(Arc::new(CachedNamespace {
+                        schema: Arc::new(schema),
+                    }))
+                }
+            },
+        ));
         let loader = Arc::new(MetricsLoader::new(
             loader,
             CACHE_ID,
@@ -106,7 +108,10 @@ impl NamespaceCache {

     /// Get namespace schema by name.
     pub async fn schema(&self, name: Arc<str>) -> Option<Arc<NamespaceSchema>> {
-        self.cache.get(name).await.map(|n| Arc::clone(&n.schema))
+        self.cache
+            .get(name, ())
+            .await
+            .map(|n| Arc::clone(&n.schema))
     }
 }

@@ -98,7 +98,7 @@ impl CachedParquetFiles {
 /// DOES NOT CACHE the actual parquet bytes from object store
 #[derive(Debug)]
 pub struct ParquetFileCache {
-    cache: Cache<TableId, Arc<CachedParquetFiles>>,
+    cache: Cache<TableId, Arc<CachedParquetFiles>, ()>,

     /// Handle that allows clearing entries for existing cache entries
     backend: SharedBackend<TableId, Arc<CachedParquetFiles>>,
@@ -113,7 +113,7 @@ impl ParquetFileCache {
         metric_registry: &metric::Registry,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
-        let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
+        let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
             let catalog = Arc::clone(&catalog);
             let backoff_config = backoff_config.clone();

@@ -177,7 +177,7 @@ impl ParquetFileCache {

     /// Get list of cached parquet files, by table id
     pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
-        self.cache.get(table_id).await
+        self.cache.get(table_id, ()).await
     }

     /// Mark the entry for table_id as expired (and needs a refresh)

@@ -27,7 +27,7 @@ const CACHE_ID: &str = "partition";
 /// Cache for partition-related attributes.
 #[derive(Debug)]
 pub struct PartitionCache {
-    cache: Cache<PartitionId, CachedPartition>,
+    cache: Cache<PartitionId, CachedPartition, ()>,
     backend: SharedBackend<PartitionId, CachedPartition>,
 }

@@ -40,30 +40,32 @@ impl PartitionCache {
         metric_registry: &metric::Registry,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
-        let loader = Box::new(FunctionLoader::new(move |partition_id| {
-            let catalog = Arc::clone(&catalog);
-            let backoff_config = backoff_config.clone();
-
-            async move {
-                let partition = Backoff::new(&backoff_config)
-                    .retry_all_errors("get partition_key", || async {
-                        catalog
-                            .repositories()
-                            .await
-                            .partitions()
-                            .get_by_id(partition_id)
-                            .await
-                    })
-                    .await
-                    .expect("retry forever")
-                    .expect("partition gone from catalog?!");
-
-                CachedPartition {
-                    sequencer_id: partition.sequencer_id,
-                    sort_key: Arc::new(partition.sort_key()),
-                }
-            }
-        }));
+        let loader = Box::new(FunctionLoader::new(
+            move |partition_id: PartitionId, _extra: ()| {
+                let catalog = Arc::clone(&catalog);
+                let backoff_config = backoff_config.clone();
+
+                async move {
+                    let partition = Backoff::new(&backoff_config)
+                        .retry_all_errors("get partition_key", || async {
+                            catalog
+                                .repositories()
+                                .await
+                                .partitions()
+                                .get_by_id(partition_id)
+                                .await
+                        })
+                        .await
+                        .expect("retry forever")
+                        .expect("partition gone from catalog?!");
+
+                    CachedPartition {
+                        sequencer_id: partition.sequencer_id,
+                        sort_key: Arc::new(partition.sort_key()),
+                    }
+                }
+            },
+        ));
         let loader = Arc::new(MetricsLoader::new(
             loader,
             CACHE_ID,
@@ -90,12 +92,12 @@ impl PartitionCache {

     /// Get sequencer ID.
     pub async fn sequencer_id(&self, partition_id: PartitionId) -> SequencerId {
-        self.cache.get(partition_id).await.sequencer_id
+        self.cache.get(partition_id, ()).await.sequencer_id
     }

     /// Get sort key
     pub async fn sort_key(&self, partition_id: PartitionId) -> Arc<Option<SortKey>> {
-        self.cache.get(partition_id).await.sort_key
+        self.cache.get(partition_id, ()).await.sort_key
     }

     /// Expire partition if the cached sort key does NOT cover the given set of columns.

@@ -28,7 +28,7 @@ const CACHE_ID: &str = "processed_tombstones";
 /// Cache for processed tombstones.
 #[derive(Debug)]
 pub struct ProcessedTombstonesCache {
-    cache: Cache<(ParquetFileId, TombstoneId), bool>,
+    cache: Cache<(ParquetFileId, TombstoneId), bool, ()>,
 }

 impl ProcessedTombstonesCache {
@@ -41,7 +41,7 @@ impl ProcessedTombstonesCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
         let loader = Box::new(FunctionLoader::new(
-            move |(parquet_file_id, tombstone_id)| {
+            move |(parquet_file_id, tombstone_id), _extra: ()| {
                 let catalog = Arc::clone(&catalog);
                 let backoff_config = backoff_config.clone();

@@ -89,7 +89,7 @@ impl ProcessedTombstonesCache {

     /// Check if the specified tombstone is mark as "processed" for the given parquet file.
     pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool {
-        self.cache.get((parquet_file_id, tombstone_id)).await
+        self.cache.get((parquet_file_id, tombstone_id), ()).await
     }
 }

@@ -25,7 +25,7 @@ const CACHE_ID: &str = "read_buffer";
 /// Cache for parquet file data decoded into read buffer chunks
 #[derive(Debug)]
 pub struct ReadBufferCache {
-    cache: Cache<ParquetFileId, Arc<RBChunk>>,
+    cache: Cache<ParquetFileId, Arc<RBChunk>, ()>,

     /// Handle that allows clearing entries for existing cache entries
     _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
@@ -39,7 +39,7 @@ impl ReadBufferCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
         let loader = Box::new(FunctionLoader::new(
-            move |parquet_file_id: ParquetFileId| {
+            move |parquet_file_id: ParquetFileId, _extra| {
                 let backoff_config = BackoffConfig::default();

                 async move {
@@ -89,7 +89,7 @@ impl ReadBufferCache {
     pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
         let parquet_file = &decoded_parquet_file.parquet_file;

-        self.cache.get(parquet_file.id).await
+        self.cache.get(parquet_file.id, ()).await
     }
 }

@@ -22,7 +22,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(10);

 const CACHE_ID: &str = "table";

-type CacheT = Cache<TableId, Option<Arc<CachedTable>>>;
+type CacheT = Cache<TableId, Option<Arc<CachedTable>>, ()>;

 /// Cache for table-related queries.
 #[derive(Debug)]
@@ -39,7 +39,7 @@ impl TableCache {
         metric_registry: &metric::Registry,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
-        let loader = Box::new(FunctionLoader::new(move |table_id| {
+        let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
             let catalog = Arc::clone(&catalog);
             let backoff_config = backoff_config.clone();

@@ -95,14 +95,17 @@ impl TableCache {
     ///
     /// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
     pub async fn name(&self, table_id: TableId) -> Option<Arc<str>> {
-        self.cache.get(table_id).await.map(|t| Arc::clone(&t.name))
+        self.cache
+            .get(table_id, ())
+            .await
+            .map(|t| Arc::clone(&t.name))
     }

     /// Get the table namespace ID for the given table ID.
     ///
     /// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
     pub async fn namespace_id(&self, table_id: TableId) -> Option<NamespaceId> {
-        self.cache.get(table_id).await.map(|t| t.namespace_id)
+        self.cache.get(table_id, ()).await.map(|t| t.namespace_id)
     }
 }

@@ -81,7 +81,7 @@ impl CachedTombstones {
 /// Cache for tombstones for a particular table
 #[derive(Debug)]
 pub struct TombstoneCache {
-    cache: Cache<TableId, CachedTombstones>,
+    cache: Cache<TableId, CachedTombstones, ()>,
     /// Handle that allows clearing entries for existing cache entries
     backend: SharedBackend<TableId, CachedTombstones>,
 }
@@ -95,7 +95,7 @@ impl TombstoneCache {
         metric_registry: &metric::Registry,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
-        let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
+        let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
             let catalog = Arc::clone(&catalog);
             let backoff_config = backoff_config.clone();

@@ -146,7 +146,7 @@ impl TombstoneCache {

     /// Get list of cached tombstones, by table id
     pub async fn get(&self, table_id: TableId) -> CachedTombstones {
-        self.cache.get(table_id).await
+        self.cache.get(table_id, ()).await
     }

     /// Mark the entry for table_id as expired / needs a refresh

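A closing caveat when reading the call sites above: `extra` reaches the loader only on a cache miss, so two `get` calls for the same key with different `extra` values share a single cached value; the diff's `test_linear_memory` pins this behavior down. A hedged toy with a plain `HashMap` standing in for the real cache backend, illustrating that first-writer-wins semantics:

use std::collections::HashMap;

struct ToyCache {
    cached: HashMap<u8, String>,
}

impl ToyCache {
    async fn get(&mut self, k: u8, extra: bool) -> String {
        if let Some(v) = self.cached.get(&k) {
            return v.clone(); // hit: `extra` is ignored entirely
        }
        let v = format!("{k}_{extra}"); // stand-in for Loader::load(k, extra)
        self.cached.insert(k, v.clone());
        v
    }
}

#[tokio::main]
async fn main() {
    let mut cache = ToyCache { cached: HashMap::new() };
    assert_eq!(cache.get(1, true).await, "1_true");
    // Same key, different extra: the cached value wins, as in test_linear_memory.
    assert_eq!(cache.get(1, false).await, "1_true");
}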