diff --git a/Cargo.lock b/Cargo.lock index f4fdcc65ac..db4581051b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2712,6 +2712,7 @@ dependencies = [ "tokio", "tokio-util", "tonic", + "trace", "workspace-hack", ] diff --git a/ingester/benches/query.rs b/ingester/benches/query.rs index 7bbcfcb630..2c9fb5e6f6 100644 --- a/ingester/benches/query.rs +++ b/ingester/benches/query.rs @@ -40,8 +40,14 @@ async fn init( let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await; // Write the test data - ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42) - .await; + ctx.write_lp( + TEST_NAMESPACE, + lp, + PartitionKey::from(PARTITION_KEY), + 42, + None, + ) + .await; let table_id = ctx.table_id(TEST_NAMESPACE, "bananas").await; diff --git a/ingester/benches/write.rs b/ingester/benches/write.rs index 8a06587453..ac310d58c5 100644 --- a/ingester/benches/write.rs +++ b/ingester/benches/write.rs @@ -36,8 +36,14 @@ async fn init(lp: impl AsRef) -> (TestContext, D let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await; // Perform a write to drive table / schema population in the catalog. - ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42) - .await; + ctx.write_lp( + TEST_NAMESPACE, + lp, + PartitionKey::from(PARTITION_KEY), + 42, + None, + ) + .await; // Construct the write request once, and reuse it for each iteration. let batches = lines_to_batches(lp, 0).unwrap(); diff --git a/ingester/tests/query.rs b/ingester/tests/query.rs index e0f1b32501..dcf3dc3faf 100644 --- a/ingester/tests/query.rs +++ b/ingester/tests/query.rs @@ -18,6 +18,7 @@ async fn write_query() { "bananas greatness=\"unbounded\" 10", partition_key.clone(), 0, + None, ) .await; @@ -27,6 +28,7 @@ async fn write_query() { "cpu bar=2 20\ncpu bar=3 30", partition_key.clone(), 7, + None, ) .await; @@ -37,6 +39,7 @@ async fn write_query() { "bananas count=42 200", partition_key.clone(), 42, + None, ) .await; @@ -100,6 +103,7 @@ async fn write_query_projection() { "bananas greatness=\"unbounded\",level=42 10", partition_key.clone(), 0, + None, ) .await; @@ -109,6 +113,7 @@ async fn write_query_projection() { "bananas count=42,level=4242 200", partition_key.clone(), 42, + None, ) .await; diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index 2cb48fd1ee..d2c7b3c639 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -9,6 +9,7 @@ use metric::{ }; use parquet_file::ParquetFilePath; use std::{sync::Arc, time::Duration}; +use trace::{ctx::SpanContext, RingBufferTraceCollector}; // Write data to an ingester through the RPC interface and persist the data. #[tokio::test] @@ -23,6 +24,7 @@ async fn write_persist() { r#"bananas count=42,greatness="inf" 200"#, partition_key.clone(), 42, + None, ) .await; @@ -191,6 +193,7 @@ async fn wal_replay() { "bananas greatness=\"unbounded\" 10", partition_key.clone(), 0, + None, ) .await; @@ -200,6 +203,7 @@ async fn wal_replay() { "cpu bar=2 20\ncpu bar=3 30", partition_key.clone(), 7, + None, ) .await; @@ -210,6 +214,7 @@ async fn wal_replay() { "bananas count=42 200", partition_key.clone(), 42, + None, ) .await; @@ -283,6 +288,7 @@ async fn graceful_shutdown() { "bananas greatness=\"unbounded\" 10", partition_key.clone(), 0, + None, ) .await; @@ -295,6 +301,7 @@ async fn graceful_shutdown() { "cpu bar=2 20\ncpu bar=3 30", partition_key.clone(), 7, + None, ) .await; @@ -305,6 +312,7 @@ async fn graceful_shutdown() { "bananas count=42 200", partition_key.clone(), 42, + None, ) .await; @@ -358,3 +366,61 @@ async fn graceful_shutdown() { .unwrap(); assert_eq!(parquet_files.len(), 3); } + +#[tokio::test] +async fn write_tracing() { + let namespace_name = "write_tracing_test_namespace"; + let mut ctx = TestContextBuilder::default().build().await; + let ns = ctx.ensure_namespace(namespace_name, None).await; + + let trace_collector = Arc::new(RingBufferTraceCollector::new(5)); + let span_ctx = SpanContext::new(Arc::new(Arc::clone(&trace_collector))); + let request_span = span_ctx.child("write request span"); + + let partition_key = PartitionKey::from("1970-01-01"); + ctx.write_lp( + namespace_name, + r#"bananas count=42,greatness="inf" 200"#, + partition_key.clone(), + 42, + Some(request_span.ctx.clone()), + ) + .await; + + // Perform a query to validate the actual data buffered. + let table_id = ctx.table_id(namespace_name, "bananas").await.get(); + let data: Vec<_> = ctx + .query(IngesterQueryRequest { + namespace_id: ns.id.get(), + table_id, + columns: vec![], + predicate: None, + }) + .await + .expect("query request failed"); + + let expected = vec![ + "+-------+-----------+--------------------------------+", + "| count | greatness | time |", + "+-------+-----------+--------------------------------+", + "| 42.0 | inf | 1970-01-01T00:00:00.000000200Z |", + "+-------+-----------+--------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &data); + + // Check the spans emitted for the write request align capture what is expected + let spans = trace_collector.spans(); + assert_matches!(spans.as_slice(), [span3, span2, span1, handler_span] => { + // Check that the DML handlers are hit, and that they inherit from the + // handler span, which in turn inherits from the request + assert_eq!(handler_span.name, "ingester write"); + assert_eq!(span1.name, "write_apply"); + assert_eq!(span2.name, "wal"); + assert_eq!(span3.name, "buffer"); + + assert_eq!(handler_span.ctx.parent_span_id, Some(request_span.ctx.span_id)); + assert_eq!(span1.ctx.parent_span_id, Some(handler_span.ctx.span_id)); + assert_eq!(span2.ctx.parent_span_id, Some(handler_span.ctx.span_id)); + assert_eq!(span3.ctx.parent_span_id, Some(handler_span.ctx.span_id)); + }) +} diff --git a/ingester_test_ctx/Cargo.toml b/ingester_test_ctx/Cargo.toml index 71371e9dca..c75fcd8d30 100644 --- a/ingester_test_ctx/Cargo.toml +++ b/ingester_test_ctx/Cargo.toml @@ -32,4 +32,5 @@ test_helpers = { path = "../test_helpers", features = ["future_timeout"] } tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } tokio-util = "0.7.8" tonic = { workspace = true } +trace = { version = "0.1.0", path = "../trace" } workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/ingester_test_ctx/src/lib.rs b/ingester_test_ctx/src/lib.rs index ceeca0df00..a41448a213 100644 --- a/ingester_test_ctx/src/lib.rs +++ b/ingester_test_ctx/src/lib.rs @@ -49,6 +49,7 @@ use test_helpers::timeout::FutureTimeout; use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tonic::Request; +use trace::ctx::SpanContext; /// The default max persist queue depth - configurable with /// [`TestContextBuilder::with_max_persist_queue_depth()`]. @@ -243,6 +244,7 @@ where lp: &str, partition_key: PartitionKey, sequence_number: u64, + span_ctx: Option, ) { // Resolve the namespace ID needed to construct the DML op let namespace_id = self.namespace_id(namespace).await; @@ -313,12 +315,18 @@ where ), ); + let mut req = tonic::Request::new(WriteRequest { + payload: Some(encode_write(namespace_id.get(), &op)), + }); + + // Mock out the trace extraction middleware by inserting the given + // span context straight into the requests extensions + span_ctx.map(|span_ctx| req.extensions_mut().insert(span_ctx)); + self.ingester .rpc() .write_service() - .write(tonic::Request::new(WriteRequest { - payload: Some(encode_write(namespace_id.get(), &op)), - })) + .write(req) .await .unwrap();