fix(router2): lazily connect to ingesters
Allow the routers to start up without requiring full availability of all downstream ingesters. Previously a single unavailable ingester prevented the routers from starting up.

This has downsides:

* Lazily initialising a connection will cause the first writes to have higher latency as the connection is established.
* The routers MAY come up in a state that will never work (i.e. bad ingester addresses).
* Using the opaque gRPC load-balancing mechanism restricts visibility into which nodes are up/down (hindering useful log messages) and prevents us from implementing more advanced circuit breaking / probing logic / load-balancing strategies.

This change is a quick fix - it leaves the round-robin handler in place, load-balancing over a single tonic Channel, which internally load-balances. This will need cleaning up.
parent 84485b57b0
commit a5a26f5efb
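As a rough, standalone sketch of the approach described above (the helper name, addresses, and port are hypothetical; the real wiring is in the diff below), a single tonic `Channel` is built over every ingester endpoint. Constructing it performs no I/O, so the router can start while some ingesters are unreachable, and the channel balances requests across the endpoints internally:

```rust
use std::str::FromStr;

use tonic::transport::{Channel, Endpoint};

/// Build one lazily-connecting Channel that balances requests over all
/// ingester addresses (hypothetical helper, for illustration only).
fn balanced_ingester_channel<'a>(addrs: impl Iterator<Item = &'a str>) -> Channel {
    Channel::balance_list(
        addrs.map(|a| Endpoint::from_str(a).expect("invalid ingester address")),
    )
}

fn main() {
    // No connection is dialled here; the first RPC issued over this channel
    // triggers the actual connection attempts.
    let _channel = balanced_ingester_channel(
        ["http://ingester-0:8082", "http://ingester-1:8082"].into_iter(),
    );
}
```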
@@ -18,9 +18,8 @@ use object_store::DynObjectStore;
 use observability_deps::tracing::info;
 use router::{
     dml_handlers::{
-        write_service_client, DmlHandler, DmlHandlerChainExt, FanOutAdaptor,
-        InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
-        ShardedWriteBuffer, WriteSummaryAdapter,
+        DmlHandler, DmlHandlerChainExt, FanOutAdaptor, InstrumentationDecorator, Partitioner,
+        RetentionValidator, RpcWrite, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter,
     },
     namespace_cache::{
         metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
@@ -259,14 +258,13 @@ pub async fn create_router2_server_type(
     // Hack to handle multiple ingester addresses separated by commas in potentially many uses of
     // the CLI arg
     let ingester_addresses = router_config.ingester_addresses.join(",");
-    let ingester_addresses_list: Vec<_> = ingester_addresses.split(',').collect();
-    let mut ingester_clients = Vec::with_capacity(ingester_addresses_list.len());
-    for ingester_addr in ingester_addresses_list {
-        ingester_clients.push(write_service_client(ingester_addr).await);
-    }
+    let grpc_connections = router::dml_handlers::build_ingester_connection(
+        ingester_addresses.split(',').map(|s| format!("http://{s}")),
+    );

     // Initialise the DML handler that sends writes to the ingester using the RPC write path.
-    let rpc_writer = RpcWrite::new(RoundRobin::new(ingester_clients));
+    let rpc_writer = RpcWrite::new(RoundRobin::new([grpc_connections]));
     let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
     // 1. END

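Purely to illustrate the "join then split" hack above (the values are hypothetical): repeated `--ingester-addresses` flags and comma-separated values both end up flattened into one address list.

```rust
fn main() {
    // Hypothetical CLI values: the flag given twice, the second use carrying a
    // comma-separated pair of addresses.
    let ingester_addresses = vec![
        "ingester-0:8082".to_string(),
        "ingester-1:8082,ingester-2:8082".to_string(),
    ];

    // Join then re-split, as in the diff above, to flatten everything.
    let joined = ingester_addresses.join(",");
    let addrs: Vec<&str> = joined.split(',').collect();

    assert_eq!(addrs, ["ingester-0:8082", "ingester-1:8082", "ingester-2:8082"]);
}
```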
@@ -12,19 +12,22 @@ use mutable_batch::MutableBatch;
 use mutable_batch_pb::encode::encode_write;
 use observability_deps::tracing::*;
 use sharder::RoundRobin;
-use std::{fmt::Debug, time::Duration};
+use std::{fmt::Debug, str::FromStr, time::Duration};
 use thiserror::Error;
+use tonic::transport::{Channel, Endpoint};
 use trace::ctx::SpanContext;

-/// Create a client to the ingester's write service.
-pub async fn write_service_client(
-    ingester_addr: &str,
-) -> WriteServiceClient<client_util::connection::GrpcConnection> {
-    let connection = client_util::connection::Builder::default()
-        .build(format!("http://{}", ingester_addr))
-        .await
-        .unwrap_or_else(|e| panic!("failed to connect to server {ingester_addr}: {e}"));
-    WriteServiceClient::new(connection.into_grpc_connection())
+/// Create a connection to one or more ingesters, load-balancing requests across
+/// all of them.
+///
+/// Connections are lazily established.
+pub fn build_ingester_connection<T>(addrs: impl Iterator<Item = T>) -> WriteServiceClient<Channel>
+where
+    T: AsRef<str>,
+{
+    WriteServiceClient::new(Channel::balance_list(
+        addrs.map(|s| Endpoint::from_str(s.as_ref()).expect("invalid ingester address")),
+    ))
 }

 /// The bound on RPC request duration.
@@ -127,6 +130,7 @@ where
                 Ok(()) => break,
                 Err(e) => warn!(error=%e, "failed ingester rpc write"),
             };
+            tokio::time::sleep(Duration::from_millis(50)).await;
         }
     })
     .await?;

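The single added line in the hunk above pauses 50ms between retries so an unavailable ingester is not retried in a tight loop. A simplified sketch of that pattern follows (the `write_with_retry` helper and the simulated failures are hypothetical; in the real handler the loop runs inside the bounded RPC request duration mentioned above):

```rust
use std::time::Duration;

/// Keep retrying a fallible write, sleeping 50ms between attempts.
async fn write_with_retry<F, Fut, E>(mut try_write: F)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), E>>,
    E: std::fmt::Display,
{
    loop {
        match try_write().await {
            Ok(()) => break,
            Err(e) => eprintln!("failed ingester rpc write: {e}"),
        };
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

#[tokio::main]
async fn main() {
    let mut attempts = 0;
    write_with_retry(|| {
        attempts += 1;
        let fail = attempts < 3;
        async move {
            if fail {
                Err("connection refused")
            } else {
                Ok(())
            }
        }
    })
    .await;
    // The first two simulated failures are retried; the third attempt succeeds.
    assert_eq!(attempts, 3);
}
```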
@@ -12,7 +12,8 @@ pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
     async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError>;
 }

-/// An implementation of [`WriteClient`] for the tonic gRPC client.
+/// An implementation of [`WriteClient`] for the bespoke IOx wrapper over the
+/// tonic gRPC client.
 #[async_trait]
 impl WriteClient for WriteServiceClient<client_util::connection::GrpcConnection> {
     async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
@@ -21,6 +22,15 @@ impl WriteClient for WriteServiceClient<client_util::connection::GrpcConnection>
     }
 }

+/// An implementation of [`WriteClient`] for the tonic gRPC client.
+#[async_trait]
+impl WriteClient for WriteServiceClient<tonic::transport::Channel> {
+    async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
+        WriteServiceClient::write(&mut self.clone(), op).await?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 pub(crate) mod mock {
     use std::{collections::VecDeque, sync::Arc};