diff --git a/.circleci/config.yml b/.circleci/config.yml index 4c88bb10d6..c6b8140f30 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -148,7 +148,7 @@ jobs: - cache_restore - run: name: Cargo test - command: cargo test --workspace + command: cargo test --features=jaeger --workspace - cache_save # end to end tests with Heappy (heap profiling enabled) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a057eb4940..4efc5c0667 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1 +1,2 @@ pub mod server_fixture; +pub mod udp_listener; diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index 2be6fca6e5..764d570e06 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -15,6 +15,8 @@ use futures::prelude::*; use generated_types::influxdata::iox::management::v1::{ database_status::DatabaseState, ServerStatus, }; +use http::header::HeaderName; +use http::HeaderValue; use influxdb_iox_client::connection::Connection; use once_cell::sync::OnceCell; use tempfile::{NamedTempFile, TempDir}; @@ -138,14 +140,14 @@ impl ServerFixture { /// waits. The database is left unconfigured (no writer id) and /// is not shared with any other tests. pub async fn create_single_use() -> Self { - Self::create_single_use_with_env(Default::default()).await + Self::create_single_use_with_config(Default::default()).await } /// Create a new server fixture with the provided additional environment variables /// and wait for it to be ready. The database is left unconfigured (no writer id) /// and is not shared with any other tests. - pub async fn create_single_use_with_env(env: Vec<(String, String)>) -> Self { - let server = TestServer::new(env); + pub async fn create_single_use_with_config(test_config: TestConfig) -> Self { + let server = TestServer::new(test_config); let server = Arc::new(server); // ensure the server is ready @@ -279,8 +281,38 @@ struct TestServer { /// dropped after the database closes. 
dir: TempDir, + /// Configuration values for starting the test server + test_config: TestConfig, +} + +// Options for creating test servers +#[derive(Default, Debug)] +pub struct TestConfig { /// Additional environment variables env: Vec<(String, String)>, + + /// Headers to add to all client requests + client_headers: Vec<(HeaderName, HeaderValue)>, +} + +impl TestConfig { + pub fn new() -> Self { + Default::default() + } + // add a name=value environment variable when starting the server + pub fn with_env(mut self, name: impl Into<String>, value: impl Into<String>) -> Self { + self.env.push((name.into(), value.into())); + self + } + + // add a name=value http header to all client requests made to the server + pub fn with_client_header(mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> Self { + self.client_headers.push(( + name.as_ref().parse().expect("valid header name"), + value.as_ref().parse().expect("valid header value"), + )); + self + } } struct Process { @@ -289,20 +321,21 @@ struct Process { } impl TestServer { - fn new(env: Vec<(String, String)>) -> Self { + fn new(test_config: TestConfig) -> Self { let addrs = BindAddresses::default(); let ready = Mutex::new(ServerState::Started); let dir = test_helpers::tmp_dir().unwrap(); - let server_process = Mutex::new(Self::create_server_process(&addrs, &dir, &env)); + let server_process = + Mutex::new(Self::create_server_process(&addrs, &dir, &test_config.env)); Self { ready, server_process, addrs, dir, - env, + test_config, } } @@ -311,7 +344,8 @@ impl TestServer { let mut server_process = self.server_process.lock().await; server_process.child.kill().unwrap(); server_process.child.wait().unwrap(); - *server_process = Self::create_server_process(&self.addrs, &self.dir, &self.env); + *server_process = + Self::create_server_process(&self.addrs, &self.dir, &self.test_config.env); *ready_guard = ServerState::Started; } @@ -379,7 +413,7 @@ impl TestServer { // Poll the RPC and HTTP servers separately as they listen on // different
ports but both need to be up for the test to run - let try_grpc_connect = wait_for_grpc(self.addrs()); + let try_grpc_connect = self.wait_for_grpc(); let try_http_connect = async { let client = reqwest::Client::new(); @@ -470,50 +504,51 @@ impl TestServer { }; } - /// Create a connection channel for the gRPC endpoint - async fn grpc_channel(&self) -> influxdb_iox_client::connection::Result { - grpc_channel(&self.addrs).await - } + pub async fn wait_for_grpc(&self) { + let mut interval = tokio::time::interval(Duration::from_millis(1000)); - fn addrs(&self) -> &BindAddresses { - &self.addrs - } -} + loop { + match self.grpc_channel().await { + Ok(channel) => { + println!("Successfully connected to server"); -/// Create a connection channel for the gRPC endpoint -pub async fn grpc_channel( - addrs: &BindAddresses, -) -> influxdb_iox_client::connection::Result { - influxdb_iox_client::connection::Builder::default() - .build(&addrs.grpc_base) - .await -} + let mut health = influxdb_iox_client::health::Client::new(channel); -pub async fn wait_for_grpc(addrs: &BindAddresses) { - let mut interval = tokio::time::interval(Duration::from_millis(1000)); - - loop { - match grpc_channel(addrs).await { - Ok(channel) => { - println!("Successfully connected to server"); - - let mut health = influxdb_iox_client::health::Client::new(channel); - - match health.check_storage().await { - Ok(_) => { - println!("Storage service is running"); - return; - } - Err(e) => { - println!("Error checking storage service status: {}", e); + match health.check_storage().await { + Ok(_) => { + println!("Storage service is running"); + return; + } + Err(e) => { + println!("Error checking storage service status: {}", e); + } } } + Err(e) => { + println!("Waiting for gRPC API to be up: {}", e); + } } - Err(e) => { - println!("Waiting for gRPC API to be up: {}", e); - } + interval.tick().await; } - interval.tick().await; + } + + /// Create a connection channel for the gRPC endpoint + async fn 
grpc_channel(&self) -> influxdb_iox_client::connection::Result<Connection> { + let builder = influxdb_iox_client::connection::Builder::default(); + + self.test_config + .client_headers + .iter() + .fold(builder, |builder, (header_name, header_value)| { + builder.header(header_name, header_value) + }) + .build(&self.addrs.grpc_base) + .await + } + + /// Returns the addresses to which the server has been bound + fn addrs(&self) -> &BindAddresses { + &self.addrs + } } diff --git a/tests/common/udp_listener.rs b/tests/common/udp_listener.rs new file mode 100644 index 0000000000..597c786be5 --- /dev/null +++ b/tests/common/udp_listener.rs @@ -0,0 +1,134 @@ +//! Captures UDP packets + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +/// UDP listener server that captures UDP messages (e.g. Jaeger spans) +/// for use in tests +use parking_lot::Mutex; +use tokio::{net::UdpSocket, select}; +use tokio_util::sync::CancellationToken; + +/// Maximum time to wait for a message, in seconds +const MAX_WAIT_TIME_SEC: u64 = 2; + +/// A UDP message received by this server +#[derive(Clone)] +pub struct Message { + data: Vec<u8>, +} + +impl std::fmt::Debug for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Message({} bytes: {})", self.data.len(), self.to_string()) + } +} + +impl ToString for Message { + fn to_string(&self) -> String { + String::from_utf8_lossy(&self.data).to_string() + } +} + +pub struct UdpCapture { + socket_addr: std::net::SocketAddr, + join_handle: tokio::task::JoinHandle<()>, + token: CancellationToken, + messages: Arc<Mutex<Vec<Message>>>, +} + +impl UdpCapture { + // Create a new server, listening for Udp messages + pub async fn new() -> Self { + // Bind to some address, letting the OS pick + let socket = UdpSocket::bind("127.0.0.1:0") + .await + .expect("bind udp listener"); + + let socket_addr = socket.local_addr().unwrap(); + + println!( + "UDP server listening at {} port {}", + socket_addr.ip(), + socket_addr.port() + ); + + let token =
CancellationToken::new(); + let messages = Arc::new(Mutex::new(vec![])); + + // Spawns a background task that listens on the socket + let captured_messages = Arc::clone(&messages); + let captured_token = token.clone(); + + let join_handle = tokio::spawn(async move { + println!("Starting udp listen"); + loop { + let mut data = vec![0; 1024]; + + select! { + _ = captured_token.cancelled() => { + println!("Received shutdown request"); + return; + }, + res = socket.recv_from(&mut data) => { + let (sz, _origin) = res.expect("successful socket read"); + data.resize(sz, 0); + let mut messages = captured_messages.lock(); + messages.push(Message { data }); + } + } + } + }); + + Self { + socket_addr, + join_handle, + token, + messages, + } + } + + /// return the ip on which this server is listening + pub fn ip(&self) -> String { + self.socket_addr.ip().to_string() + } + + /// return the port on which this server is listening + pub fn port(&self) -> String { + self.socket_addr.port().to_string() + } + + /// stop and wait for successful shutdown of this server + pub async fn stop(self) { + self.token.cancel(); + if let Err(e) = self.join_handle.await { + println!("Error waiting for shutdown of udp server: {}", e); + } + } + + // Return all messages this server has seen so far + pub fn messages(&self) -> Vec<Message> { + let messages = self.messages.lock(); + messages.clone() + } + + // wait for a message to appear that passes `pred` or the timeout expires + pub fn wait_for<P>

