Merge pull request #2686 from influxdata/client-custom-connector

feat: Allow iox clients to specify connector
pull/24376/head
kodiakhq[bot] 2021-10-01 09:03:07 +00:00 committed by GitHub
commit 97052c9915
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 8 deletions

View File

@ -4,7 +4,8 @@ use http::{uri::InvalidUri, HeaderValue, Uri};
use std::convert::TryInto;
use std::time::Duration;
use thiserror::Error;
use tonic::transport::Endpoint;
use tonic::transport::{Channel, Endpoint};
use tower::make::MakeConnection;
/// The connection type used for clients
pub type Connection = SetRequestHeadersService<tonic::transport::Channel>;
@ -89,22 +90,44 @@ impl std::default::Default for Builder {
impl Builder {
/// Construct the [`Connection`] instance using the specified base URL.
pub async fn build<D>(self, dst: D) -> Result<Connection>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect().await?;
Ok(self.compose_middleware(channel))
}
/// Construct the [`Connection`] instance using the specified base URL and custom connector.
pub async fn build_with_connector<D, C>(self, dst: D, connector: C) -> Result<Connection>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
C: MakeConnection<Uri> + Send + 'static,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
Box<dyn std::error::Error + Send + Sync>: From<C::Error> + Send + 'static,
{
let endpoint = self.create_endpoint(dst)?;
let channel = endpoint.connect_with_connector(connector).await?;
Ok(self.compose_middleware(channel))
}
fn create_endpoint<D>(&self, dst: D) -> Result<Endpoint>
where
D: TryInto<Uri, Error = InvalidUri> + Send,
{
let endpoint = Endpoint::from(dst.try_into()?)
.user_agent(self.user_agent)?
.user_agent(&self.user_agent)?
.connect_timeout(self.connect_timeout)
.timeout(self.timeout);
Ok(endpoint)
}
let channel = endpoint.connect().await?;
fn compose_middleware(self, channel: Channel) -> Connection {
// Compose channel with new tower middleware stack
let channel = tower::ServiceBuilder::new()
tower::ServiceBuilder::new()
.layer(SetRequestHeadersLayer::new(self.headers))
.service(channel);
Ok(channel)
.service(channel)
}
/// Set the `User-Agent` header sent by this client.