From 399d67e6b053b80b58dceb0294ba40ea51510c00 Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Fri, 1 Oct 2021 10:52:37 +0200 Subject: [PATCH] feat: Allow iox clients to specify connector This allows to implement custom transports for iox clients (such as socks5 proxies, see #2683). --- client_util/src/connection.rs | 39 ++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/client_util/src/connection.rs b/client_util/src/connection.rs index acd8bf18ae..1729ee5e7f 100644 --- a/client_util/src/connection.rs +++ b/client_util/src/connection.rs @@ -4,7 +4,8 @@ use http::{uri::InvalidUri, HeaderValue, Uri}; use std::convert::TryInto; use std::time::Duration; use thiserror::Error; -use tonic::transport::Endpoint; +use tonic::transport::{Channel, Endpoint}; +use tower::make::MakeConnection; /// The connection type used for clients pub type Connection = SetRequestHeadersService; @@ -89,22 +90,44 @@ impl std::default::Default for Builder { impl Builder { /// Construct the [`Connection`] instance using the specified base URL. pub async fn build(self, dst: D) -> Result + where + D: TryInto + Send, + { + let endpoint = self.create_endpoint(dst)?; + let channel = endpoint.connect().await?; + Ok(self.compose_middleware(channel)) + } + + /// Construct the [`Connection`] instance using the specified base URL and custom connector. + pub async fn build_with_connector(self, dst: D, connector: C) -> Result + where + D: TryInto + Send, + C: MakeConnection + Send + 'static, + C::Connection: Unpin + Send + 'static, + C::Future: Send + 'static, + Box: From + Send + 'static, + { + let endpoint = self.create_endpoint(dst)?; + let channel = endpoint.connect_with_connector(connector).await?; + Ok(self.compose_middleware(channel)) + } + + fn create_endpoint(&self, dst: D) -> Result where D: TryInto + Send, { let endpoint = Endpoint::from(dst.try_into()?) - .user_agent(self.user_agent)? + .user_agent(&self.user_agent)? .connect_timeout(self.connect_timeout) .timeout(self.timeout); + Ok(endpoint) + } - let channel = endpoint.connect().await?; - + fn compose_middleware(self, channel: Channel) -> Connection { // Compose channel with new tower middleware stack - let channel = tower::ServiceBuilder::new() + tower::ServiceBuilder::new() .layer(SetRequestHeadersLayer::new(self.headers)) - .service(channel); - - Ok(channel) + .service(channel) } /// Set the `User-Agent` header sent by this client.