fix(router): envoy network error translation

Envoy will connect to an endpoint on demand and, if that connection
attempt fails, return an application-level error carrying the gRPC
status code "Unavailable".

It also embeds a metadata entry of {"server": "envoy"} - this commit
uses the two signals (error status code + metadata entry) to drive an
immediate reconnection when observed, assuming the connection is bad.
pull/24376/head
Dom Dwyer 2023-01-30 12:15:38 +01:00
parent 7bf609ce7b
commit 0ddef54b09
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
1 changed files with 35 additions and 1 deletions

View File

@ -15,7 +15,11 @@ use generated_types::influxdata::iox::ingester::v1::{
use observability_deps::tracing::*;
use parking_lot::Mutex;
use tokio::task::JoinHandle;
use tonic::transport::{Channel, Endpoint};
use tonic::{
metadata::AsciiMetadataValue,
transport::{Channel, Endpoint},
Code,
};
use super::{client::WriteClient, RpcWriteError};
@ -86,6 +90,12 @@ impl WriteClient for LazyConnector {
conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?;
match WriteServiceClient::new(conn).write(op).await {
Err(e) if is_envoy_unavailable_error(&e) => {
warn!(error=%e, "detected envoy proxy upstream network error translation, reconnecting");
self.consecutive_errors
.store(RECONNECT_ERROR_COUNT + 1, Ordering::Relaxed);
return Err(e);
}
Err(e) => {
self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
return Err(e);
@ -98,6 +108,30 @@ impl WriteClient for LazyConnector {
}
}
/// Reports whether `e` looks like a network failure that an envoy proxy has
/// re-packaged as a gRPC response.
///
/// Both of the following signals must be present for this to return `true`:
///
///   * `e` is an [`RpcWriteError::Upstream`] whose status code is
///     [`Code::Unavailable`], and
///   * the status metadata contains a `"server"` entry equal to `"envoy"`.
///
/// Every other error variant, any other status code, and a missing or
/// different `"server"` entry all yield `false`.
///
/// The check exists because an envoy proxy turns network-level failures (dial
/// & I/O errors) into application-level (gRPC) errors, in the same way a plain
/// HTTP proxy would. That changes what networking code such as [`tonic`]'s
/// transport implementation observes: it can no longer easily tell a network
/// error apart from a genuine application error.
fn is_envoy_unavailable_error(e: &RpcWriteError) -> bool {
    match e {
        RpcWriteError::Upstream(status) => {
            // Test the status code first; the metadata is only consulted for
            // "Unavailable" responses.
            status.code() == Code::Unavailable
                && status
                    .metadata()
                    .get("server")
                    .map_or(false, |server| {
                        server == AsciiMetadataValue::from_static("envoy")
                    })
        }
        // The remaining variants are spelled out (no wildcard arm) so that a
        // newly added variant has to be classified here explicitly.
        RpcWriteError::Timeout(_)
        | RpcWriteError::NoUpstreams
        | RpcWriteError::UpstreamNotConnected(_)
        | RpcWriteError::DeletesUnsupported => false,
    }
}
impl Drop for LazyConnector {
fn drop(&mut self) {
self.connection_task.abort();