feat: Add an Extra type to Cacher Loader to specify extra information for loading entries
parent
2886149afc
commit
37347f2389
|
@ -31,23 +31,25 @@ use tokio::{
|
||||||
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
|
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
|
||||||
/// The data will NOT be cached.
|
/// The data will NOT be cached.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Cache<K, V>
|
pub struct Cache<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
||||||
V: Clone + std::fmt::Debug + Send + 'static,
|
V: Clone + std::fmt::Debug + Send + 'static,
|
||||||
|
Extra: std::fmt::Debug + Send + 'static,
|
||||||
{
|
{
|
||||||
state: Arc<Mutex<CacheState<K, V>>>,
|
state: Arc<Mutex<CacheState<K, V>>>,
|
||||||
loader: Arc<dyn Loader<K = K, V = V>>,
|
loader: Arc<dyn Loader<K = K, V = V, Extra = Extra>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> Cache<K, V>
|
impl<K, V, Extra> Cache<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
||||||
V: Clone + std::fmt::Debug + Send + 'static,
|
V: Clone + std::fmt::Debug + Send + 'static,
|
||||||
|
Extra: std::fmt::Debug + Send + 'static,
|
||||||
{
|
{
|
||||||
/// Create new, empty cache with given loader function.
|
/// Create new, empty cache with given loader function.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
loader: Arc<dyn Loader<K = K, V = V>>,
|
loader: Arc<dyn Loader<K = K, V = V, Extra = Extra>>,
|
||||||
backend: Box<dyn CacheBackend<K = K, V = V>>,
|
backend: Box<dyn CacheBackend<K = K, V = V>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
@ -61,13 +63,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get value from cache.
|
/// Get value from cache.
|
||||||
pub async fn get(&self, k: K) -> V {
|
pub async fn get(&self, k: K, extra: Extra) -> V {
|
||||||
// place state locking into its own scope so it doesn't leak into the generator (async
|
// place state locking into its own scope so it doesn't leak into the generator (async
|
||||||
// function)
|
// function)
|
||||||
let receiver = {
|
let receiver = {
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
|
|
||||||
// check if the already cached this entry
|
// check if the entry has already been cached
|
||||||
if let Some(v) = state.cached_entries.get(&k) {
|
if let Some(v) = state.cached_entries.get(&k) {
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
@ -102,7 +104,7 @@ where
|
||||||
// execute the loader
|
// execute the loader
|
||||||
// If we panic here then `tx` will be dropped and the receivers will be
|
// If we panic here then `tx` will be dropped and the receivers will be
|
||||||
// notified.
|
// notified.
|
||||||
let v = loader.load(k_for_loader).await;
|
let v = loader.load(k_for_loader, extra).await;
|
||||||
|
|
||||||
// remove "running" state and store result
|
// remove "running" state and store result
|
||||||
let was_running = {
|
let was_running = {
|
||||||
|
@ -221,10 +223,11 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> Drop for Cache<K, V>
|
impl<K, V, Extra> Drop for Cache<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
K: Clone + Eq + Hash + std::fmt::Debug + Ord + Send + 'static,
|
||||||
V: Clone + std::fmt::Debug + Send + 'static,
|
V: Clone + std::fmt::Debug + Send + 'static,
|
||||||
|
Extra: std::fmt::Debug + Send + 'static,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
for (_k, running_query) in self.state.lock().running_queries.drain() {
|
for (_k, running_query) in self.state.lock().running_queries.drain() {
|
||||||
|
@ -297,19 +300,19 @@ mod tests {
|
||||||
async fn test_answers_are_correct() {
|
async fn test_answers_are_correct() {
|
||||||
let (cache, _loader) = setup();
|
let (cache, _loader) = setup();
|
||||||
|
|
||||||
assert_eq!(cache.get(1).await, String::from("1"));
|
assert_eq!(cache.get(1, ()).await, String::from("1"));
|
||||||
assert_eq!(cache.get(2).await, String::from("2"));
|
assert_eq!(cache.get(2, ()).await, String::from("2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_linear_memory() {
|
async fn test_linear_memory() {
|
||||||
let (cache, loader) = setup();
|
let (cache, loader) = setup();
|
||||||
|
|
||||||
assert_eq!(cache.get(1).await, String::from("1"));
|
assert_eq!(cache.get(1, ()).await, String::from("1"));
|
||||||
assert_eq!(cache.get(1).await, String::from("1"));
|
assert_eq!(cache.get(1, ()).await, String::from("1"));
|
||||||
assert_eq!(cache.get(2).await, String::from("2"));
|
assert_eq!(cache.get(2, ()).await, String::from("2"));
|
||||||
assert_eq!(cache.get(2).await, String::from("2"));
|
assert_eq!(cache.get(2, ()).await, String::from("2"));
|
||||||
assert_eq!(cache.get(1).await, String::from("1"));
|
assert_eq!(cache.get(1, ()).await, String::from("1"));
|
||||||
|
|
||||||
assert_eq!(loader.loaded(), vec![1, 2]);
|
assert_eq!(loader.loaded(), vec![1, 2]);
|
||||||
}
|
}
|
||||||
|
@ -321,8 +324,8 @@ mod tests {
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
let handle_2 = tokio::spawn(async move { cache.get(1).await });
|
let handle_2 = tokio::spawn(async move { cache.get(1, ()).await });
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
// Shouldn't issue concurrent load requests for the same key
|
// Shouldn't issue concurrent load requests for the same key
|
||||||
|
@ -342,10 +345,10 @@ mod tests {
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_2 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
let handle_3 = tokio::spawn(async move { cache.get(2).await });
|
let handle_3 = tokio::spawn(async move { cache.get(2, ()).await });
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
|
||||||
|
@ -366,9 +369,9 @@ mod tests {
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
let handle_2 = tokio::spawn(async move { cache.get(1).await });
|
let handle_2 = tokio::spawn(async move { cache.get(1, ()).await });
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
|
||||||
|
@ -392,11 +395,11 @@ mod tests {
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_1 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle_2 = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
let handle_3 = tokio::spawn(async move { cache.get(2).await });
|
let handle_3 = tokio::spawn(async move { cache.get(2, ()).await });
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
|
||||||
|
@ -422,7 +425,7 @@ mod tests {
|
||||||
|
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let handle = tokio::spawn(async move { cache.get(1).await });
|
let handle = tokio::spawn(async move { cache.get(1, ()).await });
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
|
||||||
|
@ -442,7 +445,7 @@ mod tests {
|
||||||
cache.set(1, String::from("foo")).await;
|
cache.set(1, String::from("foo")).await;
|
||||||
|
|
||||||
// blocked loader is not used
|
// blocked loader is not used
|
||||||
let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1))
|
let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, ()))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(res, String::from("foo"));
|
assert_eq!(res, String::from("foo"));
|
||||||
|
@ -456,7 +459,7 @@ mod tests {
|
||||||
loader.block();
|
loader.block();
|
||||||
|
|
||||||
let cache_captured = Arc::clone(&cache);
|
let cache_captured = Arc::clone(&cache);
|
||||||
let handle = tokio::spawn(async move { cache_captured.get(1).await });
|
let handle = tokio::spawn(async move { cache_captured.get(1, ()).await });
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
|
||||||
cache.set(1, String::from("foo")).await;
|
cache.set(1, String::from("foo")).await;
|
||||||
|
@ -470,14 +473,14 @@ mod tests {
|
||||||
assert_eq!(loader.loaded(), vec![1]);
|
assert_eq!(loader.loaded(), vec![1]);
|
||||||
|
|
||||||
// still cached
|
// still cached
|
||||||
let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1))
|
let res = tokio::time::timeout(Duration::from_millis(10), cache.get(1, ()))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
assert_eq!(res, String::from("foo"));
|
assert_eq!(res, String::from("foo"));
|
||||||
assert_eq!(loader.loaded(), vec![1]);
|
assert_eq!(loader.loaded(), vec![1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup() -> (Arc<Cache<u8, String>>, Arc<TestLoader>) {
|
fn setup() -> (Arc<Cache<u8, String, ()>>, Arc<TestLoader>) {
|
||||||
let loader = Arc::new(TestLoader::default());
|
let loader = Arc::new(TestLoader::default());
|
||||||
let cache = Arc::new(Cache::new(
|
let cache = Arc::new(Cache::new(
|
||||||
Arc::clone(&loader) as _,
|
Arc::clone(&loader) as _,
|
||||||
|
@ -536,8 +539,9 @@ mod tests {
|
||||||
impl Loader for TestLoader {
|
impl Loader for TestLoader {
|
||||||
type K = u8;
|
type K = u8;
|
||||||
type V = String;
|
type V = String;
|
||||||
|
type Extra = ();
|
||||||
|
|
||||||
async fn load(&self, k: u8) -> String {
|
async fn load(&self, k: u8, _extra: ()) -> String {
|
||||||
self.loaded.lock().push(k);
|
self.loaded.lock().push(k);
|
||||||
|
|
||||||
// need to capture the cloned notify handle, otherwise the lock guard leaks into the
|
// need to capture the cloned notify handle, otherwise the lock guard leaks into the
|
||||||
|
@ -569,7 +573,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bounds() {
|
fn test_bounds() {
|
||||||
assert_send::<Cache<u8, u8>>();
|
assert_send::<Cache<u8, u8, ()>>();
|
||||||
assert_sync::<Cache<u8, u8>>();
|
assert_sync::<Cache<u8, u8, ()>>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,25 +9,27 @@ use metric::{DurationHistogram, U64Counter};
|
||||||
use super::Loader;
|
use super::Loader;
|
||||||
|
|
||||||
/// Wraps a [`Loader`] and adds metrics.
|
/// Wraps a [`Loader`] and adds metrics.
|
||||||
pub struct MetricsLoader<K, V>
|
pub struct MetricsLoader<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Send + 'static,
|
K: Send + 'static,
|
||||||
V: Send + 'static,
|
V: Send + 'static,
|
||||||
|
Extra: Send + 'static,
|
||||||
{
|
{
|
||||||
inner: Box<dyn Loader<K = K, V = V>>,
|
inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
|
||||||
time_provider: Arc<dyn TimeProvider>,
|
time_provider: Arc<dyn TimeProvider>,
|
||||||
metric_calls: U64Counter,
|
metric_calls: U64Counter,
|
||||||
metric_duration: DurationHistogram,
|
metric_duration: DurationHistogram,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> MetricsLoader<K, V>
|
impl<K, V, Extra> MetricsLoader<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Send + 'static,
|
K: Send + 'static,
|
||||||
V: Send + 'static,
|
V: Send + 'static,
|
||||||
|
Extra: Send + 'static,
|
||||||
{
|
{
|
||||||
/// Create new wrapper.
|
/// Create new wrapper.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
inner: Box<dyn Loader<K = K, V = V>>,
|
inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
time_provider: Arc<dyn TimeProvider>,
|
time_provider: Arc<dyn TimeProvider>,
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
|
@ -54,10 +56,11 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> std::fmt::Debug for MetricsLoader<K, V>
|
impl<K, V, Extra> std::fmt::Debug for MetricsLoader<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Send + 'static,
|
K: Send + 'static,
|
||||||
V: Send + 'static,
|
V: Send + 'static,
|
||||||
|
Extra: Send + 'static,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("MetricsLoader").finish_non_exhaustive()
|
f.debug_struct("MetricsLoader").finish_non_exhaustive()
|
||||||
|
@ -65,19 +68,21 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<K, V> Loader for MetricsLoader<K, V>
|
impl<K, V, Extra> Loader for MetricsLoader<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Send + 'static,
|
K: Send + 'static,
|
||||||
V: Send + 'static,
|
V: Send + 'static,
|
||||||
|
Extra: Send + 'static,
|
||||||
{
|
{
|
||||||
type K = K;
|
type K = K;
|
||||||
type V = V;
|
type V = V;
|
||||||
|
type Extra = Extra;
|
||||||
|
|
||||||
async fn load(&self, k: Self::K) -> Self::V {
|
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
|
||||||
self.metric_calls.inc(1);
|
self.metric_calls.inc(1);
|
||||||
|
|
||||||
let t_start = self.time_provider.now();
|
let t_start = self.time_provider.now();
|
||||||
let v = self.inner.load(k).await;
|
let v = self.inner.load(k, extra).await;
|
||||||
let t_end = self.time_provider.now();
|
let t_end = self.time_provider.now();
|
||||||
|
|
||||||
self.metric_duration.record(t_end - t_start);
|
self.metric_duration.record(t_end - t_start);
|
||||||
|
@ -103,7 +108,7 @@ mod tests {
|
||||||
let metric_registry = Arc::new(metric::Registry::new());
|
let metric_registry = Arc::new(metric::Registry::new());
|
||||||
|
|
||||||
let time_provider_captured = Arc::clone(&time_provider);
|
let time_provider_captured = Arc::clone(&time_provider);
|
||||||
let inner_loader = Box::new(FunctionLoader::new(move |x: u64| {
|
let inner_loader = Box::new(FunctionLoader::new(move |x: u64, _extra: ()| {
|
||||||
let time_provider_captured = Arc::clone(&time_provider_captured);
|
let time_provider_captured = Arc::clone(&time_provider_captured);
|
||||||
async move {
|
async move {
|
||||||
time_provider_captured.inc(Duration::from_secs(10));
|
time_provider_captured.inc(Duration::from_secs(10));
|
||||||
|
@ -134,7 +139,7 @@ mod tests {
|
||||||
panic!("Wrong observation type");
|
panic!("Wrong observation type");
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!(loader.load(42).await, String::from("42"));
|
assert_eq!(loader.load(42, ()).await, String::from("42"));
|
||||||
|
|
||||||
let mut reporter = RawReporter::default();
|
let mut reporter = RawReporter::default();
|
||||||
metric_registry.report(&mut reporter);
|
metric_registry.report(&mut reporter);
|
||||||
|
|
|
@ -11,46 +11,51 @@ pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
|
||||||
/// Cache key.
|
/// Cache key.
|
||||||
type K: Send + 'static;
|
type K: Send + 'static;
|
||||||
|
|
||||||
|
/// Extra data needed when loading a missing entry. Specify `()` if not needed.
|
||||||
|
type Extra: Send + 'static;
|
||||||
|
|
||||||
/// Cache value.
|
/// Cache value.
|
||||||
type V: Send + 'static;
|
type V: Send + 'static;
|
||||||
|
|
||||||
/// Load value for given key.
|
/// Load value for given key, using the extra data if needed.
|
||||||
async fn load(&self, k: Self::K) -> Self::V;
|
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Simple-to-use wrapper for async functions to act as a [`Loader`].
|
/// Simple-to-use wrapper for async functions to act as a [`Loader`].
|
||||||
pub struct FunctionLoader<K, V> {
|
pub struct FunctionLoader<K, V, Extra> {
|
||||||
loader: Box<dyn (Fn(K) -> BoxFuture<'static, V>) + Send + Sync>,
|
loader: Box<dyn (Fn(K, Extra) -> BoxFuture<'static, V>) + Send + Sync>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> FunctionLoader<K, V> {
|
impl<K, V, Extra> FunctionLoader<K, V, Extra> {
|
||||||
/// Create loader from function.
|
/// Create loader from function.
|
||||||
pub fn new<T, F>(loader: T) -> Self
|
pub fn new<T, F>(loader: T) -> Self
|
||||||
where
|
where
|
||||||
T: Fn(K) -> F + Send + Sync + 'static,
|
T: Fn(K, Extra) -> F + Send + Sync + 'static,
|
||||||
F: Future<Output = V> + Send + 'static,
|
F: Future<Output = V> + Send + 'static,
|
||||||
{
|
{
|
||||||
let loader = Box::new(move |k| loader(k).boxed());
|
let loader = Box::new(move |k, extra| loader(k, extra).boxed());
|
||||||
Self { loader }
|
Self { loader }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<K, V> std::fmt::Debug for FunctionLoader<K, V> {
|
impl<K, V, Extra> std::fmt::Debug for FunctionLoader<K, V, Extra> {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("FunctionLoader").finish_non_exhaustive()
|
f.debug_struct("FunctionLoader").finish_non_exhaustive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<K, V> Loader for FunctionLoader<K, V>
|
impl<K, V, Extra> Loader for FunctionLoader<K, V, Extra>
|
||||||
where
|
where
|
||||||
K: Send + 'static,
|
K: Send + 'static,
|
||||||
V: Send + 'static,
|
V: Send + 'static,
|
||||||
|
Extra: Send + 'static,
|
||||||
{
|
{
|
||||||
type K = K;
|
type K = K;
|
||||||
type V = V;
|
type V = V;
|
||||||
|
type Extra = Extra;
|
||||||
|
|
||||||
async fn load(&self, k: Self::K) -> Self::V {
|
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
|
||||||
(self.loader)(k).await
|
(self.loader)(k, extra).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
const CACHE_ID: &str = "namespace";
|
const CACHE_ID: &str = "namespace";
|
||||||
|
|
||||||
type CacheT = Cache<Arc<str>, Option<Arc<CachedNamespace>>>;
|
type CacheT = Cache<Arc<str>, Option<Arc<CachedNamespace>>, ()>;
|
||||||
|
|
||||||
/// Cache for namespace-related attributes.
|
/// Cache for namespace-related attributes.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -42,7 +42,8 @@ impl NamespaceCache {
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(move |namespace_name: Arc<str>| {
|
let loader = Box::new(FunctionLoader::new(
|
||||||
|
move |namespace_name: Arc<str>, _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -52,9 +53,9 @@ impl NamespaceCache {
|
||||||
let mut repos = catalog.repositories().await;
|
let mut repos = catalog.repositories().await;
|
||||||
match get_schema_by_name(&namespace_name, repos.as_mut()).await {
|
match get_schema_by_name(&namespace_name, repos.as_mut()).await {
|
||||||
Ok(schema) => Ok(Some(schema)),
|
Ok(schema) => Ok(Some(schema)),
|
||||||
Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => {
|
Err(iox_catalog::interface::Error::NamespaceNotFound {
|
||||||
Ok(None)
|
..
|
||||||
}
|
}) => Ok(None),
|
||||||
Err(e) => Err(e),
|
Err(e) => Err(e),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -65,7 +66,8 @@ impl NamespaceCache {
|
||||||
schema: Arc::new(schema),
|
schema: Arc::new(schema),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}));
|
},
|
||||||
|
));
|
||||||
let loader = Arc::new(MetricsLoader::new(
|
let loader = Arc::new(MetricsLoader::new(
|
||||||
loader,
|
loader,
|
||||||
CACHE_ID,
|
CACHE_ID,
|
||||||
|
@ -106,7 +108,10 @@ impl NamespaceCache {
|
||||||
|
|
||||||
/// Get namespace schema by name.
|
/// Get namespace schema by name.
|
||||||
pub async fn schema(&self, name: Arc<str>) -> Option<Arc<NamespaceSchema>> {
|
pub async fn schema(&self, name: Arc<str>) -> Option<Arc<NamespaceSchema>> {
|
||||||
self.cache.get(name).await.map(|n| Arc::clone(&n.schema))
|
self.cache
|
||||||
|
.get(name, ())
|
||||||
|
.await
|
||||||
|
.map(|n| Arc::clone(&n.schema))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,7 +98,7 @@ impl CachedParquetFiles {
|
||||||
/// DOES NOT CACHE the actual parquet bytes from object store
|
/// DOES NOT CACHE the actual parquet bytes from object store
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ParquetFileCache {
|
pub struct ParquetFileCache {
|
||||||
cache: Cache<TableId, Arc<CachedParquetFiles>>,
|
cache: Cache<TableId, Arc<CachedParquetFiles>, ()>,
|
||||||
|
|
||||||
/// Handle that allows clearing entries for existing cache entries
|
/// Handle that allows clearing entries for existing cache entries
|
||||||
backend: SharedBackend<TableId, Arc<CachedParquetFiles>>,
|
backend: SharedBackend<TableId, Arc<CachedParquetFiles>>,
|
||||||
|
@ -113,7 +113,7 @@ impl ParquetFileCache {
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
|
let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -177,7 +177,7 @@ impl ParquetFileCache {
|
||||||
|
|
||||||
/// Get list of cached parquet files, by table id
|
/// Get list of cached parquet files, by table id
|
||||||
pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
|
pub async fn get(&self, table_id: TableId) -> Arc<CachedParquetFiles> {
|
||||||
self.cache.get(table_id).await
|
self.cache.get(table_id, ()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark the entry for table_id as expired (and needs a refresh)
|
/// Mark the entry for table_id as expired (and needs a refresh)
|
||||||
|
|
|
@ -27,7 +27,7 @@ const CACHE_ID: &str = "partition";
|
||||||
/// Cache for partition-related attributes.
|
/// Cache for partition-related attributes.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PartitionCache {
|
pub struct PartitionCache {
|
||||||
cache: Cache<PartitionId, CachedPartition>,
|
cache: Cache<PartitionId, CachedPartition, ()>,
|
||||||
backend: SharedBackend<PartitionId, CachedPartition>,
|
backend: SharedBackend<PartitionId, CachedPartition>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,7 +40,8 @@ impl PartitionCache {
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(move |partition_id| {
|
let loader = Box::new(FunctionLoader::new(
|
||||||
|
move |partition_id: PartitionId, _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -63,7 +64,8 @@ impl PartitionCache {
|
||||||
sort_key: Arc::new(partition.sort_key()),
|
sort_key: Arc::new(partition.sort_key()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
},
|
||||||
|
));
|
||||||
let loader = Arc::new(MetricsLoader::new(
|
let loader = Arc::new(MetricsLoader::new(
|
||||||
loader,
|
loader,
|
||||||
CACHE_ID,
|
CACHE_ID,
|
||||||
|
@ -90,12 +92,12 @@ impl PartitionCache {
|
||||||
|
|
||||||
/// Get sequencer ID.
|
/// Get sequencer ID.
|
||||||
pub async fn sequencer_id(&self, partition_id: PartitionId) -> SequencerId {
|
pub async fn sequencer_id(&self, partition_id: PartitionId) -> SequencerId {
|
||||||
self.cache.get(partition_id).await.sequencer_id
|
self.cache.get(partition_id, ()).await.sequencer_id
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get sort key
|
/// Get sort key
|
||||||
pub async fn sort_key(&self, partition_id: PartitionId) -> Arc<Option<SortKey>> {
|
pub async fn sort_key(&self, partition_id: PartitionId) -> Arc<Option<SortKey>> {
|
||||||
self.cache.get(partition_id).await.sort_key
|
self.cache.get(partition_id, ()).await.sort_key
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Expire partition if the cached sort key does NOT cover the given set of columns.
|
/// Expire partition if the cached sort key does NOT cover the given set of columns.
|
||||||
|
|
|
@ -28,7 +28,7 @@ const CACHE_ID: &str = "processed_tombstones";
|
||||||
/// Cache for processed tombstones.
|
/// Cache for processed tombstones.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ProcessedTombstonesCache {
|
pub struct ProcessedTombstonesCache {
|
||||||
cache: Cache<(ParquetFileId, TombstoneId), bool>,
|
cache: Cache<(ParquetFileId, TombstoneId), bool, ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ProcessedTombstonesCache {
|
impl ProcessedTombstonesCache {
|
||||||
|
@ -41,7 +41,7 @@ impl ProcessedTombstonesCache {
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(
|
let loader = Box::new(FunctionLoader::new(
|
||||||
move |(parquet_file_id, tombstone_id)| {
|
move |(parquet_file_id, tombstone_id), _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ impl ProcessedTombstonesCache {
|
||||||
|
|
||||||
/// Check if the specified tombstone is mark as "processed" for the given parquet file.
|
/// Check if the specified tombstone is mark as "processed" for the given parquet file.
|
||||||
pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool {
|
pub async fn exists(&self, parquet_file_id: ParquetFileId, tombstone_id: TombstoneId) -> bool {
|
||||||
self.cache.get((parquet_file_id, tombstone_id)).await
|
self.cache.get((parquet_file_id, tombstone_id), ()).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ const CACHE_ID: &str = "read_buffer";
|
||||||
/// Cache for parquet file data decoded into read buffer chunks
|
/// Cache for parquet file data decoded into read buffer chunks
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ReadBufferCache {
|
pub struct ReadBufferCache {
|
||||||
cache: Cache<ParquetFileId, Arc<RBChunk>>,
|
cache: Cache<ParquetFileId, Arc<RBChunk>, ()>,
|
||||||
|
|
||||||
/// Handle that allows clearing entries for existing cache entries
|
/// Handle that allows clearing entries for existing cache entries
|
||||||
_backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
|
_backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
|
||||||
|
@ -39,7 +39,7 @@ impl ReadBufferCache {
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(
|
let loader = Box::new(FunctionLoader::new(
|
||||||
move |parquet_file_id: ParquetFileId| {
|
move |parquet_file_id: ParquetFileId, _extra| {
|
||||||
let backoff_config = BackoffConfig::default();
|
let backoff_config = BackoffConfig::default();
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
|
@ -89,7 +89,7 @@ impl ReadBufferCache {
|
||||||
pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
|
pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
|
||||||
let parquet_file = &decoded_parquet_file.parquet_file;
|
let parquet_file = &decoded_parquet_file.parquet_file;
|
||||||
|
|
||||||
self.cache.get(parquet_file.id).await
|
self.cache.get(parquet_file.id, ()).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,7 @@ pub const TTL_NON_EXISTING: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
const CACHE_ID: &str = "table";
|
const CACHE_ID: &str = "table";
|
||||||
|
|
||||||
type CacheT = Cache<TableId, Option<Arc<CachedTable>>>;
|
type CacheT = Cache<TableId, Option<Arc<CachedTable>>, ()>;
|
||||||
|
|
||||||
/// Cache for table-related queries.
|
/// Cache for table-related queries.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -39,7 +39,7 @@ impl TableCache {
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(move |table_id| {
|
let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -95,14 +95,17 @@ impl TableCache {
|
||||||
///
|
///
|
||||||
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
|
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
|
||||||
pub async fn name(&self, table_id: TableId) -> Option<Arc<str>> {
|
pub async fn name(&self, table_id: TableId) -> Option<Arc<str>> {
|
||||||
self.cache.get(table_id).await.map(|t| Arc::clone(&t.name))
|
self.cache
|
||||||
|
.get(table_id, ())
|
||||||
|
.await
|
||||||
|
.map(|t| Arc::clone(&t.name))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the table namespace ID for the given table ID.
|
/// Get the table namespace ID for the given table ID.
|
||||||
///
|
///
|
||||||
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
|
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
|
||||||
pub async fn namespace_id(&self, table_id: TableId) -> Option<NamespaceId> {
|
pub async fn namespace_id(&self, table_id: TableId) -> Option<NamespaceId> {
|
||||||
self.cache.get(table_id).await.map(|t| t.namespace_id)
|
self.cache.get(table_id, ()).await.map(|t| t.namespace_id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ impl CachedTombstones {
|
||||||
/// Cache for tombstones for a particular table
|
/// Cache for tombstones for a particular table
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TombstoneCache {
|
pub struct TombstoneCache {
|
||||||
cache: Cache<TableId, CachedTombstones>,
|
cache: Cache<TableId, CachedTombstones, ()>,
|
||||||
/// Handle that allows clearing entries for existing cache entries
|
/// Handle that allows clearing entries for existing cache entries
|
||||||
backend: SharedBackend<TableId, CachedTombstones>,
|
backend: SharedBackend<TableId, CachedTombstones>,
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ impl TombstoneCache {
|
||||||
metric_registry: &metric::Registry,
|
metric_registry: &metric::Registry,
|
||||||
ram_pool: Arc<ResourcePool<RamSize>>,
|
ram_pool: Arc<ResourcePool<RamSize>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let loader = Box::new(FunctionLoader::new(move |table_id: TableId| {
|
let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
|
||||||
let catalog = Arc::clone(&catalog);
|
let catalog = Arc::clone(&catalog);
|
||||||
let backoff_config = backoff_config.clone();
|
let backoff_config = backoff_config.clone();
|
||||||
|
|
||||||
|
@ -146,7 +146,7 @@ impl TombstoneCache {
|
||||||
|
|
||||||
/// Get list of cached tombstones, by table id
|
/// Get list of cached tombstones, by table id
|
||||||
pub async fn get(&self, table_id: TableId) -> CachedTombstones {
|
pub async fn get(&self, table_id: TableId) -> CachedTombstones {
|
||||||
self.cache.get(table_id).await
|
self.cache.get(table_id, ()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark the entry for table_id as expired / needs a refresh
|
/// Mark the entry for table_id as expired / needs a refresh
|
||||||
|
|
Loading…
Reference in New Issue