fix: Cache outbound gRPC connections

pull/24376/head
Marko Mikulicic 2021-05-19 18:24:24 +02:00
parent 9b42c1a065
commit ce2f8351be
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
3 changed files with 116 additions and 9 deletions

99
Cargo.lock generated
View File

@ -213,6 +213,15 @@ dependencies = [
"wait-timeout",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.1"
@ -465,6 +474,40 @@ dependencies = [
"serde",
]
[[package]]
name = "cached"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e2afe73808fbaac302e39c9754bfc3c4b4d0f99c9c240b9f4e4efc841ad1b74"
dependencies = [
"async-mutex",
"async-trait",
"cached_proc_macro",
"cached_proc_macro_types",
"futures",
"hashbrown 0.9.1",
"once_cell",
]
[[package]]
name = "cached_proc_macro"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf857ae42d910aede5c5186e62684b0d7a597ce2fe3bd14448ab8f7ef439848c"
dependencies = [
"async-mutex",
"cached_proc_macro_types",
"darling",
"quote",
"syn",
]
[[package]]
name = "cached_proc_macro_types"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a4f925191b4367301851c6d99b09890311d74b0d43f274c0b34c86d308a3663"
[[package]]
name = "cast"
version = "0.2.5"
@ -538,7 +581,7 @@ dependencies = [
"ansi_term 0.11.0",
"atty",
"bitflags",
"strsim",
"strsim 0.8.0",
"textwrap",
"unicode-width",
"vec_map",
@ -793,6 +836,41 @@ dependencies = [
"sct",
]
[[package]]
name = "darling"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d706e75d87e35569db781a9b5e2416cff1236a47ed380831f959382ccd5f858"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c960ae2da4de88a91b2d920c2a7233b400bc33cb28453a2987822d8392519b"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.9.3",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b5a2f4ac4969822c62224815d069952656cadc7084fdca9751e6d959189b72"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "dashmap"
version = "4.0.2"
@ -1010,6 +1088,12 @@ dependencies = [
"termcolor",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "extend"
version = "0.1.2"
@ -1491,6 +1575,12 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.2.3"
@ -3580,6 +3670,7 @@ dependencies = [
"arrow_util",
"async-trait",
"bytes",
"cached",
"chrono",
"crc32fast",
"criterion",
@ -3833,6 +3924,12 @@ version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6446ced80d6c486436db5c078dde11a9f73d42b57fb273121e160b84f63d894c"
[[package]]
name = "structopt"
version = "0.3.21"

View File

@ -11,6 +11,7 @@ arrow_flight = { path = "../arrow_flight" }
async-trait = "0.1"
bytes = { version = "1.0" }
chrono = "0.4"
cached = "0.23.0"
crc32fast = "1.2.0"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }

View File

@ -72,6 +72,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use cached::proc_macro::cached;
use db::load_preserved_catalog;
use futures::stream::TryStreamExt;
use observability_deps::tracing::{error, info, warn};
@ -936,17 +937,25 @@ impl ConnectionManager for ConnectionManagerImpl {
&self,
connect: &str,
) -> Result<Arc<Self::RemoteServer>, ConnectionManagerError> {
// TODO(mkm): cache the connections
let connection = Builder::default()
.build(connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
cached_remote_server(connect.to_string()).await
}
}
// cannot be an associated function
// argument need to have static lifetime because they become caching keys
#[cached(result = true)]
async fn cached_remote_server(
connect: String,
) -> Result<Arc<RemoteServerImpl>, ConnectionManagerError> {
let connection = Builder::default()
.build(&connect)
.await
.map_err(|e| Box::new(e) as _)
.context(RemoteServerConnectError)?;
let client = write::Client::new(connection);
Ok(Arc::new(RemoteServerImpl { client }))
}
/// An implementation for communicating with other IOx servers. This should
/// be moved into and implemented in an influxdb_iox_client create at a later
/// date.