feat(router): Send tracing SpanContext header to ingester during RPC write

pull/24376/head
Fraser Savage 2023-07-12 11:29:20 +01:00
parent b8f1c8f68a
commit 5a37c92c2c
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
11 changed files with 147 additions and 42 deletions

1
Cargo.lock generated
View File

@ -4699,6 +4699,7 @@ dependencies = [
"tokio-stream",
"tonic",
"trace",
"trace_http",
"workspace-hack",
]

View File

@ -636,6 +636,10 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&catalog),
Arc::clone(&object_store),
&router_config,
router_run_config
.tracing_config()
.traces_jaeger_trace_context_header_name
.clone(),
)
.await?;

View File

@ -98,6 +98,11 @@ pub async fn command(config: Config) -> Result<()> {
catalog,
object_store,
&config.router_config,
config
.run_config
.tracing_config()
.traces_jaeger_trace_context_header_name
.clone(),
)
.await?;

View File

@ -218,6 +218,7 @@ pub async fn create_router_server_type(
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
router_config: &RouterConfig,
trace_context_header_name: String,
) -> Result<Arc<dyn ServerType>> {
let ingester_connections = router_config.ingester_addresses.iter().map(|addr| {
let addr = addr.to_string();
@ -228,6 +229,7 @@ pub async fn create_router_server_type(
endpoint,
router_config.rpc_write_timeout_seconds,
router_config.rpc_write_max_outgoing_bytes,
trace_context_header_name.clone(),
),
addr,
)

View File

@ -39,6 +39,8 @@ thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
tonic = { workspace = true }
trace = { path = "../trace/" }
trace_http = { path = "../trace_http" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]

View File

@ -5,17 +5,11 @@ pub mod client;
pub mod lazy_connector;
mod upstream_snapshot;
use crate::dml_handlers::rpc_write::client::WriteClient;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use self::{
balancer::Balancer,
circuit_breaker::CircuitBreaker,
circuit_breaking_client::{CircuitBreakerState, CircuitBreakingClient},
client::RpcWriteClientError,
upstream_snapshot::UpstreamSnapshot,
};
use super::{DmlHandler, Partitioned};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema, TableId};
use dml::{DmlMeta, DmlWrite};
@ -25,10 +19,19 @@ use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::encode::encode_write;
use observability_deps::tracing::*;
use std::{fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
use thiserror::Error;
use trace::ctx::SpanContext;
use self::{
balancer::Balancer,
circuit_breaker::CircuitBreaker,
circuit_breaking_client::{CircuitBreakerState, CircuitBreakingClient},
client::RpcWriteClientError,
upstream_snapshot::UpstreamSnapshot,
};
use super::{DmlHandler, Partitioned};
use crate::dml_handlers::rpc_write::client::WriteClient;
/// The bound on RPC request duration.
///
/// This includes the time taken to send the request, and wait for the response.
@ -193,7 +196,9 @@ where
namespace_id,
writes,
partition_key.clone(),
DmlMeta::unsequenced(span_ctx.clone()),
// The downstream ingester does not receive the [`DmlMeta`] type,
// so the span context must be passed in the request.
DmlMeta::unsequenced(None),
);
// Serialise this write into the wire format.
@ -223,7 +228,8 @@ where
// invariant.
let mut snap = snap.clone();
let req = req.clone();
async move { write_loop(&mut snap, &req).await }
let span_ctx = span_ctx.clone();
async move { write_loop(&mut snap, &req, span_ctx).await }
})
.collect::<FuturesUnordered<_>>()
.enumerate();
@ -286,6 +292,7 @@ where
async fn write_loop<T>(
endpoints: &mut UpstreamSnapshot<T>,
req: &WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteError>
where
T: WriteClient,
@ -308,7 +315,7 @@ where
.next()
.expect("not enough replicas in snapshot to satisfy replication factor");
match client.write(req.clone()).await {
match client.write(req.clone(), span_ctx.clone()).await {
Ok(()) => {
endpoints.remove(client);
return Ok(());

View File

@ -278,7 +278,7 @@ mod tests {
let _ = endpoints
.next()
.unwrap()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await;
assert!((circuit_err_1.ok_count() == 1) ^ (circuit_err_2.ok_count() == 1));
assert!(circuit_ok.ok_count() == 0);
@ -287,7 +287,7 @@ mod tests {
let _ = endpoints
.next()
.unwrap()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await;
assert!((circuit_err_1.ok_count() == 1) ^ (circuit_err_2.ok_count() == 1));
assert!(circuit_ok.ok_count() == 1);
@ -296,7 +296,7 @@ mod tests {
let _ = endpoints
.next()
.unwrap()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await;
assert!((circuit_err_1.ok_count() == 2) ^ (circuit_err_2.ok_count() == 2));
assert!(circuit_ok.ok_count() == 1);
@ -305,7 +305,7 @@ mod tests {
let _ = endpoints
.next()
.unwrap()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await;
assert!((circuit_err_1.ok_count() == 2) ^ (circuit_err_2.ok_count() == 2));
assert!(circuit_ok.ok_count() == 2);
@ -352,7 +352,7 @@ mod tests {
endpoints
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await
.expect("should succeed");
@ -409,7 +409,7 @@ mod tests {
endpoints
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await
.expect("should succeed");
}
@ -456,7 +456,7 @@ mod tests {
.unwrap()
.next()
.expect("should yield healthy client")
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await
.expect("should succeed");
}

View File

@ -2,6 +2,7 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::WriteRequest;
use trace::ctx::SpanContext;
use super::{
circuit_breaker::CircuitBreaker,
@ -98,8 +99,12 @@ where
T: WriteClient,
C: CircuitBreakerState,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
let res = self.inner.write(op).await;
async fn write(
&self,
op: WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError> {
let res = self.inner.write(op, span_ctx).await;
self.state.observe(&res);
res
}
@ -216,7 +221,7 @@ mod tests {
wrapper
.borrow()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await
.expect("wrapper should return Ok mock value");
assert_eq!(circuit_breaker.ok_count(), 1);
@ -224,7 +229,7 @@ mod tests {
wrapper
.borrow()
.write(WriteRequest::default())
.write(WriteRequest::default(), None)
.await
.expect_err("wrapper should return Err mock value");
assert_eq!(circuit_breaker.ok_count(), 1);

View File

@ -1,12 +1,15 @@
//! Abstraction over RPC client
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
write_service_client::WriteServiceClient, WriteRequest,
};
use thiserror::Error;
use tonic::IntoRequest;
use trace::ctx::SpanContext;
use trace_http::ctx::format_jaeger_trace_context;
/// Request errors returned by [`WriteClient`] implementations.
#[derive(Debug, Error)]
@ -19,13 +22,25 @@ pub enum RpcWriteClientError {
/// The upstream ingester returned an error response.
#[error("upstream ingester error: {0}")]
Upstream(#[from] tonic::Status),
/// The client is misconfigured and has produced an invalid request metadata key.
#[error("misconfigured client producing invalid metadata key: {0}")]
MisconfiguredMetadataKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
/// The client is misconfigured and has produced an invalid request metadata value.
#[error("misconfigured client producing invalid metadata value: {0}")]
MisconfiguredMetadataValue(#[from] tonic::metadata::errors::InvalidMetadataValue),
}
/// An abstract RPC client that pushes `op` to an opaque receiver.
#[async_trait]
pub(super) trait WriteClient: Send + Sync + std::fmt::Debug {
/// Write `op` and wait for a response.
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError>;
async fn write(
&self,
op: WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError>;
}
#[async_trait]
@ -33,16 +48,52 @@ impl<T> WriteClient for Arc<T>
where
T: WriteClient,
{
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
(**self).write(op).await
async fn write(
&self,
op: WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError> {
(**self).write(op, span_ctx).await
}
}
#[derive(Debug)]
pub(crate) struct TracePropagatingWriteClient<'a> {
inner: WriteServiceClient<tonic::transport::Channel>,
trace_context_header_name: &'a str,
}
impl<'a> TracePropagatingWriteClient<'a> {
pub(crate) fn new(
inner: WriteServiceClient<tonic::transport::Channel>,
trace_context_header_name: &'a str,
) -> Self {
Self {
inner,
trace_context_header_name,
}
}
}
/// An implementation of [`WriteClient`] for the tonic gRPC client.
#[async_trait]
impl WriteClient for WriteServiceClient<tonic::transport::Channel> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
WriteServiceClient::write(&mut self.clone(), op).await?;
impl<'a> WriteClient for TracePropagatingWriteClient<'a> {
async fn write(
&self,
op: WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError> {
let mut req = tonic::Request::new(op).into_request();
if let Some(span_ctx) = span_ctx {
req.metadata_mut().insert(
tonic::metadata::MetadataKey::from_bytes(
self.trace_context_header_name.as_bytes(),
)?,
tonic::metadata::MetadataValue::try_from(&format_jaeger_trace_context(&span_ctx))?,
);
};
WriteServiceClient::write(&mut self.inner.clone(), req).await?;
Ok(())
}
}
@ -112,7 +163,11 @@ pub mod mock {
#[async_trait]
impl WriteClient for Arc<MockWriteClient> {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
async fn write(
&self,
op: WriteRequest,
_span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError> {
let mut guard = self.state.lock();
guard.calls.push(op);

View File

@ -20,8 +20,9 @@ use tonic::{
transport::{Channel, Endpoint},
Code,
};
use trace::ctx::SpanContext;
use super::client::{RpcWriteClientError, WriteClient};
use super::client::{RpcWriteClientError, TracePropagatingWriteClient, WriteClient};
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
@ -57,11 +58,18 @@ pub struct LazyConnector {
/// A task that periodically opens a new connection to `addr` when
/// `consecutive_errors` is more than [`RECONNECT_ERROR_COUNT`].
connection_task: JoinHandle<()>,
trace_context_header_name: String,
}
impl LazyConnector {
/// Lazily connect to `addr`.
pub fn new(addr: Endpoint, request_timeout: Duration, max_outgoing_msg_bytes: usize) -> Self {
pub fn new(
addr: Endpoint,
request_timeout: Duration,
max_outgoing_msg_bytes: usize,
trace_context_header_name: String,
) -> Self {
let addr = addr
.connect_timeout(CONNECT_TIMEOUT)
.timeout(request_timeout);
@ -79,6 +87,7 @@ impl LazyConnector {
Arc::clone(&consecutive_errors),
)),
consecutive_errors,
trace_context_header_name,
}
}
@ -94,17 +103,24 @@ impl LazyConnector {
#[async_trait]
impl WriteClient for LazyConnector {
async fn write(&self, op: WriteRequest) -> Result<(), RpcWriteClientError> {
async fn write(
&self,
op: WriteRequest,
span_ctx: Option<SpanContext>,
) -> Result<(), RpcWriteClientError> {
let conn = self.connection.lock().clone();
let conn = conn.ok_or_else(|| {
RpcWriteClientError::UpstreamNotConnected(self.addr.uri().to_string())
})?;
match WriteServiceClient::new(conn)
.max_encoding_message_size(self.max_outgoing_msg_bytes)
.max_decoding_message_size(MAX_INCOMING_MSG_BYTES)
.write(op)
.await
match TracePropagatingWriteClient::new(
WriteServiceClient::new(conn)
.max_encoding_message_size(self.max_outgoing_msg_bytes)
.max_decoding_message_size(MAX_INCOMING_MSG_BYTES),
&self.trace_context_header_name,
)
.write(op, span_ctx)
.await
{
Err(e) if is_envoy_unavailable_error(&e) => {
warn!(error=%e, "detected envoy proxy upstream network error translation, reconnecting");
@ -141,6 +157,8 @@ fn is_envoy_unavailable_error(e: &RpcWriteClientError) -> bool {
.map(|v| v == AsciiMetadataValue::from_static("envoy"))
.unwrap_or(false),
RpcWriteClientError::Upstream(_) => false,
RpcWriteClientError::MisconfiguredMetadataKey(_) => false,
RpcWriteClientError::MisconfiguredMetadataValue(_) => false,
RpcWriteClientError::UpstreamNotConnected(_) => unreachable!(),
}
}

View File

@ -156,6 +156,12 @@ impl From<&DmlError> for StatusCode {
DmlError::RpcWrite(RpcWriteError::Client(RpcWriteClientError::Upstream(_))) => {
StatusCode::INTERNAL_SERVER_ERROR
}
DmlError::RpcWrite(RpcWriteError::Client(
RpcWriteClientError::MisconfiguredMetadataKey(_),
)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::Client(
RpcWriteClientError::MisconfiguredMetadataValue(_),
)) => StatusCode::INTERNAL_SERVER_ERROR,
DmlError::RpcWrite(RpcWriteError::Client(
RpcWriteClientError::UpstreamNotConnected(_),
)) => StatusCode::SERVICE_UNAVAILABLE,