feat: Implement grpc-router crate

This PR implements the main building block for implementing the gRPC StorageService router.
pull/24376/head
Marko Mikulicic 2021-06-23 01:34:23 +02:00
parent dd447d940c
commit 69b0bb1510
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
10 changed files with 861 additions and 1 deletions

33
Cargo.lock generated
View File

@ -1345,6 +1345,39 @@ dependencies = [
"serde",
]
[[package]]
name = "grpc-router"
version = "0.1.0"
dependencies = [
"bytes",
"cache_loader_async",
"futures",
"grpc-router-test-gen",
"observability_deps",
"paste 1.0.5",
"prost",
"prost-build",
"prost-types",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tonic",
"tonic-build",
"tonic-reflection",
]
[[package]]
name = "grpc-router-test-gen"
version = "0.1.0"
dependencies = [
"prost",
"prost-build",
"prost-types",
"tonic",
"tonic-build",
]
[[package]]
name = "h2"
version = "0.3.3"

View File

@ -35,7 +35,9 @@ members = [
"server_benchmarks",
"test_helpers",
"tracker",
"trogging"
"trogging",
"grpc-router",
"grpc-router/grpc-router-test-gen",
]
[profile.release]

28
grpc-router/Cargo.toml Normal file
View File

@ -0,0 +1,28 @@
[package]
name = "grpc-router"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
[dependencies]
bytes = { version = "1.0" }
cache_loader_async = {version = "0.1.0", features = ["ttl-cache"] }
futures = "0.3"
observability_deps = { path = "../observability_deps" }
paste = "1.0.5"
prost = "0.7"
prost-types = "0.7"
thiserror = "1.0.23"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.3" }
tonic = "0.4"
tonic-reflection = "0.1.0"
[build-dependencies]
paste = "1.0.5"
prost-build = "0.7"
tonic-build = "0.4"
[dev-dependencies]
grpc-router-test-gen = { path = "./grpc-router-test-gen" }

View File

@ -0,0 +1,15 @@
[package]
name = "grpc-router-test-gen"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
description = "Protobuf used in test for the grpc-router crate; need to be in a separate create because of linter limitations"
[dependencies]
tonic = "0.4"
prost = "0.7"
prost-types = "0.7"
[build-dependencies]
tonic-build = "0.4"
prost-build = "0.7"

View File

@ -0,0 +1,26 @@
use std::env;
use std::path::{Path, PathBuf};
type Result<T> = std::io::Result<T>;
fn main() -> Result<()> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos");
generate_grpc_types(&root)
}
fn generate_grpc_types(root: &Path) -> Result<()> {
let proto_files = vec![root.join("test.proto")];
// Tell cargo to recompile if any of these proto files are changed
for proto_file in &proto_files {
println!("cargo:rerun-if-changed={}", proto_file.display());
}
let mut config = prost_build::Config::new();
config.disable_comments(&[".google"]);
tonic_build::configure()
.format(true)
.compile_with_config(config, &proto_files, &[root.into()])
}

View File

@ -0,0 +1,22 @@
syntax = "proto3";
package influxdata.iox.test;
import "google/protobuf/empty.proto";
service Test {
rpc TestUnary (TestRequest) returns (TestUnaryResponse);
rpc TestServerStream (TestRequest) returns (stream TestServerStreamResponse);
}
message TestRequest {
bool route_me = 1;
}
message TestUnaryResponse {
uint64 answer = 1;
}
message TestServerStreamResponse {
uint64 answer = 1;
}

View File

@ -0,0 +1,4 @@
/// Test Protobuf/gRPC stubs. Public and not `#[cfg(test)]` because used in a (tested) doc example.
pub mod test_proto {
tonic::include_proto!("influxdata.iox.test");
}

View File

