test: Add end to end tracing test with mock Jaeger agent (#2594)

Andrew Lamb 2021-09-21 12:07:05 -04:00 committed by GitHub
parent 3e74859822
commit 823ff1029a
9 changed files with 351 additions and 64 deletions

View File

@@ -148,7 +148,7 @@ jobs:
- cache_restore
- run:
name: Cargo test
command: cargo test --workspace
command: cargo test --features=jaeger --workspace
- cache_save
# end to end tests with Heappy (heap profiling enabled)
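For reference, the end-to-end tracing tests added in this commit only run when the jaeger feature is enabled, so a local run needs the same flag as the CI step above:

    cargo test --features=jaeger --workspace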

View File

@@ -1 +1,2 @@
pub mod server_fixture;
pub mod udp_listener;

View File

@@ -15,6 +15,8 @@ use futures::prelude::*;
use generated_types::influxdata::iox::management::v1::{
database_status::DatabaseState, ServerStatus,
};
use http::header::HeaderName;
use http::HeaderValue;
use influxdb_iox_client::connection::Connection;
use once_cell::sync::OnceCell;
use tempfile::{NamedTempFile, TempDir};
@@ -138,14 +140,14 @@ impl ServerFixture {
/// waits. The database is left unconfigured (no writer id) and
/// is not shared with any other tests.
pub async fn create_single_use() -> Self {
Self::create_single_use_with_env(Default::default()).await
Self::create_single_use_with_config(Default::default()).await
}
/// Create a new server fixture with the provided test configuration
/// and wait for it to be ready. The database is left unconfigured (no writer id)
/// and is not shared with any other tests.
pub async fn create_single_use_with_env(env: Vec<(String, String)>) -> Self {
let server = TestServer::new(env);
pub async fn create_single_use_with_config(test_config: TestConfig) -> Self {
let server = TestServer::new(test_config);
let server = Arc::new(server);
// ensure the server is ready
@@ -279,8 +281,38 @@ struct TestServer {
/// dropped after the database closes.
dir: TempDir,
/// Configuration values for starting the test server
test_config: TestConfig,
}
// Options for creating test servers
#[derive(Default, Debug)]
pub struct TestConfig {
/// Additional environment variables
env: Vec<(String, String)>,
/// Headers to add to all client requests
client_headers: Vec<(HeaderName, HeaderValue)>,
}
impl TestConfig {
pub fn new() -> Self {
Default::default()
}
// add a name=value environment variable when starting the server
pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.env.push((name.into(), value.into()));
self
}
// add a name=value http header to all client requests made to the server
pub fn with_client_header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Self {
self.client_headers.push((
name.as_ref().parse().expect("valid header name"),
value.as_ref().parse().expect("valid header value"),
));
self
}
}
struct Process {
@@ -289,20 +321,21 @@ struct Process {
}
impl TestServer {
fn new(env: Vec<(String, String)>) -> Self {
fn new(test_config: TestConfig) -> Self {
let addrs = BindAddresses::default();
let ready = Mutex::new(ServerState::Started);
let dir = test_helpers::tmp_dir().unwrap();
let server_process = Mutex::new(Self::create_server_process(&addrs, &dir, &env));
let server_process =
Mutex::new(Self::create_server_process(&addrs, &dir, &test_config.env));
Self {
ready,
server_process,
addrs,
dir,
env,
test_config,
}
}
@@ -311,7 +344,8 @@ impl TestServer {
let mut server_process = self.server_process.lock().await;
server_process.child.kill().unwrap();
server_process.child.wait().unwrap();
*server_process = Self::create_server_process(&self.addrs, &self.dir, &self.env);
*server_process =
Self::create_server_process(&self.addrs, &self.dir, &self.test_config.env);
*ready_guard = ServerState::Started;
}
@@ -379,7 +413,7 @@ impl TestServer {
// Poll the RPC and HTTP servers separately as they listen on
// different ports but both need to be up for the test to run
let try_grpc_connect = wait_for_grpc(self.addrs());
let try_grpc_connect = self.wait_for_grpc();
let try_http_connect = async {
let client = reqwest::Client::new();
@@ -470,50 +504,51 @@ impl TestServer {
};
}
/// Create a connection channel for the gRPC endpoint
async fn grpc_channel(&self) -> influxdb_iox_client::connection::Result<Connection> {
grpc_channel(&self.addrs).await
}
pub async fn wait_for_grpc(&self) {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
fn addrs(&self) -> &BindAddresses {
&self.addrs
}
}
loop {
match self.grpc_channel().await {
Ok(channel) => {
println!("Successfully connected to server");
/// Create a connection channel for the gRPC endpoint
pub async fn grpc_channel(
addrs: &BindAddresses,
) -> influxdb_iox_client::connection::Result<Connection> {
influxdb_iox_client::connection::Builder::default()
.build(&addrs.grpc_base)
.await
}
let mut health = influxdb_iox_client::health::Client::new(channel);
pub async fn wait_for_grpc(addrs: &BindAddresses) {
let mut interval = tokio::time::interval(Duration::from_millis(1000));
loop {
match grpc_channel(addrs).await {
Ok(channel) => {
println!("Successfully connected to server");
let mut health = influxdb_iox_client::health::Client::new(channel);
match health.check_storage().await {
Ok(_) => {
println!("Storage service is running");
return;
}
Err(e) => {
println!("Error checking storage service status: {}", e);
match health.check_storage().await {
Ok(_) => {
println!("Storage service is running");
return;
}
Err(e) => {
println!("Error checking storage service status: {}", e);
}
}
}
Err(e) => {
println!("Waiting for gRPC API to be up: {}", e);
}
}
Err(e) => {
println!("Waiting for gRPC API to be up: {}", e);
}
interval.tick().await;
}
interval.tick().await;
}
/// Create a connection channel for the gRPC endpoint
async fn grpc_channel(&self) -> influxdb_iox_client::connection::Result<Connection> {
let builder = influxdb_iox_client::connection::Builder::default();
self.test_config
.client_headers
.iter()
.fold(builder, |builder, (header_name, header_value)| {
builder.header(header_name, header_value)
})
.build(&self.addrs.grpc_base)
.await
}
/// Returns the addresses to which the server has been bound
fn addrs(&self) -> &BindAddresses {
&self.addrs
}
}
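For reference, a minimal sketch of how a test combines the new TestConfig builder with create_single_use_with_config (the test name is illustrative; the environment variable and header values are borrowed from other tests in this commit):

    use crate::common::server_fixture::{ServerFixture, TestConfig};

    #[tokio::test]
    async fn example_single_use_with_config() {
        // Extra environment variables for the server process, plus a header
        // attached to every gRPC request the fixture's clients make
        let test_config = TestConfig::new()
            .with_env("INFLUXDB_IOX_SKIP_REPLAY", "no")
            .with_client_header("uber-trace-id", "4:3:2:1");

        // A dedicated (non-shared) server started with that configuration
        let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
        let mut management_client = server_fixture.management_client();
        management_client.update_server_id(1).await.unwrap();
        server_fixture.wait_server_initialized().await;
    }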

View File

@@ -0,0 +1,134 @@
//! Captures UDP packets
use std::{
sync::Arc,
time::{Duration, Instant},
};
use parking_lot::Mutex;
use tokio::{net::UdpSocket, select};
use tokio_util::sync::CancellationToken;
/// Maximum time to wait for a message, in seconds
const MAX_WAIT_TIME_SEC: u64 = 2;
/// A UDP message received by this server
#[derive(Clone)]
pub struct Message {
data: Vec<u8>,
}
impl std::fmt::Debug for Message {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Message({} bytes: {})", self.data.len(), self.to_string())
}
}
impl ToString for Message {
fn to_string(&self) -> String {
String::from_utf8_lossy(&self.data).to_string()
}
}
/// UDP listener server that captures UDP messages (e.g. Jaeger spans)
/// for use in tests
pub struct UdpCapture {
socket_addr: std::net::SocketAddr,
join_handle: tokio::task::JoinHandle<()>,
token: CancellationToken,
messages: Arc<Mutex<Vec<Message>>>,
}
impl UdpCapture {
// Create a new server, listening for UDP messages
pub async fn new() -> Self {
// Bind to some address, letting the OS pick
let socket = UdpSocket::bind("127.0.0.1:0")
.await
.expect("bind udp listener");
let socket_addr = socket.local_addr().unwrap();
println!(
"UDP server listening at {} port {}",
socket_addr.ip(),
socket_addr.port()
);
let token = CancellationToken::new();
let messages = Arc::new(Mutex::new(vec![]));
// Spawns a background task that listens on the socket and captures incoming messages
let captured_messages = Arc::clone(&messages);
let captured_token = token.clone();
let join_handle = tokio::spawn(async move {
println!("Starting udp listen");
loop {
let mut data = vec![0; 1024];
select! {
_ = captured_token.cancelled() => {
println!("Received shutdown request");
return;
},
res = socket.recv_from(&mut data) => {
let (sz, _origin) = res.expect("successful socket read");
data.resize(sz, 0);
let mut messages = captured_messages.lock();
messages.push(Message { data });
}
}
}
});
Self {
socket_addr,
join_handle,
token,
messages,
}
}
/// return the ip on which this server is listening
pub fn ip(&self) -> String {
self.socket_addr.ip().to_string()
}
/// return the port on which this server is listening
pub fn port(&self) -> String {
self.socket_addr.port().to_string()
}
/// stop and wait for successful shutdown of this server
pub async fn stop(self) {
self.token.cancel();
if let Err(e) = self.join_handle.await {
println!("Error waiting for shutdown of udp server: {}", e);
}
}
// Return all messages this server has seen so far
pub fn messages(&self) -> Vec<Message> {
let messages = self.messages.lock();
messages.clone()
}
// wait for a message to appear that passes `pred` or the timeout expires
pub fn wait_for<P>(&self, mut pred: P)
where
P: FnMut(&Message) -> bool,
{
let end = Instant::now() + Duration::from_secs(MAX_WAIT_TIME_SEC);
while Instant::now() < end {
if self.messages.lock().iter().any(|m| pred(m)) {
return;
}
}
panic!(
"Timeout expired before finding find messages that matches predicate. Messages:\n{:#?}",
self.messages.lock()
)
}
}
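For reference, a minimal sketch of how UdpCapture stands in for a Jaeger agent in a test: the capture address is handed to the server's tracing exporter via environment variables, and the test then waits for a span to arrive. This mirrors the tracing tests added below; the test name and the elided query step are illustrative.

    use crate::common::{
        server_fixture::{ServerFixture, TestConfig},
        udp_listener::UdpCapture,
    };

    #[tokio::test]
    async fn example_capture_jaeger_spans() {
        // The OS picks a free port; the capture records every packet it receives
        let udp_capture = UdpCapture::new().await;

        // Point the server's Jaeger exporter at the capture address
        let test_config = TestConfig::new()
            .with_env("TRACES_EXPORTER", "jaeger")
            .with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
            .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port());
        let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;

        // ... configure the server and run a query so spans are emitted ...

        // Block until a span naming the expected operation shows up
        // (panics after the timeout otherwise), then shut the capture down
        udp_capture.wait_for(|m| m.to_string().contains("IOxReadFilterNode"));
        udp_capture.stop().await;

        // keep the server alive until the span has been captured
        drop(server_fixture);
    }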

View File

@@ -1,14 +1,11 @@
use crate::common::server_fixture::ServerFixture;
use crate::common::server_fixture::{ServerFixture, TestConfig};
use crate::end_to_end_cases::scenario::Scenario;
use test_helpers::assert_contains;
#[tokio::test]
pub async fn test_row_timestamp() {
let env = vec![(
"INFLUXDB_IOX_ROW_TIMESTAMP_METRICS".to_string(),
"system".to_string(),
)];
let server_fixture = ServerFixture::create_single_use_with_env(env).await;
let test_config = TestConfig::new().with_env("INFLUXDB_IOX_ROW_TIMESTAMP_METRICS", "system");
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
let mut management_client = server_fixture.management_client();
management_client.update_server_id(1).await.unwrap();

View File

@@ -17,6 +17,7 @@ pub mod scenario;
mod sql_cli;
mod storage_api;
mod system_tables;
mod tracing;
mod write_api;
mod write_buffer;
mod write_cli;

View File

@@ -35,7 +35,7 @@ use write_buffer::core::WriteBufferWriting;
use write_buffer::kafka::test_utils::{kafka_sequencer_options, purge_kafka_topic};
use write_buffer::kafka::KafkaBufferProducer;
use crate::common::server_fixture::{ServerFixture, DEFAULT_SERVER_ID};
use crate::common::server_fixture::{ServerFixture, TestConfig, DEFAULT_SERVER_ID};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
@@ -588,12 +588,9 @@ pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec<ChunkSum
pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
let server_id = DEFAULT_SERVER_ID;
let env = vec![(
"INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR".to_string(),
"no".to_string(),
)];
let test_config = TestConfig::new().with_env("INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR", "no");
let fixture = ServerFixture::create_single_use_with_env(env).await;
let fixture = ServerFixture::create_single_use_with_config(test_config).await;
fixture
.management_client()
.update_server_id(server_id)
@@ -661,9 +658,9 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
let server_id = DEFAULT_SERVER_ID;
let env = vec![("INFLUXDB_IOX_SKIP_REPLAY".to_string(), "no".to_string())];
let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no");
let fixture = ServerFixture::create_single_use_with_env(env).await;
let fixture = ServerFixture::create_single_use_with_config(test_config).await;
fixture
.management_client()
.update_server_id(server_id)

View File

@@ -20,11 +20,11 @@ use std::str;
#[tokio::test]
pub async fn test() {
let storage_fixture = ServerFixture::create_shared().await;
let server_fixture = ServerFixture::create_shared().await;
let influxdb2 = storage_fixture.influxdb2_client();
let mut storage_client = StorageClient::new(storage_fixture.grpc_channel());
let mut management_client = storage_fixture.management_client();
let influxdb2 = server_fixture.influxdb2_client();
let mut storage_client = StorageClient::new(server_fixture.grpc_channel());
let mut management_client = server_fixture.management_client();
let scenario = Scenario::new();
scenario.create_database(&mut management_client).await;

View File

@@ -0,0 +1,122 @@
use super::scenario::{collect_query, Scenario};
use crate::common::{
server_fixture::{ServerFixture, TestConfig},
udp_listener::UdpCapture,
};
use futures::TryStreamExt;
use generated_types::{storage_client::StorageClient, ReadFilterRequest};
// cfg at this level so IDE can resolve code even when jaeger feature is not active
#[cfg(feature = "jaeger")]
fn run_test() -> bool {
true
}
#[cfg(not(feature = "jaeger"))]
fn run_test() -> bool {
println!("Skipping test because jaeger feature not enabled");
false
}
async fn setup() -> (UdpCapture, ServerFixture) {
let udp_capture = UdpCapture::new().await;
let test_config = TestConfig::new()
.with_env("TRACES_EXPORTER", "jaeger")
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
.with_client_header("uber-trace-id", "4:3:2:1");
let server_fixture = ServerFixture::create_single_use_with_config(test_config).await;
let mut management_client = server_fixture.management_client();
management_client.update_server_id(1).await.unwrap();
server_fixture.wait_server_initialized().await;
(udp_capture, server_fixture)
}
#[tokio::test]
pub async fn test_tracing_sql() {
if !run_test() {
return;
}
let (udp_capture, server_fixture) = setup().await;
let scenario = Scenario::new();
scenario
.create_database(&mut server_fixture.management_client())
.await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
// run a query, ensure we get traces
let sql_query = "select * from cpu_load_short";
let mut client = server_fixture.flight_client();
let query_results = client
.perform_query(scenario.database_name(), sql_query)
.await
.unwrap();
collect_query(query_results).await;
// "shallow" packet inspection and verify the UDP server got
// something that had some expected results (maybe we could
// eventually verify the payload here too)
udp_capture.wait_for(|m| m.to_string().contains("IOxReadFilterNode"));
// debugging assistance
//println!("Traces received (1):\n\n{:#?}", udp_capture.messages());
// wait for the UDP server to shut down
udp_capture.stop().await
}
#[tokio::test]
pub async fn test_tracing_storage_api() {
if !run_test() {
return;
}
let (udp_capture, server_fixture) = setup().await;
let scenario = Scenario::new();
scenario
.create_database(&mut server_fixture.management_client())
.await;
scenario.load_data(&server_fixture.influxdb2_client()).await;
// run a query via gRPC, ensure we get traces
let read_source = scenario.read_source();
let range = scenario.timestamp_range();
let predicate = None;
let read_filter_request = tonic::Request::new(ReadFilterRequest {
read_source,
range,
predicate,
});
let mut storage_client = StorageClient::new(server_fixture.grpc_channel());
let read_response = storage_client
.read_filter(read_filter_request)
.await
.unwrap();
read_response
.into_inner()
.try_collect::<Vec<_>>()
.await
.unwrap();
// "shallow" packet inspection and verify the UDP server got
// something that had some expected results (maybe we could
// eventually verify the payload here too)
udp_capture.wait_for(|m| m.to_string().contains("IOxReadFilterNode"));
// debugging assistance
//println!("Traces received (2):\n\n{:#?}", udp_capture.messages());
// wait for the UDP server to shut down
udp_capture.stop().await
}