From b612c3af4e64a457857f5ce8106a366c3304cb3b Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Thu, 17 Jun 2021 18:15:21 +0200 Subject: [PATCH] chore: Switch to smaller cache dep --- Cargo.lock | 103 ++++---------------------------------- server/Cargo.toml | 2 +- server/src/lib.rs | 67 +++++++++++++++++-------- src/influxdb_ioxd.rs | 2 +- src/influxdb_ioxd/http.rs | 12 ++--- 5 files changed, 63 insertions(+), 123 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b24d9a93cb..71337f7e63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,15 +202,6 @@ dependencies = [ "wait-timeout", ] -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.2" @@ -464,38 +455,15 @@ dependencies = [ ] [[package]] -name = "cached" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74" -dependencies = [ - "async-mutex", - "async-trait", - "cached_proc_macro", - "cached_proc_macro_types", - "futures", - "hashbrown 0.9.1", - "once_cell", -] - -[[package]] -name = "cached_proc_macro" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c" -dependencies = [ - "async-mutex", - "cached_proc_macro_types", - "darling", - "quote", - "syn", -] - -[[package]] -name = "cached_proc_macro_types" +name = "cache_loader_async" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663" +checksum = "81e31ddbb9783157bc1693936b6e10370114ad12edd9814df4b472e9e2aef224" +dependencies = [ + "futures", + "thiserror", + "tokio", +] [[package]] name = "cast" @@ -570,7 +538,7 @@ dependencies = [ "ansi_term 0.11.0", "atty", "bitflags", - "strsim 0.8.0", + "strsim", "textwrap", "unicode-width", "vec_map", @@ -835,41 +803,6 @@ dependencies = [ "sct", ] -[[package]] -name = "darling" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" -dependencies = [ - "darling_core", - "darling_macro", -] - -[[package]] -name = "darling_core" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim 0.9.3", - "syn", -] - -[[package]] -name = "darling_macro" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" -dependencies = [ - "darling_core", - "quote", - "syn", -] - [[package]] name = "dashmap" version = "4.0.2" @@ -1096,12 +1029,6 @@ dependencies = [ "str-buf", ] -[[package]] -name = "event-listener" -version = "2.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" - [[package]] name = "extend" version = "0.1.2" @@ -1581,12 +1508,6 @@ dependencies = [ "tokio-native-tls", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" - [[package]] name = "idna" version = "0.2.3" @@ -3714,7 +3635,7 @@ dependencies = [ "arrow_util", "async-trait", "bytes", - "cached", + "cache_loader_async", "chrono", "crc32fast", "data_types", @@ -3988,12 +3909,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" -[[package]] -name = "strsim" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" - [[package]] name = "structopt" version = "0.3.21" diff --git a/server/Cargo.toml b/server/Cargo.toml index 3f950750a4..7b7fa81d29 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ arrow_util = { path = "../arrow_util" } async-trait = "0.1" bytes = { version = "1.0" } chrono = "0.4" -cached = "0.23.0" +cache_loader_async = {version = "0.1.0", features = ["ttl-cache"] } crc32fast = "1.2.0" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 593e140b0b..c14394bc18 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -73,7 +73,6 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; -use cached::proc_macro::cached; use db::load_or_create_preserved_catalog; use init::InitStatus; use observability_deps::tracing::{debug, info, warn}; @@ -95,7 +94,7 @@ use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, Tr pub use crate::config::RemoteTemplate; use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString}; -use cached::Return; +use cache_loader_async::cache_api::LoadingCache; use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId}; pub use db::Db; use generated_types::database_rules::encode_database_rules; @@ -936,7 +935,43 @@ pub trait RemoteServer { /// The connection manager maps a host identifier to a remote server. #[derive(Debug)] -pub struct ConnectionManagerImpl {} +pub struct ConnectionManagerImpl { + cache: LoadingCache, CacheFillError>, +} + +// Error must be Clone because LoadingCache requires so. +#[derive(Debug, Snafu, Clone)] +pub enum CacheFillError { + #[snafu(display("gRPC error: {}", source))] + GrpcError { + source: Arc, + }, +} + +impl ConnectionManagerImpl { + pub fn new() -> Self { + let (cache, _) = LoadingCache::new(Self::cached_remote_server); + Self { cache } + } + + async fn cached_remote_server( + connect: String, + ) -> Result, CacheFillError> { + let connection = Builder::default() + .build(&connect) + .await + .map_err(|e| Arc::new(e) as _) + .context(GrpcError)?; + let client = write::Client::new(connection); + Ok(Arc::new(RemoteServerImpl { client })) + } +} + +impl Default for ConnectionManagerImpl { + fn default() -> Self { + Self::new() + } +} #[async_trait] impl ConnectionManager for ConnectionManagerImpl { @@ -946,27 +981,17 @@ impl ConnectionManager for ConnectionManagerImpl { &self, connect: &str, ) -> Result, ConnectionManagerError> { - let ret = cached_remote_server(connect.to_string()).await?; - debug!(was_cached=%ret.was_cached, %connect, "getting remote connection"); - Ok(ret.value) + let ret = self + .cache + .get_with_meta(connect.to_string()) + .await + .map_err(|e| Box::new(e) as _) + .context(RemoteServerConnectError)?; + debug!(was_cached=%ret.cached, %connect, "getting remote connection"); + Ok(ret.result) } } -// cannot be an associated function -// argument need to have static lifetime because they become caching keys -#[cached(result = true, with_cached_flag = true)] -async fn cached_remote_server( - connect: String, -) -> Result>, ConnectionManagerError> { - let connection = Builder::default() - .build(&connect) - .await - .map_err(|e| Box::new(e) as _) - .context(RemoteServerConnectError)?; - let client = write::Client::new(connection); - Ok(Return::new(Arc::new(RemoteServerImpl { client }))) -} - /// An implementation for communicating with other IOx servers. This should /// be moved into and implemented in an influxdb_iox_client create at a later /// date. diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 137619fa29..73bf8d43d7 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -150,7 +150,7 @@ pub async fn main(config: Config) -> Result<()> { std::process::exit(1); } - let connection_manager = ConnectionManager {}; + let connection_manager = ConnectionManager::new(); let app_server = Arc::new(AppServer::new(connection_manager, server_config)); // if this ID isn't set the server won't be usable until this is set via an API diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index c11ab6919e..9c837e545c 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -884,7 +884,7 @@ mod tests { #[tokio::test] async fn test_health() { let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); let server_url = test_server(Arc::clone(&app_server)); let client = Client::new(); @@ -897,7 +897,7 @@ mod tests { #[tokio::test] async fn test_write() { let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -945,7 +945,7 @@ mod tests { #[tokio::test] async fn test_write_metrics() { let (metrics_registry, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -1035,7 +1035,7 @@ mod tests { /// endpoint async fn setup_test_data() -> (Client, String) { let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -1172,7 +1172,7 @@ mod tests { #[tokio::test] async fn test_gzip_write() { let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server @@ -1221,7 +1221,7 @@ mod tests { #[tokio::test] async fn write_to_invalid_database() { let (_, config) = config(); - let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); + let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); app_server.maybe_initialize_server().await; app_server