@ -0,0 +1,199 @@
//! This crate provides an abstraction to construct (connected) tonic gRPC clients
//! out of connection addresses, generic on the type of the service.
//!
//! # Examples
//!
//! See [`CachingConnectionManager`].
//!
use cache_loader_async::backing::{CacheBacking, HashMapBacking, TtlCacheBacking};
use cache_loader_async::cache_api::{self, LoadingCache};
use futures::Future;
use observability_deps::tracing::debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A connection manager knows how to obtain a T client given a connection string.
#[tonic::async_trait]
pub trait ConnectionManager<T, E = Error> {
async fn remote_server(&self, connect: String) -> Result<T, E>;
}
/// A Caching ConnectionManager implementation.
///
/// It caches connected gRPC clients of type T (not operations performed over the connection).
/// Each cache access returns a clone of the tonic gRPC client. Cloning clients is cheap
/// and allows them to communicate through the same channel, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
///
/// The `CachingConnectionManager` implements a blocking cache-loading mechanism, that is, it guarantees that once a
/// connection request for a given connection string is in flight, subsequent cache access requests
/// get enqueued and wait for the first connection to complete instead of spawning each an
/// outstanding connection request and thus suffer from the thundering herd problem.
///
/// It also supports an optional expiry mechanism based on TTL, see [`CachingConnectionManager::builder()`].
///
/// # Examples
///
/// ```rust
/// # use std::time::Duration;
/// # use grpc_router::connection_manager::{ConnectionManager, CachingConnectionManager};
/// #
/// # type Result<T, E = Box<dyn std::error::Error + Send + Sync>> = std::result::Result<T, E>;
/// #
/// # #[derive(Clone)]
/// # struct FooClient;
/// # impl FooClient {
/// # async fn connect(_: String) -> Result<FooClient, tonic::transport::Error> { Ok(Self{})}
/// # async fn foo(&self) -> Result<()> { Ok(()) }
/// # }
/// #
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// #
/// let connection_manager = CachingConnectionManager::builder()
/// .with_ttl(Duration::from_secs(60))
/// .with_make_client(|dst| Box::pin(FooClient::connect(dst)))
/// .build();
/// let client: FooClient = connection_manager
/// .remote_server("http://localhost:1234".to_string())
/// .await?;
/// client.foo().await?;
/// #
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct CachingConnectionManager<T>
where
T: Clone + Send + 'static,
{
cache: LoadingCache<String, T, Arc<Error>>,
}
pub type ClientFuture<T> = Pin<Box<dyn Future<Output = Result<T, tonic::transport::Error>> + Send>>;
impl<T> CachingConnectionManager<T>
where
T: Clone + Send + 'static,
{
/// Returns a [`CachingConnectionManagerBuilder`] for configuring and building a [`Self`].
pub fn builder() -> CachingConnectionManagerBuilder<T> {
CachingConnectionManagerBuilder::new()
}
}
#[tonic::async_trait]
impl<T> ConnectionManager<T> for CachingConnectionManager<T>
where
T: Clone + Send + 'static,
{
async fn remote_server(&self, connect: String) -> Result<T, Error> {
let cached = self.cache.get_with_meta(connect.to_string()).await?;
debug!(was_cached=%cached.cached, %connect, "getting remote connection");
Ok(cached.result)
}
}
/// Builder for [`CachingConnectionManager`].
///
/// Can be constructed with [`CachingConnectionManager::builder`].
///
/// A CachingConnectionManager can choose which backing store `B` to use.
/// We provide a helper method [`Self::with_ttl`] to use the [`TtlCacheBacking`] backing store
///
/// The `F` type parameter encodes the client constructor set with [`Self::with_make_client`].
/// By default the `F` parameter is `()` which causes the [`Self::build`] method to not be available.
#[derive(Debug)]
pub struct CachingConnectionManagerBuilder<T, B = HashMapBacking<String, CacheEntry<T>>, F = ()>
where
T: Clone + Send + 'static,
{
backing: B,
make_client: F,
_phantom: PhantomData<T>,
}
impl<T> Default for CachingConnectionManagerBuilder<T>
where
T: Clone + Send + 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<T> CachingConnectionManagerBuilder<T>
where
T: Clone + Send + 'static,
{
pub fn new() -> Self {
Self {
backing: HashMapBacking::new(),
make_client: (),
_phantom: Default::default(),
}
}
}
type CacheEntry<T> = cache_api::CacheEntry<T, Arc<Error>>;
// This needs to be a separate impl block because they place different bounds on the type parameters.
impl<T, B, F> CachingConnectionManagerBuilder<T, B, F>
where
T: Clone + Send + 'static,
B: CacheBacking<String, CacheEntry<T>> + Send + 'static,
{
/// Use a cache backing store that expires entries once they are older than `ttl`.
pub fn with_ttl(
self,
ttl: Duration,
) -> CachingConnectionManagerBuilder<T, TtlCacheBacking<String, CacheEntry<T>>, F> {
self.with_backing(TtlCacheBacking::new(ttl))
}
/// Use a custom cache backing store.
pub fn with_backing<B2>(self, backing: B2) -> CachingConnectionManagerBuilder<T, B2, F>
where
B2: CacheBacking<String, CacheEntry<T>>,
{
CachingConnectionManagerBuilder {
backing,
make_client: self.make_client,
_phantom: Default::default(),
}
}
pub fn with_make_client<F2>(self, make_client: F2) -> CachingConnectionManagerBuilder<T, B, F2>
where
F2: Fn(String) -> ClientFuture<T> + Copy + Send + Sync + 'static,
{
CachingConnectionManagerBuilder {
backing: self.backing,
make_client,
_phantom: Default::default(),
}
}
}
// This needs to be a separate impl block because they place different bounds on the type parameters.
impl<T, B, F> CachingConnectionManagerBuilder<T, B, F>
where
T: Clone + Send + 'static,
B: CacheBacking<String, CacheEntry<T>> + Send + 'static,
F: Fn(String) -> ClientFuture<T> + Copy + Send + Sync + 'static,
{
/// Builds a [`CachingConnectionManager`].
pub fn build(self) -> CachingConnectionManager<T> {
let make_client = self.make_client;
let (cache, _) = LoadingCache::with_backing(self.backing, move |connect| async move {
(make_client)(connect)
.await
.map_err(|e| Arc::new(Box::new(e) as _))
});
CachingConnectionManager { cache }
}
}

