refactor(ingester): Remove `set_span_context()` from `IngestOp`
parent
35c5017410
commit
f6ad920f31
|
@ -271,6 +271,7 @@ mod tests {
|
|||
r#"{},city=Medford day="sun",temp=55 22"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("buffer op should succeed");
|
||||
|
|
|
@ -294,6 +294,7 @@ mod tests {
|
|||
r#"{},city=Madrid day="sun",temp=55 22"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("buffer op should succeed");
|
||||
|
@ -392,6 +393,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)],
|
||||
want = [
|
||||
"+----------+------+-------------------------------+",
|
||||
|
@ -427,6 +429,7 @@ mod tests {
|
|||
r#"{},region=Madrid temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
),
|
||||
make_write_op(
|
||||
&PartitionKey::from("p2"),
|
||||
|
@ -438,6 +441,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=25 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)
|
||||
],
|
||||
want = [
|
||||
|
@ -477,6 +481,7 @@ mod tests {
|
|||
r#"{},region=Madrid temp=25 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
),
|
||||
make_write_op(
|
||||
&PartitionKey::from("p2"),
|
||||
|
@ -488,6 +493,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)
|
||||
],
|
||||
want = [
|
||||
|
@ -525,6 +531,7 @@ mod tests {
|
|||
r#"{},region=Madrid temp=25 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
),
|
||||
make_write_op(
|
||||
&PartitionKey::from("p2"),
|
||||
|
@ -536,6 +543,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)
|
||||
],
|
||||
want = [
|
||||
|
@ -568,6 +576,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
),
|
||||
make_write_op(
|
||||
&PartitionKey::from("p1"),
|
||||
|
@ -579,6 +588,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=12 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)
|
||||
],
|
||||
want = [
|
||||
|
@ -635,6 +645,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write initial data");
|
||||
|
@ -651,6 +662,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=12 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to overwrite data");
|
||||
|
@ -734,6 +746,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write initial data");
|
||||
|
@ -751,6 +764,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write initial data");
|
||||
|
@ -765,6 +779,7 @@ mod tests {
|
|||
TABLE2_ID,
|
||||
0,
|
||||
&format!(r#"{},region=Asturias temp=35 4242424242"#, TABLE2_NAME),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write initial data");
|
||||
|
@ -823,6 +838,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write data");
|
||||
|
@ -899,6 +915,7 @@ mod tests {
|
|||
r#"{},region=Madrid temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write initial data");
|
||||
|
@ -923,6 +940,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=20 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to perform concurrent write to new partition");
|
||||
|
@ -939,6 +957,7 @@ mod tests {
|
|||
r#"{},region=Murcia temp=30 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to perform concurrent write to existing partition");
|
||||
|
|
|
@ -25,13 +25,6 @@ impl IngestOp {
|
|||
Self::Write(w) => w.span_context.as_ref(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the tracing context associated with the [`IngestOp`]
|
||||
pub fn set_span_context(&mut self, ctx: SpanContext) {
|
||||
match self {
|
||||
Self::Write(w) => w.span_context = Some(ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A decoded representation of the data contained by an RPC write
|
||||
|
|
|
@ -129,6 +129,7 @@ mod tests {
|
|||
TABLE_ID,
|
||||
42,
|
||||
"banana-report,tag=1 v=2 42424242",
|
||||
None,
|
||||
));
|
||||
|
||||
// Call the decorator and assert the return value
|
||||
|
|
|
@ -119,18 +119,16 @@ mod tests {
|
|||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let mut op = IngestOp::Write(make_write_op(
|
||||
let op = IngestOp::Write(make_write_op(
|
||||
&PARTITION_KEY,
|
||||
NAMESPACE_ID,
|
||||
TABLE_NAME,
|
||||
TABLE_ID,
|
||||
42,
|
||||
"banana-report,tag=1 v=2 42424242",
|
||||
Some(span),
|
||||
));
|
||||
|
||||
// Populate the metadata with a span context.
|
||||
op.set_span_context(span);
|
||||
|
||||
// Drive the trace wrapper
|
||||
DmlSinkTracing::new(mock, "bananas")
|
||||
.apply(op)
|
||||
|
@ -149,18 +147,16 @@ mod tests {
|
|||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let mut op = IngestOp::Write(make_write_op(
|
||||
let op = IngestOp::Write(make_write_op(
|
||||
&PARTITION_KEY,
|
||||
NAMESPACE_ID,
|
||||
TABLE_NAME,
|
||||
TABLE_ID,
|
||||
42,
|
||||
"banana-report,tag=1 v=2 42424242",
|
||||
Some(span),
|
||||
));
|
||||
|
||||
// Populate the metadata with a span context.
|
||||
op.set_span_context(span);
|
||||
|
||||
// Drive the trace wrapper
|
||||
let got = DmlSinkTracing::new(mock, "bananas")
|
||||
.apply(op)
|
||||
|
|
|
@ -312,6 +312,7 @@ mod tests {
|
|||
r#"{},region=Madrid temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
);
|
||||
let op2 = make_write_op(
|
||||
&ARBITRARY_PARTITION_KEY,
|
||||
|
@ -323,6 +324,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=25 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
);
|
||||
let op3 = make_write_op(
|
||||
&ARBITRARY_PARTITION_KEY,
|
||||
|
@ -335,6 +337,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=15 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
);
|
||||
|
||||
// The write portion of this test.
|
||||
|
|
|
@ -530,6 +530,7 @@ mod tests {
|
|||
ARBITRARY_TABLE_ID,
|
||||
0,
|
||||
&format!("{},good=yes level=1000 4242424242", &*ARBITRARY_TABLE_NAME),
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
.expect("failed to write partition test dataa");
|
||||
|
|
|
@ -83,6 +83,7 @@ mod tests {
|
|||
r#"{},region=Asturias temp=35 4242424242"#,
|
||||
&*ARBITRARY_TABLE_NAME
|
||||
),
|
||||
None,
|
||||
);
|
||||
|
||||
let mut repos = catalog
|
||||
|
|
|
@ -5,6 +5,7 @@ use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
|
|||
use lazy_static::lazy_static;
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use schema::Projection;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
use crate::{
|
||||
buffer_tree::{
|
||||
|
@ -253,7 +254,7 @@ macro_rules! make_partition_stream {
|
|||
}};
|
||||
}
|
||||
|
||||
/// Construct a [`DmlWrite`] with the specified parameters, for LP that contains
|
||||
/// Construct a [`WriteOperation`] with the specified parameters, for LP that contains
|
||||
/// a single table identified by `table_id`.
|
||||
///
|
||||
/// # Panics
|
||||
|
@ -267,6 +268,7 @@ pub(crate) fn make_write_op(
|
|||
table_id: TableId,
|
||||
sequence_number: i64,
|
||||
lines: &str,
|
||||
span_ctx: Option<SpanContext>,
|
||||
) -> WriteOperation {
|
||||
let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP");
|
||||
assert_eq!(
|
||||
|
@ -293,7 +295,7 @@ pub(crate) fn make_write_op(
|
|||
})
|
||||
.collect();
|
||||
|
||||
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None)
|
||||
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx)
|
||||
}
|
||||
|
||||
pub(crate) async fn populate_catalog(
|
||||
|
|
|
@ -285,6 +285,7 @@ mod tests {
|
|||
TABLE_ID,
|
||||
42,
|
||||
r#"bananas,region=Madrid temp=35 4242424242"#,
|
||||
None,
|
||||
);
|
||||
|
||||
let wal = Wal::new(dir.path())
|
||||
|
|
Loading…
Reference in New Issue