From f6ad920f313503bf7248dc61c10c26545f7a5b4b Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Wed, 21 Jun 2023 15:58:16 +0100 Subject: [PATCH] refactor(ingester): Remove `set_span_context()` from `IngestOp` --- ingester/src/buffer_tree/namespace.rs | 1 + ingester/src/buffer_tree/root.rs | 19 +++++++++++++++++++ ingester/src/dml_payload/ingest_op.rs | 7 ------- ingester/src/dml_sink/instrumentation.rs | 1 + ingester/src/dml_sink/tracing.rs | 12 ++++-------- ingester/src/init/wal_replay.rs | 3 +++ ingester/src/persist/handle.rs | 1 + ingester/src/persist/mod.rs | 1 + ingester/src/test_util.rs | 6 ++++-- ingester/src/wal/wal_sink.rs | 1 + 10 files changed, 35 insertions(+), 17 deletions(-) diff --git a/ingester/src/buffer_tree/namespace.rs b/ingester/src/buffer_tree/namespace.rs index a59338ed07..aa599b3586 100644 --- a/ingester/src/buffer_tree/namespace.rs +++ b/ingester/src/buffer_tree/namespace.rs @@ -271,6 +271,7 @@ mod tests { r#"{},city=Medford day="sun",temp=55 22"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("buffer op should succeed"); diff --git a/ingester/src/buffer_tree/root.rs b/ingester/src/buffer_tree/root.rs index 3ae0656f8c..a87b05c9bc 100644 --- a/ingester/src/buffer_tree/root.rs +++ b/ingester/src/buffer_tree/root.rs @@ -294,6 +294,7 @@ mod tests { r#"{},city=Madrid day="sun",temp=55 22"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("buffer op should succeed"); @@ -392,6 +393,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, )], want = [ "+----------+------+-------------------------------+", @@ -427,6 +429,7 @@ mod tests { r#"{},region=Madrid temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ), make_write_op( &PartitionKey::from("p2"), @@ -438,6 +441,7 @@ mod tests { r#"{},region=Asturias temp=25 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ) ], want = [ @@ -477,6 +481,7 @@ mod tests { r#"{},region=Madrid temp=25 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ), make_write_op( &PartitionKey::from("p2"), @@ -488,6 +493,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ) ], want = [ @@ -525,6 +531,7 @@ mod tests { r#"{},region=Madrid temp=25 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ), make_write_op( &PartitionKey::from("p2"), @@ -536,6 +543,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ) ], want = [ @@ -568,6 +576,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ), make_write_op( &PartitionKey::from("p1"), @@ -579,6 +588,7 @@ mod tests { r#"{},region=Asturias temp=12 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ) ], want = [ @@ -635,6 +645,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to write initial data"); @@ -651,6 +662,7 @@ mod tests { r#"{},region=Asturias temp=12 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to overwrite data"); @@ -734,6 +746,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to write initial data"); @@ -751,6 +764,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to write initial data"); @@ -765,6 +779,7 @@ mod tests { TABLE2_ID, 0, &format!(r#"{},region=Asturias temp=35 4242424242"#, TABLE2_NAME), + None, ))) .await .expect("failed to write initial data"); @@ -823,6 +838,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to write data"); @@ -899,6 +915,7 @@ mod tests { r#"{},region=Madrid temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to write initial data"); @@ -923,6 +940,7 @@ mod tests { r#"{},region=Asturias temp=20 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to perform concurrent write to new partition"); @@ -939,6 +957,7 @@ mod tests { r#"{},region=Murcia temp=30 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ))) .await .expect("failed to perform concurrent write to existing partition"); diff --git a/ingester/src/dml_payload/ingest_op.rs b/ingester/src/dml_payload/ingest_op.rs index ccfed30d29..0eb2243fc8 100644 --- a/ingester/src/dml_payload/ingest_op.rs +++ b/ingester/src/dml_payload/ingest_op.rs @@ -25,13 +25,6 @@ impl IngestOp { Self::Write(w) => w.span_context.as_ref(), } } - - /// Sets the tracing context associated with the [`IngestOp`] - pub fn set_span_context(&mut self, ctx: SpanContext) { - match self { - Self::Write(w) => w.span_context = Some(ctx), - } - } } /// A decoded representation of the data contained by an RPC write diff --git a/ingester/src/dml_sink/instrumentation.rs b/ingester/src/dml_sink/instrumentation.rs index 9ac2e953b4..1a5b4c3697 100644 --- a/ingester/src/dml_sink/instrumentation.rs +++ b/ingester/src/dml_sink/instrumentation.rs @@ -129,6 +129,7 @@ mod tests { TABLE_ID, 42, "banana-report,tag=1 v=2 42424242", + None, )); // Call the decorator and assert the return value diff --git a/ingester/src/dml_sink/tracing.rs b/ingester/src/dml_sink/tracing.rs index ab16de3009..c4172a0dbf 100644 --- a/ingester/src/dml_sink/tracing.rs +++ b/ingester/src/dml_sink/tracing.rs @@ -119,18 +119,16 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let mut op = IngestOp::Write(make_write_op( + let op = IngestOp::Write(make_write_op( &PARTITION_KEY, NAMESPACE_ID, TABLE_NAME, TABLE_ID, 42, "banana-report,tag=1 v=2 42424242", + Some(span), )); - // Populate the metadata with a span context. - op.set_span_context(span); - // Drive the trace wrapper DmlSinkTracing::new(mock, "bananas") .apply(op) @@ -149,18 +147,16 @@ mod tests { let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); let span = SpanContext::new(Arc::clone(&traces)); - let mut op = IngestOp::Write(make_write_op( + let op = IngestOp::Write(make_write_op( &PARTITION_KEY, NAMESPACE_ID, TABLE_NAME, TABLE_ID, 42, "banana-report,tag=1 v=2 42424242", + Some(span), )); - // Populate the metadata with a span context. - op.set_span_context(span); - // Drive the trace wrapper let got = DmlSinkTracing::new(mock, "bananas") .apply(op) diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index cc000a4992..fb9d00d5dc 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -312,6 +312,7 @@ mod tests { r#"{},region=Madrid temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ); let op2 = make_write_op( &ARBITRARY_PARTITION_KEY, @@ -323,6 +324,7 @@ mod tests { r#"{},region=Asturias temp=25 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ); let op3 = make_write_op( &ARBITRARY_PARTITION_KEY, @@ -335,6 +337,7 @@ mod tests { r#"{},region=Asturias temp=15 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ); // The write portion of this test. diff --git a/ingester/src/persist/handle.rs b/ingester/src/persist/handle.rs index f00fa24284..28dd8c0a42 100644 --- a/ingester/src/persist/handle.rs +++ b/ingester/src/persist/handle.rs @@ -530,6 +530,7 @@ mod tests { ARBITRARY_TABLE_ID, 0, &format!("{},good=yes level=1000 4242424242", &*ARBITRARY_TABLE_NAME), + None, ))) .await .expect("failed to write partition test dataa"); diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index 285d429ef2..237c84b889 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -83,6 +83,7 @@ mod tests { r#"{},region=Asturias temp=35 4242424242"#, &*ARBITRARY_TABLE_NAME ), + None, ); let mut repos = catalog diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 000d47a996..ba6c66a8a5 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -5,6 +5,7 @@ use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; use schema::Projection; +use trace::ctx::SpanContext; use crate::{ buffer_tree::{ @@ -253,7 +254,7 @@ macro_rules! make_partition_stream { }}; } -/// Construct a [`DmlWrite`] with the specified parameters, for LP that contains +/// Construct a [`WriteOperation`] with the specified parameters, for LP that contains /// a single table identified by `table_id`. /// /// # Panics @@ -267,6 +268,7 @@ pub(crate) fn make_write_op( table_id: TableId, sequence_number: i64, lines: &str, + span_ctx: Option, ) -> WriteOperation { let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP"); assert_eq!( @@ -293,7 +295,7 @@ pub(crate) fn make_write_op( }) .collect(); - WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None) + WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx) } pub(crate) async fn populate_catalog( diff --git a/ingester/src/wal/wal_sink.rs b/ingester/src/wal/wal_sink.rs index 4a568eaa1e..e801e89acf 100644 --- a/ingester/src/wal/wal_sink.rs +++ b/ingester/src/wal/wal_sink.rs @@ -285,6 +285,7 @@ mod tests { TABLE_ID, 42, r#"bananas,region=Madrid temp=35 4242424242"#, + None, ); let wal = Wal::new(dir.path())