diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 2fbe296848..b5bbadbd9e 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -18,9 +18,8 @@ use object_store::DynObjectStore; use observability_deps::tracing::info; use router::{ dml_handlers::{ - write_service_client, DmlHandler, DmlHandlerChainExt, FanOutAdaptor, - InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator, - ShardedWriteBuffer, WriteSummaryAdapter, + DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner, + RetentionValidator, RpcWrite, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{ metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache, @@ -259,14 +258,13 @@ pub async fn create_router2_server_type( // Hack to handle multiple ingester addresses separated by commas in potentially many uses of // the CLI arg let ingester_addresses = router_config.ingester_addresses.join(","); - let ingester_addresses_list: Vec<_> = ingester_addresses.split(',').collect(); - let mut ingester_clients = Vec::with_capacity(ingester_addresses_list.len()); - for ingester_addr in ingester_addresses_list { - ingester_clients.push(write_service_client(ingester_addr).await); - } + + let grpc_connections = router::dml_handlers::build_ingester_connection( + ingester_addresses.split(',').map(|s| format!("http://{s}")), + ); // Initialise the DML handler that sends writes to the ingester using the RPC write path. - let rpc_writer = RpcWrite::new(RoundRobin::new(ingester_clients)); + let rpc_writer = RpcWrite::new(RoundRobin::new([grpc_connections])); let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer); // 1. END diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs index 9deb55c5e0..2e5f4d0088 100644 --- a/router/src/dml_handlers/rpc_write.rs +++ b/router/src/dml_handlers/rpc_write.rs @@ -12,19 +12,22 @@ use mutable_batch::MutableBatch; use mutable_batch_pb::encode::encode_write; use observability_deps::tracing::*; use sharder::RoundRobin; -use std::{fmt::Debug, time::Duration}; +use std::{fmt::Debug, str::FromStr, time::Duration}; use thiserror::Error; +use tonic::transport::{Channel, Endpoint}; use trace::ctx::SpanContext; -/// Create a client to the ingester's write service. -pub async fn write_service_client( - ingester_addr: &str, -) -> WriteServiceClient { - let connection = client_util::connection::Builder::default() - .build(format!("http://{}", ingester_addr)) - .await - .unwrap_or_else(|e| panic!("failed to connect to server {ingester_addr}: {e}")); - WriteServiceClient::new(connection.into_grpc_connection()) +/// Create a connection to one or more ingesters, load-balancing requests across +/// all of them. +/// +/// Connections are lazily established. +pub fn build_ingester_connection(addrs: impl Iterator) -> WriteServiceClient +where + T: AsRef, +{ + WriteServiceClient::new(Channel::balance_list( + addrs.map(|s| Endpoint::from_str(s.as_ref()).expect("invalid ingester address")), + )) } /// The bound on RPC request duration. @@ -127,6 +130,7 @@ where Ok(()) => break, Err(e) => warn!(error=%e, "failed ingester rpc write"), }; + tokio::time::sleep(Duration::from_millis(50)).await; } }) .await?; diff --git a/router/src/dml_handlers/rpc_write/client.rs b/router/src/dml_handlers/rpc_write/client.rs index b3187551bb..da04469220 100644 --- a/router/src/dml_handlers/rpc_write/client.rs +++ b/router/src/dml_handlers/rpc_write/client.rs @@ -12,7 +12,8 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug { async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>; } -/// An implementation of [`WriteClient`] for the tonic gRPC client. +/// An implementation of [`WriteClient`] for the bespoke IOx wrapper over the +/// tonic gRPC client. #[async_trait] impl WriteClient for WriteServiceClient { async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> { @@ -21,6 +22,15 @@ impl WriteClient for WriteServiceClient } } +/// An implementation of [`WriteClient`] for the tonic gRPC client. +#[async_trait] +impl WriteClient for WriteServiceClient { + async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> { + WriteServiceClient::write(&mut self.clone(), op).await?; + Ok(()) + } +} + #[cfg(test)] pub(crate) mod mock { use std::{collections::VecDeque, sync::Arc};