test(ingester): Integration test for RPC write trace context inheritrance

pull/24376/head
Fraser Savage 2023-07-12 15:48:41 +01:00
parent 458b1bf1a6
commit 729851be58
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
7 changed files with 100 additions and 7 deletions

1
Cargo.lock generated
View File

@ -2712,6 +2712,7 @@ dependencies = [
"tokio",
"tokio-util",
"tonic",
"trace",
"workspace-hack",
]

View File

@ -40,8 +40,14 @@ async fn init(
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await;
// Write the test data
ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42)
.await;
ctx.write_lp(
TEST_NAMESPACE,
lp,
PartitionKey::from(PARTITION_KEY),
42,
None,
)
.await;
let table_id = ctx.table_id(TEST_NAMESPACE, "bananas").await;

View File

@ -36,8 +36,14 @@ async fn init(lp: impl AsRef<str>) -> (TestContext<impl IngesterRpcInterface>, D
let ns = ctx.ensure_namespace(TEST_NAMESPACE, None).await;
// Perform a write to drive table / schema population in the catalog.
ctx.write_lp(TEST_NAMESPACE, lp, PartitionKey::from(PARTITION_KEY), 42)
.await;
ctx.write_lp(
TEST_NAMESPACE,
lp,
PartitionKey::from(PARTITION_KEY),
42,
None,
)
.await;
// Construct the write request once, and reuse it for each iteration.
let batches = lines_to_batches(lp, 0).unwrap();

View File

@ -18,6 +18,7 @@ async fn write_query() {
"bananas greatness=\"unbounded\" 10",
partition_key.clone(),
0,
None,
)
.await;
@ -27,6 +28,7 @@ async fn write_query() {
"cpu bar=2 20\ncpu bar=3 30",
partition_key.clone(),
7,
None,
)
.await;
@ -37,6 +39,7 @@ async fn write_query() {
"bananas count=42 200",
partition_key.clone(),
42,
None,
)
.await;
@ -100,6 +103,7 @@ async fn write_query_projection() {
"bananas greatness=\"unbounded\",level=42 10",
partition_key.clone(),
0,
None,
)
.await;
@ -109,6 +113,7 @@ async fn write_query_projection() {
"bananas count=42,level=4242 200",
partition_key.clone(),
42,
None,
)
.await;

View File

@ -9,6 +9,7 @@ use metric::{
};
use parquet_file::ParquetFilePath;
use std::{sync::Arc, time::Duration};
use trace::{ctx::SpanContext, RingBufferTraceCollector};
// Write data to an ingester through the RPC interface and persist the data.
#[tokio::test]
@ -23,6 +24,7 @@ async fn write_persist() {
r#"bananas count=42,greatness="inf" 200"#,
partition_key.clone(),
42,
None,
)
.await;
@ -191,6 +193,7 @@ async fn wal_replay() {
"bananas greatness=\"unbounded\" 10",
partition_key.clone(),
0,
None,
)
.await;
@ -200,6 +203,7 @@ async fn wal_replay() {
"cpu bar=2 20\ncpu bar=3 30",
partition_key.clone(),
7,
None,
)
.await;
@ -210,6 +214,7 @@ async fn wal_replay() {
"bananas count=42 200",
partition_key.clone(),
42,
None,
)
.await;
@ -283,6 +288,7 @@ async fn graceful_shutdown() {
"bananas greatness=\"unbounded\" 10",
partition_key.clone(),
0,
None,
)
.await;
@ -295,6 +301,7 @@ async fn graceful_shutdown() {
"cpu bar=2 20\ncpu bar=3 30",
partition_key.clone(),
7,
None,
)
.await;
@ -305,6 +312,7 @@ async fn graceful_shutdown() {
"bananas count=42 200",
partition_key.clone(),
42,
None,
)
.await;
@ -358,3 +366,61 @@ async fn graceful_shutdown() {
.unwrap();
assert_eq!(parquet_files.len(), 3);
}
#[tokio::test]
async fn write_tracing() {
let namespace_name = "write_tracing_test_namespace";
let mut ctx = TestContextBuilder::default().build().await;
let ns = ctx.ensure_namespace(namespace_name, None).await;
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
let span_ctx = SpanContext::new(Arc::new(Arc::clone(&trace_collector)));
let request_span = span_ctx.child("write request span");
let partition_key = PartitionKey::from("1970-01-01");
ctx.write_lp(
namespace_name,
r#"bananas count=42,greatness="inf" 200"#,
partition_key.clone(),
42,
Some(request_span.ctx.clone()),
)
.await;
// Perform a query to validate the actual data buffered.
let table_id = ctx.table_id(namespace_name, "bananas").await.get();
let data: Vec<_> = ctx
.query(IngesterQueryRequest {
namespace_id: ns.id.get(),
table_id,
columns: vec![],
predicate: None,
})
.await
.expect("query request failed");
let expected = vec![
"+-------+-----------+--------------------------------+",
"| count | greatness | time |",
"+-------+-----------+--------------------------------+",
"| 42.0 | inf | 1970-01-01T00:00:00.000000200Z |",
"+-------+-----------+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &data);
// Check the spans emitted for the write request align capture what is expected
let spans = trace_collector.spans();
assert_matches!(spans.as_slice(), [span3, span2, span1, handler_span] => {
// Check that the DML handlers are hit, and that they inherit from the
// handler span, which in turn inherits from the request
assert_eq!(handler_span.name, "ingester write");
assert_eq!(span1.name, "write_apply");
assert_eq!(span2.name, "wal");
assert_eq!(span3.name, "buffer");
assert_eq!(handler_span.ctx.parent_span_id, Some(request_span.ctx.span_id));
assert_eq!(span1.ctx.parent_span_id, Some(handler_span.ctx.span_id));
assert_eq!(span2.ctx.parent_span_id, Some(handler_span.ctx.span_id));
assert_eq!(span3.ctx.parent_span_id, Some(handler_span.ctx.span_id));
})
}

View File

@ -32,4 +32,5 @@ test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = "0.7.8"
tonic = { workspace = true }
trace = { version = "0.1.0", path = "../trace" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

View File

@ -49,6 +49,7 @@ use test_helpers::timeout::FutureTimeout;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tonic::Request;
use trace::ctx::SpanContext;
/// The default max persist queue depth - configurable with
/// [`TestContextBuilder::with_max_persist_queue_depth()`].
@ -243,6 +244,7 @@ where
lp: &str,
partition_key: PartitionKey,
sequence_number: u64,
span_ctx: Option<SpanContext>,
) {
// Resolve the namespace ID needed to construct the DML op
let namespace_id = self.namespace_id(namespace).await;
@ -313,12 +315,18 @@ where
),
);
let mut req = tonic::Request::new(WriteRequest {
payload: Some(encode_write(namespace_id.get(), &op)),
});
// Mock out the trace extraction middleware by inserting the given
// span context straight into the requests extensions
span_ctx.map(|span_ctx| req.extensions_mut().insert(span_ctx));
self.ingester
.rpc()
.write_service()
.write(tonic::Request::new(WriteRequest {
payload: Some(encode_write(namespace_id.get(), &op)),
}))
.write(req)
.await
.unwrap();