From 0ddef54b09a790be58f1aa1c54ac82b196a229ee Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 30 Jan 2023 12:15:38 +0100 Subject: [PATCH] fix(router): envoy network error translation Envoy will connect to an endpoint on demand, and return an application-level error if it fails with a gRPC status code of "Unavailable". It also embeds a metadata entry of {"server": "envoy"} - this commit uses the two signals (error status code + metadata entry) to drive an immediate reconnection when observed, assuming the connection is bad. --- .../dml_handlers/rpc_write/lazy_connector.rs | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/router/src/dml_handlers/rpc_write/lazy_connector.rs b/router/src/dml_handlers/rpc_write/lazy_connector.rs index e60d5298e2..6325c8e96f 100644 --- a/router/src/dml_handlers/rpc_write/lazy_connector.rs +++ b/router/src/dml_handlers/rpc_write/lazy_connector.rs @@ -15,7 +15,11 @@ use generated_types::influxdata::iox::ingester::v1::{ use observability_deps::tracing::*; use parking_lot::Mutex; use tokio::task::JoinHandle; -use tonic::transport::{Channel, Endpoint}; +use tonic::{ + metadata::AsciiMetadataValue, + transport::{Channel, Endpoint}, + Code, +}; use super::{client::WriteClient, RpcWriteError}; @@ -86,6 +90,12 @@ impl WriteClient for LazyConnector { conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?; match WriteServiceClient::new(conn).write(op).await { + Err(e) if is_envoy_unavailable_error(&e) => { + warn!(error=%e, "detected envoy proxy upstream network error translation, reconnecting"); + self.consecutive_errors + .store(RECONNECT_ERROR_COUNT + 1, Ordering::Relaxed); + return Err(e); + } Err(e) => { self.consecutive_errors.fetch_add(1, Ordering::Relaxed); return Err(e); @@ -98,6 +108,30 @@ impl WriteClient for LazyConnector { } } +/// Returns `true` if `e` is a gRPC error with the status [`Code::Unavailable`], +/// and a metadata entry indicating the response was generated by an envoy proxy +/// instance. +/// +/// This is needed because the envoy proxy effectively converts network errors +/// (dial & I/O errors) into application-level (gRPC) errors, much like a pure +/// HTTP proxy would. Unfortunately this is a breaking change in behaviour for +/// networking code like [`tonic`]'s transport implementation, which can no +/// longer easily differentiate network errors from actual application errors. +fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool { + match e { + RpcWriteError::Upstream(e) if e.code() == Code::Unavailable => e + .metadata() + .get("server") + .map(|v| v == AsciiMetadataValue::from_static("envoy")) + .unwrap_or(false), + RpcWriteError::Upstream(_) + | RpcWriteError::Timeout(_) + | RpcWriteError::NoUpstreams + | RpcWriteError::UpstreamNotConnected(_) + | RpcWriteError::DeletesUnsupported => false, + } +} + impl Drop for LazyConnector { fn drop(&mut self) { self.connection_task.abort();