183
grpc-router/src/lib.rs Normal file
View File

@ -0,0 +1,183 @@
//! The grpc-router crate helps creating gRPC routers/forwarders for existing gRPC services.
//!
//! The router implements a gRPC service trait and forwards requests to a local struct implementing
//! that same service interface or to a remote service. The tonic gRPC client used to talk to the
//! remote service is provided by the user by implementing the [`Router`] trait for the router service type.
//! The [`Router`] trait allows the user to provide a different gRPC client per request, or to just
//! fall-back to serving the request from a local service implementation (without any further gRPC overhead).
//!
//! This crate also offers an optional caching [`connection_manager`], which can be useful for
//! implementing the [`Router`] trait.
//!
//! # Examples
//!
//! ## Simple introductory example:
//!
//! ```
//! # use std::pin::Pin;
//! # use futures::Stream;
//! use grpc_router_test_gen::test_proto::{test_server::Test, test_client::TestClient, *};
//! use grpc_router::{grpc_router, router, Router, RoutingDestination};
//! use tonic::{Request, Response, transport::Channel, Status};
//!
//! /// A router is a service ...
//! struct TestRouter {}
//!
//! /// ... like any other gRPC service it must implement the service's trait.
//! impl Test for TestRouter {
//! /// The [`grpc_router`] macro takes care of implementing the methods.
//! /// But it needs some help from you: you need to list the signatures of all routed methods.
//! /// Luckily you can (almost) copy&paste the protobuf IDL code in it:
//! grpc_router! {
//! rpc TestUnary (unary TestRequest) returns (unary TestUnaryResponse);
//! rpc TestServerStream (unary TestRequest) returns (stream TestServerStreamResponse);
//! }
//! }
//!
//! # fn is_local<R>(request: &Request<R>) -> bool { true }
//! #
//! #[tonic::async_trait]
//! impl Router<Request<TestRequest>, TestService, TestClient<Channel>> for TestRouter {
//! async fn route_for(
//! &self,
//! request: &Request<TestRequest>,
//! ) -> router::Result<RoutingDestination<TestService, TestClient<Channel>>> {
//! /// use the request argument to decide where to route...
//! Ok(if is_local(&request) {
//! RoutingDestination::Local(&TestService{})
//! } else {
//! /// in the real world, a [`connection_manager::ConnectionManager`] can be used.
//! RoutingDestination::Remote(
//! TestClient::connect("http:/1.2.3.4:1234".to_string()).await.unwrap(),
//! )
//! })
//! }
//! }
//!
//! /// Where `TestService` is a concrete implementation of the `Test` service:
//!
//! struct TestService {
//! // ...
//! }
//!
//! #[tonic::async_trait]
//! impl Test for TestService {
//! // ...
//! # async fn test_unary(&self, request: Request<TestRequest>) -> Result<Response<TestUnaryResponse>, Status> {
//! # todo!()
//! # }
//! #
//! # type TestServerStreamStream = Pin<Box<dyn Stream<Item = Result<TestServerStreamResponse, tonic::Status>> + Send + Sync>>;
//! #
//! # async fn test_server_stream(&self, request: Request<TestRequest>) -> Result<Response<Self::TestServerStreamStream>, Status> {
//! # todo!()
//! # }
//! }
//! ```
//!
//! ## Full example with a connection manager
//!
//! ```
//! # use std::pin::Pin;
//! # use futures::Stream;
//! use grpc_router_test_gen::test_proto::{test_server::{Test, TestServer}, test_client::TestClient, *};
//! use grpc_router::{grpc_router, router, Router, RoutingDestination};
//! use grpc_router::connection_manager::{ConnectionManager, CachingConnectionManager};
//! use tonic::{Request, Response, transport::Channel, Status};
//!
//! struct TestRouter<M>
//! where
//! M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
//! {
//! /// A router service can embed a connection manager.
//! connection_manager: M,
//! /// and the fall-back local instance of the service
//! local: TestService,
//! }
//!
//! impl<M> Test for TestRouter<M>
//! where
//! M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
//! {
//! grpc_router! {
//! rpc TestUnary (unary TestRequest) returns (unary TestUnaryResponse);
//! rpc TestServerStream (unary TestRequest) returns (stream TestServerStreamResponse);
//! }
//! }
//!
//! # fn address_for<R>(request: &Request<R>) -> Option<String> { None }
//! #
//! #[tonic::async_trait]
//! impl<M> Router<Request<TestRequest>, TestService, TestClient<Channel>> for TestRouter<M>
//! where
//! M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
//! {
//! async fn route_for(
//! &self,
//! request: &Request<TestRequest>,
//! ) -> router::Result<RoutingDestination<TestService, TestClient<Channel>>> {
//! /// Some custom logic to figure out the connection string for a remote server ...
//! Ok(match address_for(&request) {
//! None => RoutingDestination::Local(&self.local),
//! Some(remote_addr) => RoutingDestination::Remote(
//! /// ... which the connection manager consumes to yield a client.
//! self.connection_manager
//! .remote_server(remote_addr.clone())
//! .await?,
//! ),
//! })
//! }
//! }
//!
//! # use std::net::SocketAddr;
//! /// And now we need to wire everything up together
//! async fn run(addr: SocketAddr) {
//! # use tonic::transport::Server;
//! /// A caching connection manager that builds instances of TestClient
//! let connection_manager = CachingConnectionManager::builder()
//! .with_make_client(|dst| Box::pin(TestClient::connect(dst)))
//! .build();
//! /// The fallback local service (used if the routing destination is Local).
//! let local = TestService{};
//!
//! let router = TestRouter { connection_manager, local };
//!
//! /// the `router` implements `Test` and thus can be treated as any other
//! /// implementation of a gRPC service.
//! Server::builder()
//! .add_service(TestServer::new(router))
//! .serve(addr)
//! .await
//! .unwrap();
//! }
//!
//! # struct TestService {}
//! # #[tonic::async_trait]
//! # impl Test for TestService {
//! # async fn test_unary(&self, request: Request<TestRequest>) -> Result<Response<TestUnaryResponse>, Status> {
//! # todo!()
//! # }
//! #
//! # type TestServerStreamStream = Pin<Box<dyn Stream<Item = Result<TestServerStreamResponse, tonic::Status>> + Send + Sync>>;
//! #
//! # async fn test_server_stream(&self, request: Request<TestRequest>) -> Result<Response<Self::TestServerStreamStream>, Status> {
//! # todo!()
//! # }
//! # }
//! ```
#![deny(broken_intra_doc_links, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
pub mod connection_manager;
#[macro_use]
pub mod router;
pub use router::{Router, RoutingDestination};

