chore: Switch to smaller cache dep

pull/24376/head
Marko Mikulicic 2021-06-17 18:15:21 +02:00
parent c63ad0ea31
commit b612c3af4e
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
5 changed files with 63 additions and 123 deletions

103
Cargo.lock generated
View File

@ -202,15 +202,6 @@ dependencies = [
"wait-timeout",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.2"
@ -464,38 +455,15 @@ dependencies = [
]
[[package]]
name = "cached"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74"
dependencies = [
"async-mutex",
"async-trait",
"cached_proc_macro",
"cached_proc_macro_types",
"futures",
"hashbrown 0.9.1",
"once_cell",
]
[[package]]
name = "cached_proc_macro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c"
dependencies = [
"async-mutex",
"cached_proc_macro_types",
"darling",
"quote",
"syn",
]
[[package]]
name = "cached_proc_macro_types"
name = "cache_loader_async"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663"
checksum = "81e31ddbb9783157bc1693936b6e10370114ad12edd9814df4b472e9e2aef224"
dependencies = [
"futures",
"thiserror",
"tokio",
]
[[package]]
name = "cast"
@ -570,7 +538,7 @@ dependencies = [
"ansi_term 0.11.0",
"atty",
"bitflags",
"strsim 0.8.0",
"strsim",
"textwrap",
"unicode-width",
"vec_map",
@ -835,41 +803,6 @@ dependencies = [
"sct",
]
[[package]]
name = "darling"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.9.3",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -1096,12 +1029,6 @@ dependencies = [
"str-buf",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "extend"
version = "0.1.2"
@ -1581,12 +1508,6 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.2.3"
@ -3714,7 +3635,7 @@ dependencies = [
"arrow_util",
"async-trait",
"bytes",
"cached",
"cache_loader_async",
"chrono",
"crc32fast",
"data_types",
@ -3988,12 +3909,6 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c"
[[package]]
name = "structopt"
version = "0.3.21"

View File

@ -10,7 +10,7 @@ arrow_util = { path = "../arrow_util" }
async-trait = "0.1"
bytes = { version = "1.0" }
chrono = "0.4"
cached = "0.23.0"
cache_loader_async = {version = "0.1.0", features = ["ttl-cache"] }
crc32fast = "1.2.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }

View File

@ -73,7 +73,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use cached::proc_macro::cached;
use db::load_or_create_preserved_catalog;
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
@ -95,7 +94,7 @@ use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, Tr
pub use crate::config::RemoteTemplate;
use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
use cached::Return;
use cache_loader_async::cache_api::LoadingCache;
use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
pub use db::Db;
use generated_types::database_rules::encode_database_rules;
@ -936,7 +935,43 @@ pub trait RemoteServer {
/// The connection manager maps a host identifier to a remote server.
#[derive(Debug)]
pub struct ConnectionManagerImpl {}
pub struct ConnectionManagerImpl {
cache: LoadingCache<String, Arc<RemoteServerImpl>, CacheFillError>,
}
// Error must be Clone because LoadingCache requires so.
#[derive(Debug, Snafu, Clone)]
pub enum CacheFillError {
#[snafu(display("gRPC error: {}", source))]
GrpcError {
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
},
}
impl ConnectionManagerImpl {
pub fn new() -> Self {
let (cache, _) = LoadingCache::new(Self::cached_remote_server);
Self { cache }
}
async fn cached_remote_server(
connect: String,
) -> Result<Arc<RemoteServerImpl>, CacheFillError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Arc::new(e) as _)
.context(GrpcError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
}
}
impl Default for ConnectionManagerImpl {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ConnectionManager for ConnectionManagerImpl {
@ -946,27 +981,17 @@ impl ConnectionManager for ConnectionManagerImpl {
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
let ret = cached_remote_server(connect.to_string()).await?;
debug!(was_cached=%ret.was_cached, %connect, "getting remote connection");
Ok(ret.value)
let ret = self
.cache
.get_with_meta(connect.to_string())
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
debug!(was_cached=%ret.cached, %connect, "getting remote connection");
Ok(ret.result)
}
}
// cannot be an associated function
// argument need to have static lifetime because they become caching keys
#[cached(result = true, with_cached_flag = true)]
async fn cached_remote_server(
connect: String,
) -> Result<Return<Arc<RemoteServerImpl>>, ConnectionManagerError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Return::new(Arc::new(RemoteServerImpl { client })))
}
/// An implementation for communicating with other IOx servers. This should
/// be moved into and implemented in an influxdb_iox_client create at a later
/// date.

View File

@ -150,7 +150,7 @@ pub async fn main(config: Config) -> Result<()> {
std::process::exit(1);
}
let connection_manager = ConnectionManager {};
let connection_manager = ConnectionManager::new();
let app_server = Arc::new(AppServer::new(connection_manager, server_config));
// if this ID isn't set the server won't be usable until this is set via an API

View File

@ -884,7 +884,7 @@ mod tests {
#[tokio::test]
async fn test_health() {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
let server_url = test_server(Arc::clone(&app_server));
let client = Client::new();
@ -897,7 +897,7 @@ mod tests {
#[tokio::test]
async fn test_write() {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await;
app_server
@ -945,7 +945,7 @@ mod tests {
#[tokio::test]
async fn test_write_metrics() {
let (metrics_registry, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await;
app_server
@ -1035,7 +1035,7 @@ mod tests {
/// endpoint
async fn setup_test_data() -> (Client, String) {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await;
app_server
@ -1172,7 +1172,7 @@ mod tests {
#[tokio::test]
async fn test_gzip_write() {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await;
app_server
@ -1221,7 +1221,7 @@ mod tests {
#[tokio::test]
async fn write_to_invalid_database() {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl::new(), config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_initialize_server().await;
app_server