(&self, mut pred: P) + where + P: FnMut(&Message) -> bool, + { + let end = Instant::now() + Duration::from_secs(MAX_WAIT_TIME_SEC); + + while Instant::now() < end { + if self.messages.lock().iter().any(|m| pred(m)) { + return; + } + } + panic!( + "Timeout expired before finding messages that match the predicate. Messages:\n{:#?}", + self.messages.lock() + ) + } +} diff --git a/tests/end_to_end_cases/metrics.rs b/tests/end_to_end_cases/metrics.rs index 9417a24e62..d567ea2bb5 100644 --- a/tests/end_to_end_cases/metrics.rs +++ b/tests/end_to_end_cases/metrics.rs @@ -1,14 +1,11 @@ -use crate::common::server_fixture::ServerFixture; +use crate::common::server_fixture::{ServerFixture, TestConfig}; use crate::end_to_end_cases::scenario::Scenario; use test_helpers::assert_contains; #[tokio::test] pub async fn test_row_timestamp() { - let env = vec![( - "INFLUXDB_IOX_ROW_TIMESTAMP_METRICS".to_string(), - "system".to_string(), - )]; - let server_fixture = ServerFixture::create_single_use_with_env(env).await; + let test_config = TestConfig::new().with_env("INFLUXDB_IOX_ROW_TIMESTAMP_METRICS", "system"); + let server_fixture = ServerFixture::create_single_use_with_config(test_config).await; let mut management_client = server_fixture.management_client(); management_client.update_server_id(1).await.unwrap(); diff --git a/tests/end_to_end_cases/mod.rs b/tests/end_to_end_cases/mod.rs index 79557adb29..4082a2a443 100644 --- a/tests/end_to_end_cases/mod.rs +++ b/tests/end_to_end_cases/mod.rs @@ -17,6 +17,7 @@ pub mod scenario; mod sql_cli; mod storage_api; mod system_tables; +mod tracing; mod write_api; mod write_buffer; mod write_cli; diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 03c889ee71..9ae199d0f2 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -35,7 +35,7 @@ use write_buffer::core::WriteBufferWriting; use write_buffer::kafka::test_utils::{kafka_sequencer_options, purge_kafka_topic}; use
write_buffer::kafka::KafkaBufferProducer; -use crate::common::server_fixture::{ServerFixture, DEFAULT_SERVER_ID}; +use crate::common::server_fixture::{ServerFixture, TestConfig, DEFAULT_SERVER_ID}; type Error = Box; type Result = std::result::Result; @@ -588,12 +588,9 @@ pub async fn list_chunks(fixture: &ServerFixture, db_name: &str) -> Vec ServerFixture { let server_id = DEFAULT_SERVER_ID; - let env = vec![( - "INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR".to_string(), - "no".to_string(), - )]; + let test_config = TestConfig::new().with_env("INFLUXDB_IOX_WIPE_CATALOG_ON_ERROR", "no"); - let fixture = ServerFixture::create_single_use_with_env(env).await; + let fixture = ServerFixture::create_single_use_with_config(test_config).await; fixture .management_client() .update_server_id(server_id) @@ -661,9 +658,9 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture { pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture { let server_id = DEFAULT_SERVER_ID; - let env = vec![("INFLUXDB_IOX_SKIP_REPLAY".to_string(), "no".to_string())]; + let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no"); - let fixture = ServerFixture::create_single_use_with_env(env).await; + let fixture = ServerFixture::create_single_use_with_config(test_config).await; fixture .management_client() .update_server_id(server_id) diff --git a/tests/end_to_end_cases/storage_api.rs b/tests/end_to_end_cases/storage_api.rs index b7c10d31fe..91793dee12 100644 --- a/tests/end_to_end_cases/storage_api.rs +++ b/tests/end_to_end_cases/storage_api.rs @@ -20,11 +20,11 @@ use std::str; #[tokio::test] pub async fn test() { - let storage_fixture = ServerFixture::create_shared().await; + let server_fixture = ServerFixture::create_shared().await; - let influxdb2 = storage_fixture.influxdb2_client(); - let mut storage_client = StorageClient::new(storage_fixture.grpc_channel()); - let mut management_client = storage_fixture.management_client(); + let 
influxdb2 = server_fixture.influxdb2_client(); + let mut storage_client = StorageClient::new(server_fixture.grpc_channel()); + let mut management_client = server_fixture.management_client(); let scenario = Scenario::new(); scenario.create_database(&mut management_client).await; diff --git a/tests/end_to_end_cases/tracing.rs b/tests/end_to_end_cases/tracing.rs new file mode 100644 index 0000000000..6e477c0220 --- /dev/null +++ b/tests/end_to_end_cases/tracing.rs @@ -0,0 +1,122 @@ +use super::scenario::{collect_query, Scenario}; +use crate::common::{ + server_fixture::{ServerFixture, TestConfig}, + udp_listener::UdpCapture, +}; +use futures::TryStreamExt; +use generated_types::{storage_client::StorageClient, ReadFilterRequest}; + +// cfg at this level so IDE can resolve code even when jaeger feature is not active +#[cfg(feature = "jaeger")] +fn run_test() -> bool { + true +} + +#[cfg(not(feature = "jaeger"))] +fn run_test() -> bool { + println!("Skipping test because jaeger feature not enabled"); + false +} + +async fn setup() -> (UdpCapture, ServerFixture) { + let udp_capture = UdpCapture::new().await; + + let test_config = TestConfig::new() + .with_env("TRACES_EXPORTER", "jaeger") + .with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip()) + .with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port()) + .with_client_header("uber-trace-id", "4:3:2:1"); + + let server_fixture = ServerFixture::create_single_use_with_config(test_config).await; + + let mut management_client = server_fixture.management_client(); + + management_client.update_server_id(1).await.unwrap(); + server_fixture.wait_server_initialized().await; + + (udp_capture, server_fixture) +} + +#[tokio::test] +pub async fn test_tracing_sql() { + if !run_test() { + return; + } + + let (udp_capture, server_fixture) = setup().await; + + let scenario = Scenario::new(); + scenario + .create_database(&mut server_fixture.management_client()) + .await; + 
scenario.load_data(&server_fixture.influxdb2_client()).await; + + // run a query, ensure we get traces + let sql_query = "select * from cpu_load_short"; + let mut client = server_fixture.flight_client(); + + let query_results = client + .perform_query(scenario.database_name(), sql_query) + .await + .unwrap(); + + collect_query(query_results).await; + + // "shallow" packet inspection and verify the UDP server got + // something that had some expected results (maybe we could + // eventually verify the payload here too) + udp_capture.wait_for(|m| m.to_string().contains("IOxReadFilterNode")); + + // debugging assistance + //println!("Traces received (1):\n\n{:#?}", udp_capture.messages()); + + // wait for the UDP server to shutdown + udp_capture.stop().await +} + +#[tokio::test] +pub async fn test_tracing_storage_api() { + if !run_test() { + return; + } + + let (udp_capture, server_fixture) = setup().await; + + let scenario = Scenario::new(); + scenario + .create_database(&mut server_fixture.management_client()) + .await; + scenario.load_data(&server_fixture.influxdb2_client()).await; + + // run a query via gRPC, ensure we get traces + let read_source = scenario.read_source(); + let range = scenario.timestamp_range(); + let predicate = None; + let read_filter_request = tonic::Request::new(ReadFilterRequest { + read_source, + range, + predicate, + }); + let mut storage_client = StorageClient::new(server_fixture.grpc_channel()); + let read_response = storage_client + .read_filter(read_filter_request) + .await + .unwrap(); + + read_response + .into_inner() + .try_collect::<Vec<_>>() + .await + .unwrap(); + + // "shallow" packet inspection and verify the UDP server got + // something that had some expected results (maybe we could + // eventually verify the payload here too) + udp_capture.wait_for(|m| m.to_string().contains("IOxReadFilterNode")); + + // debugging assistance + //println!("Traces received (2):\n\n{:#?}", udp_capture.messages()); + + // wait for the UDP server to
shutdown + udp_capture.stop().await +}