348
grpc-router/src/router.rs Normal file
View File

@ -0,0 +1,348 @@
use futures::Stream;
use std::pin::Pin;
use thiserror::Error;
use tonic::{Response, Status, Streaming};
#[derive(Debug, Error)]
pub enum Error {
#[error("Cannot route request: {0}")]
RoutingError(#[from] Box<dyn std::error::Error + Sync + Send>),
}
impl From<Error> for Status {
fn from(error: Error) -> Self {
Self::internal(error.to_string())
}
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A gRPC router containing an invocation of the [`grpc_router`] macro must
/// implement this trait.
#[tonic::async_trait]
pub trait Router<R, S, C> {
/// For a given request return the routing decision for the call.
async fn route_for(&self, request: &R) -> Result<RoutingDestination<'_, S, C>>;
}
/// A [`RoutingDestination`] is either a local in-process grpc Service or a remote grpc Client for that service.
///
/// Unfortunately tonic clients and servers don't share any traits, so it's up to
/// you to ensure that C is a client for service S.
#[derive(Debug)]
pub enum RoutingDestination<'a, S, C> {
/// Reference to an implementation of a gRPC service trait. This causes the router to
/// transfer control to an in-process implementation of a service, effectively zero cost routing for
/// a local service.
Local(&'a S),
/// Routing to a remote service via a gRPC client instance connected to the remote endpoint.
Remote(C),
}
/// Needs to be public because it's used by the [`grpc_router`] macro.
pub type PinnedStream<T> = Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + Send + Sync>>;
/// Needs to be public because it's used by the [`grpc_router`] macro.
pub fn pinned_response<T: 'static>(res: Response<Streaming<T>>) -> Response<PinnedStream<T>> {
tonic::Response::new(Box::pin(res.into_inner()))
}
/// This is a macro that parses a quasi gRPC protobuf service declaration and emits the required
/// boilerplate to implement a service trait that forwards the calls to a remote gRPC service
/// and to a local service depending on what the [`Router`] trait for `self` says for a given request.
///
/// Currently it cannot parse the gRPC IDL syntax (TODO): you have to explicitly specify `unary` or `stream`.
///
/// # Examples
///
/// ```ignore
/// impl Foo for FooRouter
/// {
/// grpc_router! {
/// rpc Bar (unary BarRequest) returns (unary BarResponse);
/// rpc Baz (unary BazRequest) returns (stream BazResponse);
/// }
/// }
/// ```
#[macro_export]
macro_rules! grpc_router {
{ $( rpc $method:ident ( $request_kind:ident $request:ty ) returns ( $response_kind:ident $response:expr ) ; )* } => {
$( grpc_router! { @ $request_kind $response_kind $method, $request, $response } )*
};
{ @ unary unary $method:ident, $request:ty, $response:expr } => {
paste::paste! {
// NOTE: we cannot emit an `async fn` from this macro since #[tonic::async_trait] transforms
// the body of the trait/impl before macros get expanded.
// Thus, we have to do what async_trait would have done:
//
// ```
// #[tonic::async_trait]
// async fn [<$method:snake>](
// &self,
// req: Request<$request>,
// ) -> Result<tonic::Response<$response>, Status> {
// ...
// }
// ```
//
// -->
//
fn [<$method:snake>]<'a, 'async_trait>(
&'a self,
req: tonic::Request<$request>,
) -> std::pin::Pin<
Box<
dyn futures::Future<Output = Result<tonic::Response<$response>, tonic::Status>>
+ Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
Self: 'async_trait,
{
Box::pin(async move {
match self.route_for(&req).await? {
$crate::RoutingDestination::Local(svc) => svc.[<$method:snake>](req).await,
$crate::RoutingDestination::Remote(mut svc) => svc.[<$method:snake>](req).await,
}
})
}
}
};
{ @ unary stream $method:ident, $request:ty, $response:expr } => {
paste::paste! {
type [<$method:camel Stream>] = $crate::router::PinnedStream<$response>;
fn [<$method:snake>]<'a, 'async_trait>(
&'a self,
req: tonic::Request<$request>,
) -> std::pin::Pin<
Box<
dyn futures::Future<Output = Result<tonic::Response<Self::[<$method:camel Stream>]>, tonic::Status>>
+ Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
Self: 'async_trait,
{
Box::pin(async move {
use $crate::router::pinned_response;
match self.route_for(&req).await? {
$crate::RoutingDestination::Local(svc) => svc.[<$method:snake>](req).await,
$crate::RoutingDestination::Remote(mut svc) => Ok(pinned_response(svc.[<$method:snake>](req).await?)),
}
})
}
}
};
}
#[cfg(test)]
pub mod tests {
// intentionally doesn't use super::* in order to use the public interface only
use super::PinnedStream; // just a utility
use crate::connection_manager::{CachingConnectionManager, ConnectionManager};
use crate::{grpc_router, router, Router, RoutingDestination};
use futures::{FutureExt, StreamExt};
use grpc_router_test_gen::test_proto::{test_client::TestClient, test_server::TestServer, *};
use std::net::SocketAddr;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::{Channel, Server};
use tonic::{Request, Response, Status};
#[derive(Clone)]
struct TestService {
base: u64,
}
#[tonic::async_trait]
impl test_server::Test for TestService {
async fn test_unary(
&self,
_request: Request<TestRequest>,
) -> Result<Response<TestUnaryResponse>, Status> {
Ok(tonic::Response::new(TestUnaryResponse {
answer: self.base + 1,
}))
}
type TestServerStreamStream = PinnedStream<TestServerStreamResponse>;
async fn test_server_stream(
&self,
_request: Request<TestRequest>,
) -> Result<Response<Self::TestServerStreamStream>, Status> {
let it = (self.base + 1..=self.base + 2)
.map(|answer| Ok(TestServerStreamResponse { answer }));
Ok(tonic::Response::new(Box::pin(futures::stream::iter(it))))
}
}
#[derive(Debug)]
pub struct Fixture {
pub local_addr: String,
pub client: TestClient<Channel>,
shutdown_tx: tokio::sync::oneshot::Sender<()>,
}
impl Fixture {
/// Start up a grpc server listening on `port`, returning
/// a fixture with the server and client.
async fn new<T>(svc: T) -> Result<Self, Box<dyn std::error::Error>>
where
T: test_server::Test,
{
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let addr: SocketAddr = "127.0.0.1:0".parse()?;
let listener = tokio::net::TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
let local_addr = format!("http://{}", local_addr.to_string());
tokio::spawn(async move {
Server::builder()
.add_service(TestServer::new(svc))
.serve_with_incoming_shutdown(
TcpListenerStream::new(listener),
shutdown_rx.map(drop),
)
.await
.unwrap();
});
// Give the test server a few ms to become available
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Construct client and send request, extract response
let client = TestClient::connect(local_addr.clone())
.await
.expect("connect");
Ok(Self {
local_addr,
client,
shutdown_tx,
})
}
}
impl Drop for Fixture {
fn drop(&mut self) {
let (tmp_tx, _) = tokio::sync::oneshot::channel();
let shutdown_tx = std::mem::replace(&mut self.shutdown_tx, tmp_tx);
if let Err(e) = shutdown_tx.send(()) {
eprintln!("error shutting down text fixture: {:?}", e);
}
}
}
async fn test(mut client: TestClient<Channel>, route_me: bool, base: u64) {
let res = client
.test_unary(TestRequest { route_me })
.await
.expect("call");
assert_eq!(res.into_inner().answer, base + 1);
let res = client
.test_server_stream(TestRequest { route_me })
.await
.expect("call");
let res: Vec<_> = res
.into_inner()
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<_, _>>()
.expect("success");
assert_eq!(
res,
vec![
TestServerStreamResponse { answer: base + 1 },
TestServerStreamResponse { answer: base + 2 }
]
);
}
#[tokio::test]
async fn test_local() {
const BASE: u64 = 40;
let fixture = Fixture::new(TestService { base: BASE })
.await
.expect("fixture");
test(fixture.client.clone(), false, BASE).await;
}
#[tokio::test]
async fn test_routed() {
const REMOTE_BASE: u64 = 20;
const LOCAL_BASE: u64 = 40;
struct TestRouter<M>
where
M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
{
remote_addr: String,
local: TestService,
connection_manager: M,
}
impl<M> test_server::Test for TestRouter<M>
where
M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
{
grpc_router! {
rpc TestUnary (unary TestRequest) returns (unary TestUnaryResponse);
rpc TestServerStream (unary TestRequest) returns (stream TestServerStreamResponse);
}
}
#[tonic::async_trait]
impl<M> Router<Request<TestRequest>, TestService, test_client::TestClient<Channel>>
for TestRouter<M>
where
M: ConnectionManager<TestClient<Channel>> + Send + Sync + 'static,
{
async fn route_for(
&self,
request: &Request<TestRequest>,
) -> router::Result<RoutingDestination<'_, TestService, test_client::TestClient<Channel>>>
{
Ok(if request.get_ref().route_me {
RoutingDestination::Remote(
self.connection_manager
.remote_server(self.remote_addr.clone())
.await?,
)
} else {
RoutingDestination::Local(&self.local)
})
}
}
// a connection manager for TestClient
let connection_manager = CachingConnectionManager::builder()
.with_make_client(|dst| Box::pin(TestClient::connect(dst)))
.build();
// a remote TestService
let remote_fixture = Fixture::new(TestService { base: REMOTE_BASE })
.await
.expect("remote fixture");
// a router that can route to a remote TestService or serve from a local TestService
let router = TestRouter {
remote_addr: remote_fixture.local_addr.clone(),
local: TestService { base: LOCAL_BASE },
connection_manager,
};
let router_fixture = Fixture::new(router).await.expect("router fixture");
test(router_fixture.client.clone(), false, LOCAL_BASE).await;
test(router_fixture.client.clone(), true, REMOTE_BASE).await;
}
}