From 5e187ae1c076d2be4a22b652d9d29a291f69908c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 1 Sep 2022 12:22:12 +0000 Subject: [PATCH] refactor: use concrete type in `MetricsLoader` (#5525) The API user may still use a `Box` if they want, but they technically don't have to. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- cache_system/src/loader/metrics.rs | 42 +++++++++------------- cache_system/src/loader/mod.rs | 22 ++++++++++-- querier/src/cache/namespace.rs | 42 +++++++++++----------- querier/src/cache/parquet_file.rs | 4 +-- querier/src/cache/partition.rs | 44 +++++++++++------------ querier/src/cache/processed_tombstones.rs | 36 +++++++++---------- querier/src/cache/projected_schema.rs | 7 ++-- querier/src/cache/read_buffer.rs | 7 ++-- querier/src/cache/tombstones.rs | 4 +-- 9 files changed, 104 insertions(+), 104 deletions(-) diff --git a/cache_system/src/loader/metrics.rs b/cache_system/src/loader/metrics.rs index e6499557f5..7cb8d3f530 100644 --- a/cache_system/src/loader/metrics.rs +++ b/cache_system/src/loader/metrics.rs @@ -1,6 +1,6 @@ //! Metrics for [`Loader`]. -use std::{hash::Hash, sync::Arc}; +use std::sync::Arc; use async_trait::async_trait; use iox_time::TimeProvider; @@ -11,25 +11,21 @@ use pdatastructs::filters::{bloomfilter::BloomFilter, Filter}; use super::Loader; /// Wraps a [`Loader`] and adds metrics. -pub struct MetricsLoader +pub struct MetricsLoader where - K: Hash + Send + 'static, - V: Send + 'static, - Extra: Send + 'static, + L: Loader, { - inner: Box>, + inner: L, time_provider: Arc, metric_calls_new: U64Counter, metric_calls_probably_reloaded: U64Counter, metric_duration: DurationHistogram, - seen: Mutex>, + seen: Mutex>, } -impl MetricsLoader +impl MetricsLoader where - K: Hash + Send + 'static, - V: Send + 'static, - Extra: Send + 'static, + L: Loader, { /// Create new wrapper. /// @@ -37,7 +33,7 @@ where /// If `testing` is set, the "seen" metrics will NOT be processed correctly because the underlying data structure is /// too expensive to create many times a second in an un-optimized debug build. pub fn new( - inner: Box>, + inner: L, name: &'static str, time_provider: Arc, metric_registry: &metric::Registry, @@ -110,11 +106,9 @@ where } } -impl std::fmt::Debug for MetricsLoader +impl std::fmt::Debug for MetricsLoader where - K: Hash + Send + 'static, - V: Send + 'static, - Extra: Send + 'static, + L: Loader, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MetricsLoader").finish_non_exhaustive() @@ -122,15 +116,13 @@ where } #[async_trait] -impl Loader for MetricsLoader +impl Loader for MetricsLoader where - K: Hash + Send + 'static, - V: Send + 'static, - Extra: Send + 'static, + L: Loader, { - type K = K; - type V = V; - type Extra = Extra; + type K = L::K; + type V = L::V; + type Extra = L::Extra; async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V { { @@ -172,13 +164,13 @@ mod tests { let time_provider_captured = Arc::clone(&time_provider); let d = Duration::from_secs(10); - let inner_loader = Box::new(FunctionLoader::new(move |x: u64, _extra: ()| { + let inner_loader = FunctionLoader::new(move |x: u64, _extra: ()| { let time_provider_captured = Arc::clone(&time_provider_captured); async move { time_provider_captured.inc(d); x.to_string() } - })); + }); let loader = MetricsLoader::new( inner_loader, diff --git a/cache_system/src/loader/mod.rs b/cache_system/src/loader/mod.rs index 30657d3622..3d8e64d1cf 100644 --- a/cache_system/src/loader/mod.rs +++ b/cache_system/src/loader/mod.rs @@ -1,7 +1,7 @@ //! How to load new cache entries. use async_trait::async_trait; use futures::{future::BoxFuture, FutureExt}; -use std::future::Future; +use std::{future::Future, hash::Hash}; pub mod metrics; @@ -9,7 +9,7 @@ pub mod metrics; #[async_trait] pub trait Loader: std::fmt::Debug + Send + Sync + 'static { /// Cache key. - type K: Send + 'static; + type K: Hash + Send + 'static; /// Extra data needed when loading a missing entry. Specify `()` if not needed. type Extra: Send + 'static; @@ -21,6 +21,22 @@ pub trait Loader: std::fmt::Debug + Send + Sync + 'static { async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V; } +#[async_trait] +impl Loader for Box> +where + K: Hash + Send + 'static, + V: Send + 'static, + Extra: Send + 'static, +{ + type K = K; + type V = V; + type Extra = Extra; + + async fn load(&self, k: Self::K, extra: Self::Extra) -> Self::V { + self.as_ref().load(k, extra).await + } +} + /// Simple-to-use wrapper for async functions to act as a [`Loader`]. pub struct FunctionLoader { loader: Box BoxFuture<'static, V>) + Send + Sync>, @@ -47,7 +63,7 @@ impl std::fmt::Debug for FunctionLoader { #[async_trait] impl Loader for FunctionLoader where - K: Send + 'static, + K: Hash + Send + 'static, V: Send + 'static, Extra: Send + 'static, { diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index 0e6e6a83a6..f997b4ba29 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -75,30 +75,28 @@ impl NamespaceCache { handle: &Handle, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new( - move |namespace_name: Arc, _extra: ()| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + let loader = FunctionLoader::new(move |namespace_name: Arc, _extra: ()| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); - async move { - let schema = Backoff::new(&backoff_config) - .retry_all_errors("get namespace schema", || async { - let mut repos = catalog.repositories().await; - match get_schema_by_name(&namespace_name, repos.as_mut()).await { - Ok(schema) => Ok(Some(schema)), - Err(iox_catalog::interface::Error::NamespaceNotFoundByName { - .. - }) => Ok(None), - Err(e) => Err(e), - } - }) - .await - .expect("retry forever")?; + async move { + let schema = Backoff::new(&backoff_config) + .retry_all_errors("get namespace schema", || async { + let mut repos = catalog.repositories().await; + match get_schema_by_name(&namespace_name, repos.as_mut()).await { + Ok(schema) => Ok(Some(schema)), + Err(iox_catalog::interface::Error::NamespaceNotFoundByName { + .. + }) => Ok(None), + Err(e) => Err(e), + } + }) + .await + .expect("retry forever")?; - Some(Arc::new((&schema).into())) - } - }, - )); + Some(Arc::new((&schema).into())) + } + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs index 06506fcf76..cee9a21e7b 100644 --- a/querier/src/cache/parquet_file.rs +++ b/querier/src/cache/parquet_file.rs @@ -105,7 +105,7 @@ impl ParquetFileCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| { + let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -138,7 +138,7 @@ impl ParquetFileCache { .await .expect("retry forever") } - })); + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 7a4e8a3553..744ef054c2 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -48,32 +48,30 @@ impl PartitionCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new( - move |partition_id: PartitionId, _extra: ()| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + let loader = FunctionLoader::new(move |partition_id: PartitionId, _extra: ()| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); - async move { - let partition = Backoff::new(&backoff_config) - .retry_all_errors("get partition_key", || async { - catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await - }) - .await - .expect("retry forever") - .expect("partition gone from catalog?!"); + async move { + let partition = Backoff::new(&backoff_config) + .retry_all_errors("get partition_key", || async { + catalog + .repositories() + .await + .partitions() + .get_by_id(partition_id) + .await + }) + .await + .expect("retry forever") + .expect("partition gone from catalog?!"); - CachedPartition { - shard_id: partition.shard_id, - sort_key: Arc::new(partition.sort_key()), - } + CachedPartition { + shard_id: partition.shard_id, + sort_key: Arc::new(partition.sort_key()), } - }, - )); + } + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, diff --git a/querier/src/cache/processed_tombstones.rs b/querier/src/cache/processed_tombstones.rs index 8334551610..fbbfe24d1e 100644 --- a/querier/src/cache/processed_tombstones.rs +++ b/querier/src/cache/processed_tombstones.rs @@ -52,26 +52,24 @@ impl ProcessedTombstonesCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new( - move |(parquet_file_id, tombstone_id), _extra: ()| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + let loader = FunctionLoader::new(move |(parquet_file_id, tombstone_id), _extra: ()| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); - async move { - Backoff::new(&backoff_config) - .retry_all_errors("processed tombstone exists", || async { - catalog - .repositories() - .await - .processed_tombstones() - .exist(parquet_file_id, tombstone_id) - .await - }) - .await - .expect("retry forever") - } - }, - )); + async move { + Backoff::new(&backoff_config) + .retry_all_errors("processed tombstone exists", || async { + catalog + .repositories() + .await + .processed_tombstones() + .exist(parquet_file_id, tombstone_id) + .await + }) + .await + .expect("retry forever") + } + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, diff --git a/querier/src/cache/projected_schema.rs b/querier/src/cache/projected_schema.rs index 331a9d7b47..f219cd1cfb 100644 --- a/querier/src/cache/projected_schema.rs +++ b/querier/src/cache/projected_schema.rs @@ -81,16 +81,15 @@ impl ProjectedSchemaCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new( - move |key: CacheKey, table_schema: Arc| async move { + let loader = + FunctionLoader::new(move |key: CacheKey, table_schema: Arc| async move { let projection: Vec<&str> = key.projection.iter().map(|s| s.as_str()).collect(); Arc::new( table_schema .select_by_names(&projection) .expect("Bug in schema projection"), ) - }, - )); + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs index f00eb83147..c9baf6076f 100644 --- a/querier/src/cache/read_buffer.rs +++ b/querier/src/cache/read_buffer.rs @@ -56,8 +56,8 @@ impl ReadBufferCache { testing: bool, ) -> Self { let metric_registry_captured = Arc::clone(&metric_registry); - let loader = Box::new(FunctionLoader::new( - move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| { + let loader = + FunctionLoader::new(move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| { let backoff_config = backoff_config.clone(); let metric_registry = Arc::clone(&metric_registry_captured); @@ -82,8 +82,7 @@ impl ReadBufferCache { Arc::new(rb_chunk) } - }, - )); + }); let loader = Arc::new(MetricsLoader::new( loader, diff --git a/querier/src/cache/tombstones.rs b/querier/src/cache/tombstones.rs index 76818d50d0..af7c70928e 100644 --- a/querier/src/cache/tombstones.rs +++ b/querier/src/cache/tombstones.rs @@ -94,7 +94,7 @@ impl TombstoneCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = Box::new(FunctionLoader::new(move |table_id: TableId, _extra: ()| { + let loader = FunctionLoader::new(move |table_id: TableId, _extra: ()| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); @@ -116,7 +116,7 @@ impl TombstoneCache { .await .expect("retry forever") } - })); + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID,