revert: influxdata/dom/rpc-balancer

This reverts commit a3805dbccf, reversing
changes made to bcb1232c5d.
pull/24376/head
Dom Dwyer 2023-01-24 14:47:05 +01:00
parent a3805dbccf
commit 107006c801
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
6 changed files with 90 additions and 507 deletions

View File

@@ -7,10 +7,6 @@ use iox_catalog::interface::Catalog;
use ioxd_common::{
add_service,
http::error::{HttpApiError, HttpApiErrorSource},
reexport::{
generated_types::influxdata::iox::ingester::v1::write_service_client::WriteServiceClient,
tonic::transport::Channel,
},
rpc::RpcBuilderInput,
serve_builder,
server_type::{CommonServerState, RpcError, ServerType},
@@ -38,7 +34,7 @@ use router::{
},
shard::Shard,
};
use sharder::{JumpHash, Sharder};
use sharder::{JumpHash, RoundRobin, Sharder};
use std::{
collections::BTreeSet,
fmt::{Debug, Display},
@@ -263,17 +259,14 @@ pub async fn create_router2_server_type(
// Hack to handle multiple ingester addresses separated by commas in potentially many uses of
// the CLI arg
let ingester_connections = router_config.ingester_addresses.join(",");
let ingester_connections = ingester_connections.split(',').map(|s| {
WriteServiceClient::new(
Channel::from_shared(format!("http://{s}"))
.expect("invalid ingester connection address")
.connect_lazy(),
)
});
let ingester_addresses = router_config.ingester_addresses.join(",");
let grpc_connections = router::dml_handlers::build_ingester_connection(
ingester_addresses.split(',').map(|s| format!("http://{s}")),
);
// Initialise the DML handler that sends writes to the ingester using the RPC write path.
let rpc_writer = RpcWrite::new(ingester_connections);
let rpc_writer = RpcWrite::new(RoundRobin::new([grpc_connections]));
let rpc_writer = InstrumentationDecorator::new("rpc_writer", &metrics, rpc_writer);
// 1. END
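
For illustration, the comma-flattening hack at the top of this hunk behaves roughly as follows (a standalone sketch with made-up addresses, not code from the diff):

```rust
// Repeated `--ingester-addresses` values are joined into one string and then
// re-split, flattening any comma-separated lists the user passed.
let from_cli = vec![
    "ingester-0:8082,ingester-1:8082".to_string(),
    "ingester-2:8082".to_string(),
];
let joined = from_cli.join(","); // "ingester-0:8082,ingester-1:8082,ingester-2:8082"
let urls: Vec<String> = joined.split(',').map(|s| format!("http://{s}")).collect();
assert_eq!(
    urls,
    ["http://ingester-0:8082", "http://ingester-1:8082", "http://ingester-2:8082"]
);
```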

View File

@@ -1,25 +1,73 @@
mod balancer;
mod circuit_breaker;
mod circuit_breaking_client;
mod client;
use crate::dml_handlers::rpc_write::client::WriteClient;
use self::{balancer::Balancer, circuit_breaking_client::CircuitBreakingClient};
use super::{DmlHandler, Partitioned};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, TableId};
use dml::{DmlMeta, DmlWrite};
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::encode::encode_write;
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use sharder::RoundRobin;
use std::{fmt::Debug, str::FromStr, time::Duration};
use thiserror::Error;
use tonic::transport::{Channel, Endpoint};
use trace::ctx::SpanContext;
/// Create a connection to one or more ingesters, load-balancing requests across
/// all of them.
///
/// Connections are lazily established.
pub fn build_ingester_connection<T>(addrs: impl Iterator<Item = T>) -> WriteServiceClient<Channel>
where
T: AsRef<str>,
{
let endpoints = addrs
.map(|s| Endpoint::from_str(s.as_ref()).expect("invalid ingester address"))
.collect::<Vec<_>>();
let (channel, tx) = Channel::balance_channel(endpoints.len());
// BUG: tower balance removes failed nodes from the pool, except the last
// node in the pool, which can leave a router talking to only one ingester.
//
// As an absolute hack, keep inserting the nodes into the pool to drive
// discovery after they have failed.
//
// https://github.com/influxdata/influxdb_iox/issues/6508
//
tokio::spawn(async move {
loop {
for e in &endpoints {
// The gRPC balance listener will stop first during shutdown.
if tx
.send(tower::discover::Change::Insert(
e.uri().to_owned(),
e.clone(),
))
.await
.is_err()
{
// The gRPC balancer task has stopped - likely because
// the server has stopped.
//
// Do not leak this task, or panic trying to send on the
// closed channel.
return;
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
WriteServiceClient::new(channel)
}
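
A hedged usage sketch of this helper, assuming the router wiring shown in the previous file (the addresses below are illustrative):

```rust
// Hypothetical wiring, mirroring the router setup above. Must run inside a
// Tokio runtime: the helper spawns a background task that keeps re-inserting
// the endpoints into the balance pool.
let connection = build_ingester_connection(
    ["http://ingester-0:8082", "http://ingester-1:8082"].into_iter(),
);
let handler = RpcWrite::new(RoundRobin::new([connection]));
```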
/// The bound on RPC request duration.
///
/// This includes the time taken to send the request, and wait for the response.
@@ -36,15 +84,14 @@ pub enum RpcWriteError {
#[error("timeout writing to upstream ingester")]
Timeout(#[from] tokio::time::error::Elapsed),
/// There are no healthy ingesters to route a write to.
#[error("no healthy upstream ingesters available")]
NoUpstream,
/// A delete request was rejected (not supported).
#[error("deletes are not supported")]
DeletesUnsupported,
}
/// A convenience alias for the generated gRPC client.
type GrpcClient = WriteServiceClient<client_util::connection::GrpcConnection>;
/// An [`RpcWrite`] handler submits a write directly to an Ingester via the
/// [gRPC write service].
///
@@ -57,22 +104,17 @@ pub enum RpcWriteError {
/// This handler drops delete requests, logging the attempt and returning an
/// error to the client.
///
/// [gRPC write service]: client::WriteClient
/// [gRPC write service]: WriteServiceClient
#[derive(Debug)]
pub struct RpcWrite<C> {
endpoints: Balancer<C>,
pub struct RpcWrite<C = GrpcClient> {
endpoints: RoundRobin<C>,
}
impl<C> RpcWrite<C> {
/// Initialise a new [`RpcWrite`] that sends requests to an arbitrary
/// downstream Ingester, using a round-robin strategy.
pub fn new(endpoints: impl IntoIterator<Item = C>) -> Self
where
C: Send + Sync + Debug,
{
Self {
endpoints: Balancer::new(endpoints.into_iter().map(CircuitBreakingClient::new)),
}
pub fn new(endpoints: RoundRobin<C>) -> Self {
Self { endpoints }
}
}
@@ -117,7 +159,19 @@ where
};
// Perform the gRPC write to an ingester.
tokio::time::timeout(RPC_TIMEOUT, write_loop(self.endpoints.endpoints(), req)).await??;
//
// This includes a dirt simple retry mechanism that WILL need improving
// (#6173).
tokio::time::timeout(RPC_TIMEOUT, async {
loop {
match self.endpoints.next().write(req.clone()).await {
Ok(()) => break,
Err(e) => warn!(error=%e, "failed ingester rpc write"),
};
tokio::time::sleep(Duration::from_millis(50)).await;
}
})
.await?;
debug!(
%partition_key,
@@ -150,29 +204,6 @@ where
}
}
async fn write_loop<T>(
mut endpoints: impl Iterator<Item = T> + Send,
req: WriteRequest,
) -> Result<(), RpcWriteError>
where
T: WriteClient,
{
let mut delay = Duration::from_millis(50);
loop {
match endpoints
.next()
.ok_or(RpcWriteError::NoUpstream)?
.write(req.clone())
.await
{
Ok(()) => return Ok(()),
Err(e) => warn!(error=%e, "failed ingester rpc write"),
};
tokio::time::sleep(delay).await;
delay = delay.saturating_mul(2);
}
}
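
The restored inline loop above retries with a fixed 50 ms pause, bounded overall by RPC_TIMEOUT, whereas the removed write_loop used exponential backoff. As a standalone illustration of the same shape (a hypothetical helper, not part of this diff, assuming the tokio and observability_deps crates):

```rust
use observability_deps::tracing::warn;
use std::time::Duration;

/// Hypothetical helper: retry `attempt` with a fixed 50 ms pause until it
/// succeeds or the overall `budget` elapses, surfacing a timeout error.
async fn retry_with_budget<F, Fut, E>(
    budget: Duration,
    mut attempt: F,
) -> Result<(), tokio::time::error::Elapsed>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), E>>,
    E: std::fmt::Display,
{
    tokio::time::timeout(budget, async {
        loop {
            match attempt().await {
                Ok(()) => return,
                // Log the failure and try again after a short, fixed pause.
                Err(e) => warn!(error = %e, "retrying failed attempt"),
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
}
```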
#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc};
@@ -214,7 +245,7 @@ mod tests {
// Init the write handler with a mock client to capture the rpc calls.
let client = Arc::new(MockWriteClient::default());
let handler = RpcWrite::new([Arc::clone(&client)]);
let handler = RpcWrite::new(RoundRobin::new([Arc::clone(&client)]));
// Drive the RPC writer
let got = handler
@@ -266,7 +297,10 @@ mod tests {
.with_ret([Err(RpcWriteError::Upstream(tonic::Status::internal("")))]),
);
let client2 = Arc::new(MockWriteClient::default());
let handler = RpcWrite::new([Arc::clone(&client1), Arc::clone(&client2)]);
let handler = RpcWrite::new(RoundRobin::new([
Arc::clone(&client1),
Arc::clone(&client2),
]));
// Drive the RPC writer
let got = handler

View File

@@ -1,268 +0,0 @@
use std::{cell::RefCell, cmp::max, fmt::Debug};
use super::{
circuit_breaker::CircuitBreaker,
circuit_breaking_client::{CircuitBreakerState, CircuitBreakingClient},
};
thread_local! {
/// A per-thread counter incremented once per call to
/// [`Balancer::endpoints()`].
static COUNTER: RefCell<usize> = RefCell::new(0);
}
/// A set of health-checked gRPC endpoints, with an approximate round-robin
/// distribution of load over healthy nodes.
///
/// # Health Checking
///
/// The health evaluation of a node is delegated to the
/// [`CircuitBreakingClient`].
///
/// # Request Distribution
///
/// Requests are distributed uniformly across all healthy endpoints **per
/// thread**. Given enough requests (significantly more than the number of
/// threads) an approximately uniform distribution is achieved.
#[derive(Debug)]
pub(super) struct Balancer<T, C = CircuitBreaker> {
endpoints: Vec<CircuitBreakingClient<T, C>>,
}
impl<T, C> Balancer<T, C>
where
T: Send + Sync + Debug,
C: CircuitBreakerState,
{
/// Construct a new [`Balancer`] distributing work over the healthy
/// `endpoints`.
pub(super) fn new(endpoints: impl IntoIterator<Item = CircuitBreakingClient<T, C>>) -> Self {
Self {
endpoints: endpoints.into_iter().collect(),
}
}
/// Return an (infinite) iterator of healthy [`CircuitBreakingClient`].
///
/// A snapshot of healthy nodes is taken at call time; the health state is
/// evaluated at that point, and the result is returned to the caller as an
/// infinite, cycling iterator. A node that becomes unavailable after
/// the snapshot was taken will continue to be returned by the iterator.
pub(super) fn endpoints(&self) -> impl Iterator<Item = &'_ CircuitBreakingClient<T, C>> {
// Grab and increment the current counter.
let counter = COUNTER.with(|cell| {
let mut cell = cell.borrow_mut();
let new_value = cell.wrapping_add(1);
*cell = new_value;
new_value
});
// Take a snapshot containing only healthy nodes.
//
// This ensures unhealthy nodes are not continuously (and unnecessarily)
// polled/probed in the iter cycle below. The low frequency and impact
// of a node becoming unavailable during a single request easily
// outweighs the trade-off of the constant health evaluation overhead.
let snapshot = self
.endpoints
.iter()
.filter(|e| e.is_usable())
.collect::<Vec<_>>();
// Reduce it to the range of [0, N) where N is the number of healthy
// clients in this balancer, ensuring not to calculate the remainder of
// a division by 0.
let idx = counter % max(snapshot.len(), 1);
snapshot.into_iter().cycle().skip(idx)
}
}
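
A minimal sketch of the start-offset selection performed by `endpoints()`, using made-up endpoint names (not code from the diff):

```rust
// With three healthy clients and a per-thread counter of 7, iteration starts
// at index 7 % 3 == 1 and then cycles 1, 2, 0, 1, 2, 0, ...
let healthy = vec!["ingester-0", "ingester-1", "ingester-2"];
let counter = 7_usize;
let idx = counter % healthy.len().max(1); // max(1) guards against an empty snapshot
let order: Vec<_> = healthy.iter().cycle().skip(idx).take(6).cloned().collect();
assert_eq!(
    order,
    ["ingester-1", "ingester-2", "ingester-0", "ingester-1", "ingester-2", "ingester-0"]
);
```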
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use crate::dml_handlers::rpc_write::{
circuit_breaking_client::mock::MockCircuitBreaker,
client::{mock::MockWriteClient, WriteClient},
};
use super::*;
/// No healthy nodes yields an empty iterator.
#[tokio::test]
async fn test_balancer_empty_iter() {
const BALANCER_CALLS: usize = 10;
// Initialise 2 RPC clients and configure their mock circuit breakers;
// both report an unhealthy state.
let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_usable(false);
let client_err_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_usable(false);
let client_err_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_err_2));
assert_eq!(circuit_err_1.ok_count(), 0);
assert_eq!(circuit_err_2.ok_count(), 0);
let balancer = Balancer::new([client_err_1, client_err_2]);
let mut endpoints = balancer.endpoints();
assert_matches!(endpoints.next(), None);
}
/// A test that ensures only healthy clients are returned by the balancer,
/// and that they are polled exactly once per request.
#[tokio::test]
async fn test_balancer_yield_healthy_polled_once() {
const BALANCER_CALLS: usize = 10;
// Initialise 3 RPC clients and configure their mock circuit breakers;
// two return an unhealthy state, one is healthy.
let circuit_err_1 = Arc::new(MockCircuitBreaker::default());
circuit_err_1.set_usable(false);
let client_err_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_err_1));
let circuit_err_2 = Arc::new(MockCircuitBreaker::default());
circuit_err_2.set_usable(false);
let client_err_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_err_2));
let circuit_ok = Arc::new(MockCircuitBreaker::default());
circuit_ok.set_usable(true);
let client_ok = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_ok));
assert_eq!(circuit_ok.ok_count(), 0);
assert_eq!(circuit_err_1.ok_count(), 0);
assert_eq!(circuit_err_2.ok_count(), 0);
let balancer = Balancer::new([client_err_1, client_ok, client_err_2]);
let mut endpoints = balancer.endpoints();
// Only the healthy client should be yielded, and it should cycle
// indefinitely.
for i in 1..=BALANCER_CALLS {
endpoints
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.await
.expect("should succeed");
assert_eq!(circuit_ok.ok_count(), i);
assert_eq!(circuit_ok.err_count(), 0);
}
// The health of the endpoints should not be constantly re-evaluated
// by a single request (reducing overhead / hot spinning - in the
// probing phase this would serialise clients).
assert_eq!(circuit_ok.is_usable_count(), 1);
assert_eq!(circuit_err_1.is_usable_count(), 1);
assert_eq!(circuit_err_2.is_usable_count(), 1);
// The other clients should not have been invoked.
assert_eq!(circuit_err_1.ok_count(), 0);
assert_eq!(circuit_err_1.err_count(), 0);
assert_eq!(circuit_err_2.ok_count(), 0);
assert_eq!(circuit_err_2.err_count(), 0);
}
/// An unhealthy node that recovers is yielded to the caller.
#[tokio::test]
async fn test_balancer_upstream_recovery() {
const BALANCER_CALLS: usize = 10;
// Initialise an RPC client and configure its mock circuit breaker to
// start in an unhealthy state.
let circuit = Arc::new(MockCircuitBreaker::default());
circuit.set_usable(false);
let client = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit));
assert_eq!(circuit.ok_count(), 0);
let balancer = Balancer::new([client]);
let mut endpoints = balancer.endpoints();
assert_matches!(endpoints.next(), None);
assert_eq!(circuit.is_usable_count(), 1);
circuit.set_usable(true);
let mut endpoints = balancer.endpoints();
assert_matches!(endpoints.next(), Some(_));
assert_eq!(circuit.is_usable_count(), 2);
// The now-healthy client is constantly yielded.
const N: usize = 3;
for _ in 0..N {
endpoints
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.await
.expect("should succeed");
}
assert_eq!(circuit.ok_count(), N);
}
// Ensure the balancer round-robins across all healthy clients.
//
// Note this is a property test that asserts the even distribution of the
// client calls, not their order.
#[tokio::test]
async fn test_round_robin() {
const N: usize = 100;
#[allow(clippy::assertions_on_constants)]
{
assert!(N % 2 == 0, "test iterations must be even");
}
// Initialise 3 RPC clients and configure their mock circuit breakers;
// two return a healthy state, one is unhealthy.
let circuit_err = Arc::new(MockCircuitBreaker::default());
circuit_err.set_usable(false);
let client_err = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_err));
let circuit_ok_1 = Arc::new(MockCircuitBreaker::default());
circuit_ok_1.set_usable(true);
let client_ok_1 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_ok_1));
let circuit_ok_2 = Arc::new(MockCircuitBreaker::default());
circuit_ok_2.set_usable(true);
let client_ok_2 = CircuitBreakingClient::new(Arc::new(MockWriteClient::default()))
.with_circuit_breaker(Arc::clone(&circuit_ok_2));
let balancer = Balancer::new([client_err, client_ok_1, client_ok_2]);
for _ in 0..N {
balancer
.endpoints()
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.await
.expect("should succeed");
}
assert_eq!(circuit_err.ok_count(), 0);
assert_eq!(circuit_ok_1.ok_count(), N / 2);
assert_eq!(circuit_ok_2.ok_count(), N / 2);
assert_eq!(circuit_err.err_count(), 0);
assert_eq!(circuit_ok_1.err_count(), 0);
assert_eq!(circuit_ok_2.err_count(), 0);
}
}

View File

@@ -1,3 +1,4 @@
#![allow(unused)]
use std::{
fmt::Debug,
sync::{

View File

@@ -1,176 +0,0 @@
use std::fmt::Debug;
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use super::{circuit_breaker::CircuitBreaker, client::WriteClient, RpcWriteError};
/// An internal abstraction over the health probing & result recording
/// functionality of a circuit breaker.
pub(super) trait CircuitBreakerState: Send + Sync + Debug {
/// Returns `true` if this client can be used to make a request with an
/// expectation of success.
fn is_usable(&self) -> bool;
/// Record the result of a request made by this client.
fn observe<T, E>(&self, r: &Result<T, E>);
}
impl CircuitBreakerState for CircuitBreaker {
fn is_usable(&self) -> bool {
self.is_healthy() || self.should_probe()
}
fn observe<T, E>(&self, r: &Result<T, E>) {
self.observe(r)
}
}
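
For illustration of the trait contract only (not part of the diff; the test module below uses MockCircuitBreaker instead), a hypothetical breaker that never trips could be implemented like this:

```rust
/// Hypothetical example: a breaker that always allows requests and ignores
/// their outcomes, so the circuit never opens.
#[derive(Debug, Default)]
struct AlwaysUsable;

impl CircuitBreakerState for AlwaysUsable {
    fn is_usable(&self) -> bool {
        true
    }

    fn observe<T, E>(&self, _r: &Result<T, E>) {
        // Outcomes are ignored; the breaker never opens.
    }
}
```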
/// A thin composite type decorating the [`WriteClient`] functionality of `T`,
/// with circuit breaking logic from [`CircuitBreaker`].
#[derive(Debug)]
pub(super) struct CircuitBreakingClient<T, C = CircuitBreaker> {
/// The underlying [`WriteClient`] implementation.
inner: T,
/// The circuit-breaking logic.
state: C,
}
impl<T> CircuitBreakingClient<T> {
pub(super) fn new(inner: T) -> Self {
let state = CircuitBreaker::default();
state.set_healthy();
Self { inner, state }
}
}
impl<T, C> CircuitBreakingClient<T, C>
where
C: CircuitBreakerState,
{
/// Returns `true` if this client can be used to make a request with an
/// expectation of success.
pub(super) fn is_usable(&self) -> bool {
self.state.is_usable()
}
#[cfg(test)]
pub(super) fn with_circuit_breaker<U>(self, breaker: U) -> CircuitBreakingClient<T, U> {
CircuitBreakingClient {
inner: self.inner,
state: breaker,
}
}
}
#[async_trait]
impl<T, C> WriteClient for &CircuitBreakingClient<T, C>
where
T: WriteClient,
C: CircuitBreakerState,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteError> {
let res = self.inner.write(op).await;
self.state.observe(&res);
res
}
}
#[cfg(test)]
pub(crate) mod mock {
use super::*;
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
#[derive(Debug, Default)]
pub(crate) struct MockCircuitBreaker {
is_usable: AtomicBool,
is_usable_calls: AtomicUsize,
ok: AtomicUsize,
err: AtomicUsize,
}
impl MockCircuitBreaker {
pub(crate) fn set_usable(&self, healthy: bool) {
self.is_usable.store(healthy, Ordering::Relaxed);
}
pub(crate) fn ok_count(&self) -> usize {
self.ok.load(Ordering::Relaxed)
}
pub(crate) fn err_count(&self) -> usize {
self.err.load(Ordering::Relaxed)
}
pub(crate) fn is_usable_count(&self) -> usize {
self.is_usable_calls.load(Ordering::Relaxed)
}
}
impl CircuitBreakerState for Arc<MockCircuitBreaker> {
fn is_usable(&self) -> bool {
self.is_usable_calls.fetch_add(1, Ordering::Relaxed);
self.is_usable.load(Ordering::Relaxed)
}
fn observe<T, E>(&self, r: &Result<T, E>) {
match r {
Ok(_) => &self.ok,
Err(_) => &self.err,
}
.fetch_add(1, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
use std::{borrow::Borrow, sync::Arc};
use crate::dml_handlers::rpc_write::client::mock::MockWriteClient;
use super::{mock::MockCircuitBreaker, *};
#[tokio::test]
async fn test_healthy() {
let circuit_breaker = Arc::new(MockCircuitBreaker::default());
let wrapper = CircuitBreakingClient::new(MockWriteClient::default())
.with_circuit_breaker(Arc::clone(&circuit_breaker));
circuit_breaker.set_usable(true);
assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable());
circuit_breaker.set_usable(false);
assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable());
circuit_breaker.set_usable(true);
assert_eq!(wrapper.is_usable(), circuit_breaker.is_usable());
}
#[tokio::test]
async fn test_observe() {
let circuit_breaker = Arc::new(MockCircuitBreaker::default());
let mock_client = Arc::new(
MockWriteClient::default()
.with_ret(vec![Ok(()), Err(RpcWriteError::DeletesUnsupported)]),
);
let wrapper = CircuitBreakingClient::new(Arc::clone(&mock_client))
.with_circuit_breaker(Arc::clone(&circuit_breaker));
assert_eq!(circuit_breaker.ok_count(), 0);
assert_eq!(circuit_breaker.err_count(), 0);
wrapper
.borrow()
.write(WriteRequest::default())
.await
.expect("wrapper should return Ok mock value");
assert_eq!(circuit_breaker.ok_count(), 1);
assert_eq!(circuit_breaker.err_count(), 0);
wrapper
.borrow()
.write(WriteRequest::default())
.await
.expect_err("wrapper should return Err mock value");
assert_eq!(circuit_breaker.ok_count(), 1);
assert_eq!(circuit_breaker.err_count(), 1);
}
}

View File

@@ -153,7 +153,6 @@ impl From<&DmlError> for StatusCode {
DmlError::RpcWrite(RpcWriteError::Upstream(_)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::DeletesUnsupported) => StatusCode::NOT_IMPLEMENTED,
DmlError::RpcWrite(RpcWriteError::Timeout(_)) => StatusCode::GATEWAY_TIMEOUT,
DmlError::RpcWrite(RpcWriteError::NoUpstream) => StatusCode::SERVICE_UNAVAILABLE,
}
}
}