diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index e3adce8fea..23eb27b113 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -7,10 +7,6 @@ use iox_catalog::interface::Catalog; use ioxd_common::{ add_service, http::error::{HttpApiError, HttpApiErrorSource}, - reexport::{ - generated_types::influxdata::iox::ingester::v1::write_service_client::WriteServiceClient, - tonic::transport::Channel, - }, rpc::RpcBuilderInput, serve_builder, server_type::{CommonServerState, RpcError, ServerType}, @@ -38,7 +34,7 @@ use router::{ }, shard::Shard, }; -use sharder::{JumpHash, Sharder}; +use sharder::{JumpHash, RoundRobin, Sharder}; use std::{ collections::BTreeSet, fmt::{Debug, Display}, @@ -263,17 +259,14 @@ pub async fn create_router2_server_type( // Hack to handle multiple ingester addresses separated by commas in potentially many uses of // the CLI arg - let ingester_connections = router_config.ingester_addresses.join(","); - let ingester_connections = ingester_connections.split(',').map(|s| { - WriteServiceClient::new( - Channel::from_shared(format!("http://{s}")) - .expect("invalid ingester connection address") - .connect_lazy(), - ) - }); + let ingester_addresses = router_config.ingester_addresses.join(","); + + let grpc_connections = router::dml_handlers::build_ingester_connection( + ingester_addresses.split(',').map(|s| format!("http://{s}")), + ); // Initialise the DML handler that sends writes to the ingester using the RPC write path. - let rpc_writer = RpcWrite::new(ingester_connections); + let rpc_writer = RpcWrite::new(RoundRobin::new([grpc_connections])); let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer); // 1. END diff --git a/router/src/dml_handlers/rpc_write.rs b/router/src/dml_handlers/rpc_write.rs index 477055adf5..825b7c373b 100644 --- a/router/src/dml_handlers/rpc_write.rs +++ b/router/src/dml_handlers/rpc_write.rs @@ -1,25 +1,73 @@ -mod balancer; mod circuit_breaker; -mod circuit_breaking_client; mod client; -use crate::dml_handlers::rpc_write::client::WriteClient; - -use self::{balancer::Balancer, circuit_breaking_client::CircuitBreakingClient}; - use super::{DmlHandler, Partitioned}; use async_trait::async_trait; use data_types::{DeletePredicate, NamespaceId, NamespaceName, TableId}; use dml::{DmlMeta, DmlWrite}; -use generated_types::influxdata::iox::ingester::v1::WriteRequest; +use generated_types::influxdata::iox::ingester::v1::{ + write_service_client::WriteServiceClient, WriteRequest, +}; use hashbrown::HashMap; use mutable_batch::MutableBatch; use mutable_batch_pb::encode::encode_write; use observability_deps::tracing::*; -use std::{fmt::Debug, time::Duration}; +use sharder::RoundRobin; +use std::{fmt::Debug, str::FromStr, time::Duration}; use thiserror::Error; +use tonic::transport::{Channel, Endpoint}; use trace::ctx::SpanContext; +/// Create a connection to one or more ingesters, load-balancing requests across +/// all of them. +/// +/// Connections are lazily established. +pub fn build_ingester_connection(addrs: impl Iterator) -> WriteServiceClient +where + T: AsRef, +{ + let endpoints = addrs + .map(|s| Endpoint::from_str(s.as_ref()).expect("invalid ingester address")) + .collect::>(); + + let (channel, tx) = Channel::balance_channel(endpoints.len()); + + // BUG: tower balance removes failed nodes from the pool, except the last + // node in the pool, which leads to a router talking to one ingester. + // + // As an absolute hack, keep inserting the nodes into the pool to drive + // discovery after they have failed. + // + // https://github.com/influxdata/influxdb_iox/issues/6508 + // + tokio::spawn(async move { + loop { + for e in &endpoints { + // The gRPC balance listener will stop first during shutdown. + if tx + .send(tower::discover::Change::Insert( + e.uri().to_owned(), + e.clone(), + )) + .await + .is_err() + { + // The gRPC balancer task has stopped - likely because + // the server has stopped. + // + // Do not leak this task, or panic trying to send on the + // closed channel. + return; + } + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + + WriteServiceClient::new(channel) +} + /// The bound on RPC request duration. /// /// This includes the time taken to send the request, and wait for the response. @@ -36,15 +84,14 @@ pub enum RpcWriteError { #[error("timeout writing to upstream ingester")] Timeout(#[from] tokio::time::error::Elapsed), - /// There are no healthy ingesters to route a write to. - #[error("no healthy upstream ingesters available")] - NoUpstream, - /// A delete request was rejected (not supported). #[error("deletes are not supported")] DeletesUnsupported, } +/// A convenience alias for the generated gRPC client. +type GrpcClient = WriteServiceClient; + /// An [`RpcWrite`] handler submits a write directly to an Ingester via the /// [gRPC write service]. /// @@ -57,22 +104,17 @@ pub enum RpcWriteError { /// This handler drops delete requests, logging the attempt and returning an /// error to the client. /// -/// [gRPC write service]: client::WriteClient +/// [gRPC write service]: WriteServiceClient #[derive(Debug)] -pub struct RpcWrite { - endpoints: Balancer, +pub struct RpcWrite { + endpoints: RoundRobin, } impl RpcWrite { /// Initialise a new [`RpcWrite`] that sends requests to an arbitrary /// downstream Ingester, using a round-robin strategy. - pub fn new(endpoints: impl IntoIterator) -> Self - where - C: Send + Sync + Debug, - { - Self { - endpoints: Balancer::new(endpoints.into_iter().map(CircuitBreakingClient::new)), - } + pub fn new(endpoints: RoundRobin) -> Self { + Self { endpoints } } } @@ -117,7 +159,19 @@ where }; // Perform the gRPC write to an ingester. - tokio::time::timeout(RPC_TIMEOUT, write_loop(self.endpoints.endpoints(), req)).await??; + // + // This includes a dirt simple retry mechanism that WILL need improving + // (#6173). + tokio::time::timeout(RPC_TIMEOUT, async { + loop { + match self.endpoints.next().write(req.clone()).await { + Ok(()) => break, + Err(e) => warn!(error=%e, "failed ingester rpc write"), + }; + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await?; debug!( %partition_key, @@ -150,29 +204,6 @@ where } } -async fn write_loop( - mut endpoints: impl Iterator + Send, - req: WriteRequest, -) -> Result<(), RpcWriteError> -where - T: WriteClient, -{ - let mut delay = Duration::from_millis(50); - loop { - match endpoints - .next() - .ok_or(RpcWriteError::NoUpstream)? - .write(req.clone()) - .await - { - Ok(()) => return Ok(()), - Err(e) => warn!(error=%e, "failed ingester rpc write"), - }; - tokio::time::sleep(delay).await; - delay = delay.saturating_mul(2); - } -} - #[cfg(test)] mod tests { use std::{collections::HashSet, sync::Arc}; @@ -214,7 +245,7 @@ mod tests { // Init the write handler with a mock client to capture the rpc calls. let client = Arc::new(MockWriteClient::default()); - let handler = RpcWrite::new([Arc::clone(&client)]); + let handler = RpcWrite::new(RoundRobin::new([Arc::clone(&client)])); // Drive the RPC writer let got = handler @@ -266,7 +297,10 @@ mod tests { .with_ret([Err(RpcWriteError::Upstream(tonic::Status::internal("")))]), ); let client2 = Arc::new(MockWriteClient::default()); - let handler = RpcWrite::new([Arc::clone(&client1), Arc::clone(&client2)]); + let handler = RpcWrite::new(RoundRobin::new([ + Arc::clone(&client1), + Arc::clone(&client2), + ])); // Drive the RPC writer let got = handler diff --git a/router/src/dml_handlers/rpc_write/balancer.rs b/router/src/dml_handlers/rpc_write/balancer.rs deleted file mode 100644 index 1ad1fc4d57..0000000000 --- a/router/src/dml_handlers/rpc_write/balancer.rs +++ /dev/null @@ -1,268 +0,0 @@ -use std::{cell::RefCell, cmp::max, fmt::Debug}; - -use super::{ - circuit_breaker::CircuitBreaker, - circuit_breaking_client::{CircuitBreakerState, CircuitBreakingClient}, -}; - -thread_local! { - /// A per-thread counter incremented once per call to - /// [`Balancer::endpoints()`]. - static COUNTER: RefCell = RefCell::new(0); -} - -/// A set of health-checked gRPC endpoints, with an approximate round-robin -/// distribution of load over healthy nodes. -/// -/// # Health Checking -/// -/// The health evaluation of a node is delegated to the -/// [`CircuitBreakingClient`]. -/// -/// # Request Distribution -/// -/// Requests are distributed uniformly across all shards **per thread**. Given -/// enough requests (where `N` is significantly larger than the number of -/// threads) an approximately uniform distribution is achieved. -#[derive(Debug)] -pub(super) struct Balancer { - endpoints: Vec>, -} - -impl Balancer -where - T: Send + Sync + Debug, - C: CircuitBreakerState, -{ - /// Construct a new [`Balancer`] distributing work over the healthy - /// `endpoints`. - pub(super) fn new(endpoints: impl IntoIterator>) -> Self { - Self { - endpoints: endpoints.into_iter().collect(), - } - } - - /// Return an (infinite) iterator of healthy [`CircuitBreakingClient`]. - /// - /// A snapshot of healthy nodes is taken at call time and the health state - /// is evaluated at this point and the result is returned to the caller as - /// an infinite / cycling iterator. A node that becomes unavailable after - /// the snapshot was taken will continue to be returned by the iterator. - pub(super) fn endpoints(&self) -> impl Iterator> { - // Grab and increment the current counter. - let counter = COUNTER.with(|cell| { - let mut cell = cell.borrow_mut(); - let new_value = cell.wrapping_add(1); - *cell = new_value; - new_value - }); - - // Take a snapshot containing only healthy nodes. - // - // This ensures unhealthy nodes are not continuously (and unnecessarily) - // polled/probed in the iter cycle below. The low frequency and impact - // of a node becoming unavailable during a single request easily - // outweighs the trade-off of the constant health evaluation overhead. - let snapshot = self - .endpoints - .iter() - .filter(|e| e.is_usable()) - .collect::>(); - - // Reduce it to the range of [0, N) where N is the number of healthy - // clients in this balancer, ensuring not to calculate the remainder of - // a division by 0. - let idx = counter % max(snapshot.len(), 1); - - snapshot.into_iter().cycle().skip(idx) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use generated_types::influxdata::iox::ingester::v1::WriteRequest; - - use crate::dml_handlers::rpc_write::{ - circuit_breaking_client::mock::MockCircuitBreaker, - client::{mock::MockWriteClient, WriteClient}, - }; - - use super::*; - - /// No healthy nodes yields an empty iterator. - #[tokio::test] - async fn test_balancer_empty_iter() { - const BALANCER_CALLS: usize = 10; - - // Initialise 3 RPC clients and configure their mock circuit breakers; - // two returns a unhealthy state, one is healthy. - let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); - circuit_err_1.set_usable(false); - let client_err_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_err_1)); - - let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); - circuit_err_2.set_usable(false); - let client_err_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_err_2)); - - assert_eq!(circuit_err_1.ok_count(), 0); - assert_eq!(circuit_err_2.ok_count(), 0); - - let balancer = Balancer::new([client_err_1, client_err_2]); - let mut endpoints = balancer.endpoints(); - - assert_matches!(endpoints.next(), None); - } - - /// A test that ensures only healthy clients are returned by the balancer, - /// and that they are polled exactly once per request. - #[tokio::test] - async fn test_balancer_yield_healthy_polled_once() { - const BALANCER_CALLS: usize = 10; - - // Initialise 3 RPC clients and configure their mock circuit breakers; - // two returns a unhealthy state, one is healthy. - let circuit_err_1 = Arc::new(MockCircuitBreaker::default()); - circuit_err_1.set_usable(false); - let client_err_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_err_1)); - - let circuit_err_2 = Arc::new(MockCircuitBreaker::default()); - circuit_err_2.set_usable(false); - let client_err_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_err_2)); - - let circuit_ok = Arc::new(MockCircuitBreaker::default()); - circuit_ok.set_usable(true); - let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_ok)); - - assert_eq!(circuit_ok.ok_count(), 0); - assert_eq!(circuit_err_1.ok_count(), 0); - assert_eq!(circuit_err_2.ok_count(), 0); - - let balancer = Balancer::new([client_err_1, client_ok, client_err_2]); - let mut endpoints = balancer.endpoints(); - - // Only the health client should be yielded, and it should cycle - // indefinitely. - for i in 1..=BALANCER_CALLS { - endpoints - .next() - .expect("should yield healthy client") - .write(WriteRequest::default()) - .await - .expect("should succeed"); - - assert_eq!(circuit_ok.ok_count(), i); - assert_eq!(circuit_ok.err_count(), 0); - } - - // There health of the endpoints should not be constantly re-evaluated - // by a single request (reducing overhead / hot spinning - in the - // probing phase this would serialise clients). - assert_eq!(circuit_ok.is_usable_count(), 1); - assert_eq!(circuit_err_1.is_usable_count(), 1); - assert_eq!(circuit_err_1.is_usable_count(), 1); - - // The other clients should not have been invoked. - assert_eq!(circuit_err_1.ok_count(), 0); - assert_eq!(circuit_err_1.err_count(), 0); - - assert_eq!(circuit_err_2.ok_count(), 0); - assert_eq!(circuit_err_2.err_count(), 0); - } - - /// An unhealthy node that recovers is yielded to the caller. - #[tokio::test] - async fn test_balancer_upstream_recovery() { - const BALANCER_CALLS: usize = 10; - - // Initialise 3 RPC clients and configure their mock circuit breakers; - // two returns a unhealthy state, one is healthy. - let circuit = Arc::new(MockCircuitBreaker::default()); - circuit.set_usable(false); - let client = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit)); - - assert_eq!(circuit.ok_count(), 0); - - let balancer = Balancer::new([client]); - - let mut endpoints = balancer.endpoints(); - assert_matches!(endpoints.next(), None); - assert_eq!(circuit.is_usable_count(), 1); - - circuit.set_usable(true); - - let mut endpoints = balancer.endpoints(); - assert_matches!(endpoints.next(), Some(_)); - assert_eq!(circuit.is_usable_count(), 2); - - // The now-healthy client is constantly yielded. - const N: usize = 3; - for _ in 0..N { - endpoints - .next() - .expect("should yield healthy client") - .write(WriteRequest::default()) - .await - .expect("should succeed"); - } - assert_eq!(circuit.ok_count(), N); - } - - // Ensure the balancer round-robins across all healthy clients. - // - // Note this is a property test that asserts the even distribution of the - // client calls, not the order themselves. - #[tokio::test] - async fn test_round_robin() { - const N: usize = 100; - #[allow(clippy::assertions_on_constants)] - { - assert!(N % 2 == 0, "test iterations must be even"); - } - - // Initialise 3 RPC clients and configure their mock circuit breakers; - // two returns a healthy state, one is unhealthy. - let circuit_err = Arc::new(MockCircuitBreaker::default()); - circuit_err.set_usable(false); - let client_err = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_err)); - - let circuit_ok_1 = Arc::new(MockCircuitBreaker::default()); - circuit_ok_1.set_usable(true); - let client_ok_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_ok_1)); - - let circuit_ok_2 = Arc::new(MockCircuitBreaker::default()); - circuit_ok_2.set_usable(true); - let client_ok_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default())) - .with_circuit_breaker(Arc::clone(&circuit_ok_2)); - - let balancer = Balancer::new([client_err, client_ok_1, client_ok_2]); - - for _ in 0..N { - balancer - .endpoints() - .next() - .expect("should yield healthy client") - .write(WriteRequest::default()) - .await - .expect("should succeed"); - } - - assert_eq!(circuit_err.ok_count(), 0); - assert_eq!(circuit_ok_1.ok_count(), N / 2); - assert_eq!(circuit_ok_2.ok_count(), N / 2); - - assert_eq!(circuit_err.err_count(), 0); - assert_eq!(circuit_ok_1.err_count(), 0); - assert_eq!(circuit_ok_2.err_count(), 0); - } -} diff --git a/router/src/dml_handlers/rpc_write/circuit_breaker.rs b/router/src/dml_handlers/rpc_write/circuit_breaker.rs index 80b0ecd05d..fd498eceb8 100644 --- a/router/src/dml_handlers/rpc_write/circuit_breaker.rs +++ b/router/src/dml_handlers/rpc_write/circuit_breaker.rs @@ -1,3 +1,4 @@ +#![allow(unused)] use std::{ fmt::Debug, sync::{ diff --git a/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs b/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs deleted file mode 100644 index 188de23c48..0000000000 --- a/router/src/dml_handlers/rpc_write/circuit_breaking_client.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::fmt::Debug; - -use async_trait::async_trait; -use generated_types::influxdata::iox::ingester::v1::WriteRequest; - -use super::{circuit_breaker::CircuitBreaker, client::WriteClient, RpcWriteError}; - -/// An internal abstraction over the health probing & result recording -/// functionality of a circuit breaker. -pub(super) trait CircuitBreakerState: Send + Sync + Debug { - /// Returns `true` if this client can be used to make a request with an - /// expectation of success. - fn is_usable(&self) -> bool; - /// Record the result of a request made by this client. - fn observe(&self, r: &Result); -} - -impl CircuitBreakerState for CircuitBreaker { - fn is_usable(&self) -> bool { - self.is_healthy() || self.should_probe() - } - - fn observe(&self, r: &Result) { - self.observe(r) - } -} - -/// A thin composite type decorating the [`WriteClient`] functionality of `T`, -/// with circuit breaking logic from [`CircuitBreaker`]. -#[derive(Debug)] -pub(super) struct CircuitBreakingClient { - /// The underlying [`WriteClient`] implementation. - inner: T, - /// The circuit-breaking logic. - state: C, -} - -impl CircuitBreakingClient { - pub(super) fn new(inner: T) -> Self { - let state = CircuitBreaker::default(); - state.set_healthy(); - Self { inner, state } - } -} - -impl CircuitBreakingClient -where - C: CircuitBreakerState, -{ - /// Returns `true` if this client can be used to make a request with an - /// expectation of success. - pub(super) fn is_usable(&self) -> bool { - self.state.is_usable() - } - - #[cfg(test)] - pub(super) fn with_circuit_breaker(self, breaker: U) -> CircuitBreakingClient { - CircuitBreakingClient { - inner: self.inner, - state: breaker, - } - } -} - -#[async_trait] -impl WriteClient for &CircuitBreakingClient -where - T: WriteClient, - C: CircuitBreakerState, -{ - async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> { - let res = self.inner.write(op).await; - self.state.observe(&res); - res - } -} - -#[cfg(test)] -pub(crate) mod mock { - use super::*; - use std::sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, - }; - - #[derive(Debug, Default)] - pub(crate) struct MockCircuitBreaker { - is_usable: AtomicBool, - is_usable_calls: AtomicUsize, - ok: AtomicUsize, - err: AtomicUsize, - } - - impl MockCircuitBreaker { - pub(crate) fn set_usable(&self, healthy: bool) { - self.is_usable.store(healthy, Ordering::Relaxed); - } - pub(crate) fn ok_count(&self) -> usize { - self.ok.load(Ordering::Relaxed) - } - pub(crate) fn err_count(&self) -> usize { - self.err.load(Ordering::Relaxed) - } - pub(crate) fn is_usable_count(&self) -> usize { - self.is_usable_calls.load(Ordering::Relaxed) - } - } - - impl CircuitBreakerState for Arc { - fn is_usable(&self) -> bool { - self.is_usable_calls.fetch_add(1, Ordering::Relaxed); - self.is_usable.load(Ordering::Relaxed) - } - - fn observe(&self, r: &Result) { - match r { - Ok(_) => &self.ok, - Err(_) => &self.err, - } - .fetch_add(1, Ordering::Relaxed); - } - } -} - -#[cfg(test)] -mod tests { - use std::{borrow::Borrow, sync::Arc}; - - use crate::dml_handlers::rpc_write::client::mock::MockWriteClient; - - use super::{mock::MockCircuitBreaker, *}; - - #[tokio::test] - async fn test_healthy() { - let circuit_breaker = Arc::new(MockCircuitBreaker::default()); - let wrapper = CircuitBreakingClient::new(MockWriteClient::default()) - .with_circuit_breaker(Arc::clone(&circuit_breaker)); - - circuit_breaker.set_usable(true); - assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable()); - circuit_breaker.set_usable(false); - assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable()); - circuit_breaker.set_usable(true); - assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable()); - } - - #[tokio::test] - async fn test_observe() { - let circuit_breaker = Arc::new(MockCircuitBreaker::default()); - let mock_client = Arc::new( - MockWriteClient::default() - .with_ret(vec![Ok(()), Err(RpcWriteError::DeletesUnsupported)]), - ); - let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client)) - .with_circuit_breaker(Arc::clone(&circuit_breaker)); - - assert_eq!(circuit_breaker.ok_count(), 0); - assert_eq!(circuit_breaker.err_count(), 0); - - wrapper - .borrow() - .write(WriteRequest::default()) - .await - .expect("wrapper should return Ok mock value"); - assert_eq!(circuit_breaker.ok_count(), 1); - assert_eq!(circuit_breaker.err_count(), 0); - - wrapper - .borrow() - .write(WriteRequest::default()) - .await - .expect_err("wrapper should return Err mock value"); - assert_eq!(circuit_breaker.ok_count(), 1); - assert_eq!(circuit_breaker.err_count(), 1); - } -} diff --git a/router/src/server/http.rs b/router/src/server/http.rs index 226388635b..d95a031d37 100644 --- a/router/src/server/http.rs +++ b/router/src/server/http.rs @@ -153,7 +153,6 @@ impl From<&DmlError> for StatusCode { DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR, DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED, DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT, - DmlError::RpcWrite(RpcWriteError::NoUpstream) => StatusCode::SERVICE_UNAVAILABLE, } } }