refactor: use concrete type in `MetricsLoader` (#5525)

The API user may still use a `Box<dyn ...>` if they want, but they
technically don't have to.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-09-01 12:22:12 +00:00 committed by GitHub
parent 3971c67beb
commit 5e187ae1c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 104 additions and 104 deletions

View File

@ -1,6 +1,6 @@
//! Metrics for [`Loader`].
use std::{hash::Hash, sync::Arc};
use std::sync::Arc;
use async_trait::async_trait;
use iox_time::TimeProvider;
@ -11,25 +11,21 @@ use pdatastructs::filters::{bloomfilter::BloomFilter, Filter};
use super::Loader;
/// Wraps a [`Loader`] and adds metrics.
pub struct MetricsLoader<K, V, Extra>
pub struct MetricsLoader<L>
where
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
L: Loader,
{
inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
inner: L,
time_provider: Arc<dyn TimeProvider>,
metric_calls_new: U64Counter,
metric_calls_probably_reloaded: U64Counter,
metric_duration: DurationHistogram,
seen: Mutex<BloomFilter<K>>,
seen: Mutex<BloomFilter<L::K>>,
}
impl<K, V, Extra> MetricsLoader<K, V, Extra>
impl<L> MetricsLoader<L>
where
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
L: Loader,
{
/// Create new wrapper.
///
@ -37,7 +33,7 @@ where
/// If `testing` is set, the "seen" metrics will NOT be processed correctly because the underlying data structure is
/// too expensive to create many times a second in an un-optimized debug build.
pub fn new(
inner: Box<dyn Loader<K = K, V = V, Extra = Extra>>,
inner: L,
name: &'static str,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
@ -110,11 +106,9 @@ where
}
}
impl<K, V, Extra> std::fmt::Debug for MetricsLoader<K, V, Extra>
impl<L> std::fmt::Debug for MetricsLoader<L>
where
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
L: Loader,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsLoader").finish_non_exhaustive()
@ -122,15 +116,13 @@ where
}
#[async_trait]
impl<K, V, Extra> Loader for MetricsLoader<K, V, Extra>
impl<L> Loader for MetricsLoader<L>
where
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
L: Loader,
{
type K = K;
type V = V;
type Extra = Extra;
type K = L::K;
type V = L::V;
type Extra = L::Extra;
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
{
@ -172,13 +164,13 @@ mod tests {
let time_provider_captured = Arc::clone(&time_provider);
let d = Duration::from_secs(10);
let inner_loader = Box::new(FunctionLoader::new(move |x: u64, _extra: ()| {
let inner_loader = FunctionLoader::new(move |x: u64, _extra: ()| {
let time_provider_captured = Arc::clone(&time_provider_captured);
async move {
time_provider_captured.inc(d);
x.to_string()
}
}));
});
let loader = MetricsLoader::new(
inner_loader,

View File

@ -1,7 +1,7 @@
//! How to load new cache entries.
use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use std::future::Future;
use std::{future::Future, hash::Hash};
pub mod metrics;
@ -9,7 +9,7 @@ pub mod metrics;
#[async_trait]
pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
/// Cache key.
type K: Send + 'static;
type K: Hash + Send + 'static;
/// Extra data needed when loading a missing entry. Specify `()` if not needed.
type Extra: Send + 'static;
@ -21,6 +21,22 @@ pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V;
}
#[async_trait]
impl<K, V, Extra> Loader for Box<dyn Loader<K = K, V = V, Extra = Extra>>
where
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
{
type K = K;
type V = V;
type Extra = Extra;
async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V {
self.as_ref().load(k, extra).await
}
}
/// Simple-to-use wrapper for async functions to act as a [`Loader`].
pub struct FunctionLoader<K, V, Extra> {
loader: Box<dyn (Fn(K, Extra) -> BoxFuture<'static, V>) + Send + Sync>,
@ -47,7 +63,7 @@ impl<K, V, Extra> std::fmt::Debug for FunctionLoader<K, V, Extra> {
#[async_trait]
impl<K, V, Extra> Loader for FunctionLoader<K, V, Extra>
where
K: Send + 'static,
K: Hash + Send + 'static,
V: Send + 'static,
Extra: Send + 'static,
{

View File

@ -75,30 +75,28 @@ impl NamespaceCache {
handle: &Handle,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(
move |namespace_name: Arc<str>, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
let loader = FunctionLoader::new(move |namespace_name: Arc<str>, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
let schema = Backoff::new(&backoff_config)
.retry_all_errors("get namespace schema", || async {
let mut repos = catalog.repositories().await;
match get_schema_by_name(&namespace_name, repos.as_mut()).await {
Ok(schema) => Ok(Some(schema)),
Err(iox_catalog::interface::Error::NamespaceNotFoundByName {
..
}) => Ok(None),
Err(e) => Err(e),
}
})
.await
.expect("retry forever")?;
async move {
let schema = Backoff::new(&backoff_config)
.retry_all_errors("get namespace schema", || async {
let mut repos = catalog.repositories().await;
match get_schema_by_name(&namespace_name, repos.as_mut()).await {
Ok(schema) => Ok(Some(schema)),
Err(iox_catalog::interface::Error::NamespaceNotFoundByName {
..
}) => Ok(None),
Err(e) => Err(e),
}
})
.await
.expect("retry forever")?;
Some(Arc::new((&schema).into()))
}
},
));
Some(Arc::new((&schema).into()))
}
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,

View File

@ -105,7 +105,7 @@ impl ParquetFileCache {
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
@ -138,7 +138,7 @@ impl ParquetFileCache {
.await
.expect("retry forever")
}
}));
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,

View File

@ -48,32 +48,30 @@ impl PartitionCache {
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(
move |partition_id: PartitionId, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
let loader = FunctionLoader::new(move |partition_id: PartitionId, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
let partition = Backoff::new(&backoff_config)
.retry_all_errors("get partition_key", || async {
catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
async move {
let partition = Backoff::new(&backoff_config)
.retry_all_errors("get partition_key", || async {
catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
CachedPartition {
shard_id: partition.shard_id,
sort_key: Arc::new(partition.sort_key()),
}
CachedPartition {
shard_id: partition.shard_id,
sort_key: Arc::new(partition.sort_key()),
}
},
));
}
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,

View File

@ -52,26 +52,24 @@ impl ProcessedTombstonesCache {
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(
move |(parquet_file_id, tombstone_id), _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
let loader = FunctionLoader::new(move |(parquet_file_id, tombstone_id), _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
Backoff::new(&backoff_config)
.retry_all_errors("processed tombstone exists", || async {
catalog
.repositories()
.await
.processed_tombstones()
.exist(parquet_file_id, tombstone_id)
.await
})
.await
.expect("retry forever")
}
},
));
async move {
Backoff::new(&backoff_config)
.retry_all_errors("processed tombstone exists", || async {
catalog
.repositories()
.await
.processed_tombstones()
.exist(parquet_file_id, tombstone_id)
.await
})
.await
.expect("retry forever")
}
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,

View File

@ -81,16 +81,15 @@ impl ProjectedSchemaCache {
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(
move |key: CacheKey, table_schema: Arc<Schema>| async move {
let loader =
FunctionLoader::new(move |key: CacheKey, table_schema: Arc<Schema>| async move {
let projection: Vec<&str> = key.projection.iter().map(|s| s.as_str()).collect();
Arc::new(
table_schema
.select_by_names(&projection)
.expect("Bug in schema projection"),
)
},
));
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,

View File

@ -56,8 +56,8 @@ impl ReadBufferCache {
testing: bool,
) -> Self {
let metric_registry_captured = Arc::clone(&metric_registry);
let loader = Box::new(FunctionLoader::new(
move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| {
let loader =
FunctionLoader::new(move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| {
let backoff_config = backoff_config.clone();
let metric_registry = Arc::clone(&metric_registry_captured);
@ -82,8 +82,7 @@ impl ReadBufferCache {
Arc::new(rb_chunk)
}
},
));
});
let loader = Arc::new(MetricsLoader::new(
loader,

View File

@ -94,7 +94,7 @@ impl TombstoneCache {
ram_pool: Arc<ResourcePool<RamSize>>,
testing: bool,
) -> Self {
let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| {
let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
@ -116,7 +116,7 @@ impl TombstoneCache {
.await
.expect("retry forever")
}
}));
});
let loader = Arc::new(MetricsLoader::new(
loader,
CACHE_ID,