diff --git a/Cargo.lock b/Cargo.lock index 60300c21d8..c583ce406f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,6 +213,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.1" @@ -465,6 +474,40 @@ dependencies = [ "serde", ] +[[package]] +name = "cached" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74" +dependencies = [ + "async-mutex", + "async-trait", + "cached_proc_macro", + "cached_proc_macro_types", + "futures", + "hashbrown 0.9.1", + "once_cell", +] + +[[package]] +name = "cached_proc_macro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c" +dependencies = [ + "async-mutex", + "cached_proc_macro_types", + "darling", + "quote", + "syn", +] + +[[package]] +name = "cached_proc_macro_types" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663" + [[package]] name = "cast" version = "0.2.5" @@ -538,7 +581,7 @@ dependencies = [ "ansi_term 0.11.0", "atty", "bitflags", - "strsim", + "strsim 0.8.0", "textwrap", "unicode-width", "vec_map", @@ -793,6 +836,41 @@ dependencies = [ "sct", ] +[[package]] +name = "darling" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.9.3", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "dashmap" version = "4.0.2" @@ -1010,6 +1088,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "extend" version = "0.1.2" @@ -1491,6 +1575,12 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.2.3" @@ -3580,6 +3670,7 @@ dependencies = [ "arrow_util", "async-trait", "bytes", + "cached", "chrono", "crc32fast", "criterion", @@ -3833,6 +3924,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" +[[package]] +name = "strsim" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c" + [[package]] name = "structopt" version = "0.3.21" diff --git a/server/Cargo.toml b/server/Cargo.toml index da9e614a57..d1ebe30405 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,6 +11,7 @@ arrow_flight = { path = "../arrow_flight" } async-trait = "0.1" bytes = { version = "1.0" } chrono = "0.4" +cached = "0.23.0" crc32fast = "1.2.0" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 2432f5287e..40d67d9383 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -72,6 +72,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; +use cached::proc_macro::cached; use db::load_preserved_catalog; use futures::stream::TryStreamExt; use observability_deps::tracing::{error, info, warn}; @@ -936,17 +937,25 @@ impl ConnectionManager for ConnectionManagerImpl { &self, connect: &str, ) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> { - // TODO(mkm): cache the connections - let connection = Builder::default() - .build(connect) - .await - .map_err(|e| Box::new(e) as _) - .context(RemoteServerConnectError)?; - let client = write::Client::new(connection); - Ok(Arc::new(RemoteServerImpl { client })) + cached_remote_server(connect.to_string()).await } } +// cannot be an associated function +// argument need to have static lifetime because they become caching keys +#[cached(result = true)] +async fn cached_remote_server( + connect: String, +) -> Result<Arc<RemoteServerImpl>, ConnectionManagerError> { + let connection = Builder::default() + .build(&connect) + .await + .map_err(|e| Box::new(e) as _) + .context(RemoteServerConnectError)?; + let client = write::Client::new(connection); + Ok(Arc::new(RemoteServerImpl { client })) +} + /// An implementation for communicating with other IOx servers. This should /// be moved into and implemented in an influxdb_iox_client create at a later /// date.