feat: add end-to-end tracing test (#2285)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
7cee2a4d7a
commit
756f5c6699
src
influxdb_ioxd
trace/src
|
@ -13,6 +13,7 @@ use server::{
|
|||
};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
|
||||
use trace::{LogTraceCollector, TraceCollector};
|
||||
|
||||
mod http;
|
||||
mod planner;
|
||||
|
@ -21,23 +22,15 @@ pub(crate) mod serving_readiness;
|
|||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display(
|
||||
"Unable to bind to listen for HTTP requests on {}: {}",
|
||||
bind_addr,
|
||||
source
|
||||
))]
|
||||
#[snafu(display("Unable to bind to listen for HTTP requests on {}: {}", addr, source))]
|
||||
StartListeningHttp {
|
||||
bind_addr: SocketAddr,
|
||||
addr: SocketAddr,
|
||||
source: hyper::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Unable to bind to listen for gRPC requests on {}: {}",
|
||||
grpc_bind_addr,
|
||||
source
|
||||
))]
|
||||
#[snafu(display("Unable to bind to listen for gRPC requests on {}: {}", addr, source))]
|
||||
StartListeningGrpc {
|
||||
grpc_bind_addr: SocketAddr,
|
||||
addr: SocketAddr,
|
||||
source: std::io::Error,
|
||||
},
|
||||
|
||||
|
@ -160,7 +153,39 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
let application = make_application(&config).await?;
|
||||
let app_server = make_server(Arc::clone(&application), &config);
|
||||
|
||||
serve(config, application, app_server).await
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
||||
let http_listener = http_listener(config.http_bind_address).await?;
|
||||
let trace_collector = Arc::new(LogTraceCollector::new());
|
||||
|
||||
serve(
|
||||
config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
trace_collector,
|
||||
app_server,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
||||
let listener = tokio::net::TcpListener::bind(addr)
|
||||
.await
|
||||
.context(StartListeningGrpc { addr })?;
|
||||
|
||||
match listener.local_addr() {
|
||||
Ok(local_addr) => info!(%local_addr, "bound gRPC listener"),
|
||||
Err(_) => info!(%addr, "bound gRPC listener"),
|
||||
}
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
async fn http_listener(addr: SocketAddr) -> Result<AddrIncoming> {
|
||||
let listener = AddrIncoming::bind(&addr).context(StartListeningHttp { addr })?;
|
||||
info!(bind_addr=%listener.local_addr(), "bound HTTP listener");
|
||||
|
||||
Ok(listener)
|
||||
}
|
||||
|
||||
/// Instantiates the gRPC and HTTP listeners and returns a Future that completes when
|
||||
|
@ -170,42 +195,39 @@ pub async fn main(config: Config) -> Result<()> {
|
|||
async fn serve(
|
||||
config: Config,
|
||||
application: Arc<ApplicationState>,
|
||||
grpc_listener: tokio::net::TcpListener,
|
||||
http_listener: AddrIncoming,
|
||||
trace_collector: Arc<dyn TraceCollector>,
|
||||
app_server: Arc<AppServer<ConnectionManager>>,
|
||||
) -> Result<()> {
|
||||
// Construct a token to trigger shutdown of API services
|
||||
let frontend_shutdown = tokio_util::sync::CancellationToken::new();
|
||||
|
||||
// Construct and start up gRPC server
|
||||
let grpc_bind_addr = config.grpc_bind_address;
|
||||
let socket = tokio::net::TcpListener::bind(grpc_bind_addr)
|
||||
.await
|
||||
.context(StartListeningGrpc { grpc_bind_addr })?;
|
||||
|
||||
let grpc_server = rpc::serve(
|
||||
socket,
|
||||
grpc_listener,
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&app_server),
|
||||
trace_collector,
|
||||
frontend_shutdown.clone(),
|
||||
config.initial_serving_state.into(),
|
||||
)
|
||||
.fuse();
|
||||
|
||||
info!(bind_address=?grpc_bind_addr, "gRPC server listening");
|
||||
|
||||
let bind_addr = config.http_bind_address;
|
||||
let addr = AddrIncoming::bind(&bind_addr).context(StartListeningHttp { bind_addr })?;
|
||||
info!("gRPC server listening");
|
||||
|
||||
let max_http_request_size = config.max_http_request_size;
|
||||
|
||||
let http_server = http::serve(
|
||||
addr,
|
||||
http_listener,
|
||||
Arc::clone(&application),
|
||||
Arc::clone(&app_server),
|
||||
frontend_shutdown.clone(),
|
||||
max_http_request_size,
|
||||
)
|
||||
.fuse();
|
||||
info!(bind_address=?bind_addr, "HTTP server listening");
|
||||
info!("HTTP server listening");
|
||||
|
||||
info!("InfluxDB IOx server ready");
|
||||
|
||||
|
@ -298,21 +320,56 @@ async fn serve(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use ::http::{header::HeaderName, HeaderValue};
|
||||
use data_types::{database_rules::DatabaseRules, DatabaseName};
|
||||
use std::convert::TryInto;
|
||||
use structopt::StructOpt;
|
||||
use trace::RingBufferTraceCollector;
|
||||
|
||||
fn test_config(server_id: Option<u32>) -> Config {
|
||||
let mut config = Config::from_iter(&[
|
||||
"run",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
config.server_id = server_id.map(|x| x.try_into().unwrap());
|
||||
config
|
||||
}
|
||||
|
||||
async fn test_serve(
|
||||
config: Config,
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<AppServer<ConnectionManager>>,
|
||||
) {
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
|
||||
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
|
||||
|
||||
serve(
|
||||
config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
Arc::new(LogTraceCollector::new()),
|
||||
server,
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_server_shutdown() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
// Create a server and wait for it to initialize
|
||||
let config = Config::from_iter(&["run", "--server-id", "23"]);
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// Start serving
|
||||
let serve_fut = serve(config, application, Arc::clone(&server)).fuse();
|
||||
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing to trigger termination, so serve future should continue running
|
||||
|
@ -334,17 +391,11 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_server_shutdown_uninit() {
|
||||
// Create a server but don't set a server id
|
||||
let config = Config::from_iter(&[
|
||||
"run",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
let config = test_config(None);
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
|
||||
let serve_fut = serve(config, application, Arc::clone(&server)).fuse();
|
||||
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
|
@ -371,20 +422,12 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_server_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = Config::from_iter(&[
|
||||
"run",
|
||||
"--server-id",
|
||||
"999999999",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
let config = test_config(Some(999999999));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let serve_fut = serve(config, application, Arc::clone(&server)).fuse();
|
||||
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
|
@ -406,15 +449,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_database_panic() {
|
||||
// Create a server and wait for it to initialize
|
||||
let config = Config::from_iter(&[
|
||||
"run",
|
||||
"--server-id",
|
||||
"23",
|
||||
"--api-bind",
|
||||
"127.0.0.1:0",
|
||||
"--grpc-bind",
|
||||
"127.0.0.1:0",
|
||||
]);
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
@ -428,7 +463,7 @@ mod tests {
|
|||
|
||||
let other_db = server.database(&other_db_name).unwrap();
|
||||
|
||||
let serve_fut = serve(config, Arc::clone(&application), Arc::clone(&server)).fuse();
|
||||
let serve_fut = test_serve(config, Arc::clone(&application), Arc::clone(&server)).fuse();
|
||||
pin_mut!(serve_fut);
|
||||
|
||||
// Nothing should have triggered shutdown so serve shouldn't finish
|
||||
|
@ -476,4 +511,87 @@ mod tests {
|
|||
tracker::TaskResult::Success
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tracing() {
|
||||
let config = test_config(Some(23));
|
||||
let application = make_application(&config).await.unwrap();
|
||||
let server = make_server(Arc::clone(&application), &config);
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
let trace_collector = Arc::new(RingBufferTraceCollector::new(20));
|
||||
|
||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
|
||||
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
|
||||
|
||||
let addr = grpc_listener.local_addr().unwrap();
|
||||
|
||||
let fut = serve(
|
||||
config,
|
||||
application,
|
||||
grpc_listener,
|
||||
http_listener,
|
||||
Arc::<RingBufferTraceCollector>::clone(&trace_collector),
|
||||
Arc::clone(&server),
|
||||
);
|
||||
|
||||
let join = tokio::spawn(fut);
|
||||
|
||||
let client = influxdb_iox_client::connection::Builder::default()
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut client = influxdb_iox_client::management::Client::new(client);
|
||||
|
||||
client.list_databases().await.unwrap();
|
||||
|
||||
assert_eq!(trace_collector.spans().len(), 0);
|
||||
|
||||
let tracing_client = influxdb_iox_client::connection::Builder::default()
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-sampled"),
|
||||
HeaderValue::from_static("1"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-traceid"),
|
||||
HeaderValue::from_static("999999999"),
|
||||
)
|
||||
.header(
|
||||
HeaderName::from_static("x-b3-spanid"),
|
||||
HeaderValue::from_static("111111"),
|
||||
)
|
||||
.build(format!("http://{}", addr))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut tracing_client = influxdb_iox_client::management::Client::new(tracing_client);
|
||||
|
||||
tracing_client.list_databases().await.unwrap();
|
||||
tracing_client.get_server_status().await.unwrap();
|
||||
|
||||
let spans = trace_collector.spans();
|
||||
assert_eq!(spans.len(), 2);
|
||||
|
||||
let spans: Vec<trace::span::Span<'_>> = spans
|
||||
.iter()
|
||||
.map(|x| serde_json::from_str(x.as_str()).unwrap())
|
||||
.collect();
|
||||
|
||||
assert_eq!(spans[0].name, "IOx");
|
||||
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 111111);
|
||||
assert_eq!(spans[0].ctx.trace_id.0.get(), 999999999);
|
||||
assert!(spans[0].start.is_some());
|
||||
assert!(spans[0].end.is_some());
|
||||
|
||||
assert_eq!(spans[1].name, "IOx");
|
||||
assert_eq!(spans[1].ctx.parent_span_id.unwrap().0.get(), 111111);
|
||||
assert_eq!(spans[1].ctx.trace_id.0.get(), 999999999);
|
||||
assert!(spans[1].start.is_some());
|
||||
assert!(spans[1].end.is_some());
|
||||
|
||||
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
|
||||
server.shutdown();
|
||||
join.await.unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ use tonic::transport::NamedService;
|
|||
|
||||
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
|
||||
use server::{ApplicationState, ConnectionManager, Server};
|
||||
use trace::TraceCollector;
|
||||
|
||||
pub mod error;
|
||||
mod flight;
|
||||
|
@ -70,15 +71,13 @@ pub async fn serve<M>(
|
|||
socket: TcpListener,
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
trace_collector: Arc<dyn TraceCollector>,
|
||||
shutdown: CancellationToken,
|
||||
serving_readiness: ServingReadiness,
|
||||
) -> Result<()>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync + Debug + 'static,
|
||||
{
|
||||
// TODO: Replace this with a jaeger collector
|
||||
let trace_collector = Arc::new(trace::LogTraceCollector::new());
|
||||
|
||||
let stream = TcpListenerStream::new(socket);
|
||||
|
||||
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
|
||||
|
|
|
@ -47,7 +47,7 @@ pub enum DecodeError {
|
|||
ZeroError,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TraceId(pub NonZeroU128);
|
||||
|
||||
impl<'a> FromStr for TraceId {
|
||||
|
@ -64,7 +64,7 @@ impl<'a> FromStr for TraceId {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct SpanId(pub NonZeroU64);
|
||||
|
||||
impl SpanId {
|
||||
|
|
Loading…
Reference in New Issue