Merge branch 'main' into dom/persist
commit
346337ad9f
|
@ -255,14 +255,13 @@ pub async fn create_router_grpc_write_server_type(
|
|||
) -> Result<Arc<dyn ServerType>> {
|
||||
// 1. START: Different Setup Per Router Path: this part is only relevant to using RPC write
|
||||
// path and should not be added to `create_router_server_type`.
|
||||
let mut ingester_clients = Vec::with_capacity(router_config.ingester_addresses.len());
|
||||
for ingester_addr in &router_config.ingester_addresses {
|
||||
ingester_clients.push(write_service_client(ingester_addr).await);
|
||||
}
|
||||
|
||||
// Initialise the DML handler that sends writes to the ingester using the RPC write path.
|
||||
let rpc_writer = RpcWrite::new(RoundRobin::new(
|
||||
router_config
|
||||
.ingester_addresses
|
||||
.iter()
|
||||
.map(|ingester_addr| write_service_client(ingester_addr)),
|
||||
));
|
||||
let rpc_writer = RpcWrite::new(RoundRobin::new(ingester_clients));
|
||||
let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
|
||||
// 1. END
|
||||
|
||||
|
|
|
@ -14,12 +14,17 @@ use observability_deps::tracing::*;
|
|||
use sharder::RoundRobin;
|
||||
use std::{fmt::Debug, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tonic::transport::Channel;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
/// Create a client to the ingester's write service.
|
||||
pub fn write_service_client(ingester_addr: &str) -> WriteServiceClient<Channel> {
|
||||
WriteServiceClient::new(Channel::builder(ingester_addr.parse().unwrap()).connect_lazy())
|
||||
pub async fn write_service_client(
|
||||
ingester_addr: &str,
|
||||
) -> WriteServiceClient<client_util::connection::GrpcConnection> {
|
||||
let connection = client_util::connection::Builder::default()
|
||||
.build(format!("http://{}", ingester_addr))
|
||||
.await
|
||||
.unwrap_or_else(|e| panic!("failed to connect to server {ingester_addr}: {e}"));
|
||||
WriteServiceClient::new(connection.into_grpc_connection())
|
||||
}
|
||||
|
||||
/// The bound on RPC request duration.
|
||||
|
@ -44,7 +49,7 @@ pub enum RpcWriteError {
|
|||
}
|
||||
|
||||
/// A convenience alias for the generated gRPC client.
|
||||
type GrpcClient = WriteServiceClient<Channel>;
|
||||
type GrpcClient = WriteServiceClient<client_util::connection::GrpcConnection>;
|
||||
|
||||
/// An [`RpcWrite`] handler submits a write directly to an Ingester via the
|
||||
/// [gRPC write service].
|
||||
|
|
|
@ -2,7 +2,6 @@ use async_trait::async_trait;
|
|||
use generated_types::influxdata::iox::ingester::v1::{
|
||||
write_service_client::WriteServiceClient, WriteRequest,
|
||||
};
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use super::RpcWriteError;
|
||||
|
||||
|
@ -15,7 +14,7 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
|
|||
|
||||
/// An implementation of [`WriteClient`] for the tonic gRPC client.
|
||||
#[async_trait]
|
||||
impl WriteClient for WriteServiceClient<Channel> {
|
||||
impl WriteClient for WriteServiceClient<client_util::connection::GrpcConnection> {
|
||||
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
|
||||
WriteServiceClient::write(&mut self.clone(), op).await?;
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue