feat: Allow iox clients to specify connector

This allows to implement custom transports for iox clients (such as socks5 proxies, see #2683).
pull/24376/head
Marko Mikulicic 2021-10-01 10:52:37 +02:00
parent 1134e4337b
commit 399d67e6b0
No known key found for this signature in database
GPG Key ID: D02A41F91A687DB3
1 changed files with 31 additions and 8 deletions

View File

@ -4,7 +4,8 @@ use http::{uri::InvalidUri, HeaderValue, Uri};
use std::convert::TryInto;
use std::time::Duration;
use thiserror::Error;
use tonic::transport::Endpoint;
use tonic::transport::{Channel, Endpoint};
use tower::make::MakeConnection;
/// The connection type used for clients
pub type Connection = SetRequestHeadersService<tonic::transport::Channel>;
@ -89,22 +90,44 @@ impl std::default::Default for Builder {
impl Builder {
/// Construct the [`Connection`] instance using the specified base URL.
pub async fn build<D>(self, dst: D) -> Result<Connection>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect().await?;
Ok(self.compose_middleware(channel))
}
/// Construct the [`Connection`] instance using the specified base URL and custom connector.
pub async fn build_with_connector<D, C>(self, dst: D, connector: C) -> Result<Connection>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
Box<dyn std::error::Error + Send + Sync>: From<C::Error> + Send + 'static,
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect_with_connector(connector).await?;
Ok(self.compose_middleware(channel))
}
fn create_endpoint<D>(&self, dst: D) -> Result<Endpoint>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
{
let endpoint = Endpoint::from(dst.try_into()?)
.user_agent(self.user_agent)?
.user_agent(&self.user_agent)?
.connect_timeout(self.connect_timeout)
.timeout(self.timeout);
Ok(endpoint)
}
let channel = endpoint.connect().await?;
fn compose_middleware(self, channel: Channel) -> Connection {
// Compose channel with new tower middleware stack
let channel = tower::ServiceBuilder::new()
tower::ServiceBuilder::new()
.layer(SetRequestHeadersLayer::new(self.headers))
.service(channel);
Ok(channel)
.service(channel)
}
/// Set the `User-Agent` header sent by this client.