refactor(ingester): Replace test_util `DmlWrite` with `WriteOperation`

This change updates the test_util equality and write generation code
to use the new `IngestOp::Write(WriteOperation)` type, removing many
pointless conversions in tests.
pull/24376/head
Fraser Savage 2023-06-16 13:16:10 +01:00
parent 8908f5fc96
commit a3a4145774
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
10 changed files with 200 additions and 228 deletions

View File

@ -164,7 +164,7 @@ where
table_data
.buffer_table_write(
partitioned_data.sequence_number(),
partitioned_data.data(),
partitioned_data.into_data(),
partition_key.clone(),
)
.await?;
@ -214,7 +214,6 @@ where
mod tests {
use std::sync::Arc;
use dml::DmlOperation;
use metric::{Attributes, Metric};
use super::*;
@ -262,20 +261,17 @@ mod tests {
assert!(ns.table(ARBITRARY_TABLE_ID).is_none());
// Write some test data
ns.apply(
DmlOperation::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},city=Medford day="sun",temp=55 22"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
ns.apply(IngestOp::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},city=Medford day="sun",temp=55 22"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("buffer op should succeed");

View File

@ -232,7 +232,6 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{PartitionId, PartitionKey};
use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
use dml::DmlOperation;
use futures::StreamExt;
use metric::{Attributes, Metric};
@ -285,20 +284,17 @@ mod tests {
assert!(ns.table(ARBITRARY_TABLE_ID).is_none());
// Write some test data
ns.apply(
DmlOperation::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},city=Madrid day="sun",temp=55 22"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
ns.apply(IngestOp::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},city=Madrid day="sun",temp=55 22"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("buffer op should succeed");
@ -329,7 +325,7 @@ mod tests {
(
$name:ident,
partitions = [$($partition:expr), +], // The set of PartitionData for the mock partition provider
writes = [$($write:expr), *], // The set of DmlWrite to apply()
writes = [$($write:expr), *], // The set of WriteOperation to apply()
want = $want:expr // The expected results of querying ARBITRARY_NAMESPACE_ID and ARBITRARY_TABLE_ID
) => {
paste::paste! {
@ -352,9 +348,9 @@ mod tests {
Arc::new(metric::Registry::default()),
);
// Write the provided DmlWrites
// Write the provided WriteOperation
$(
buf.apply(DmlOperation::Write($write).into())
buf.apply(IngestOp::Write($write).into())
.await
.expect("failed to perform write");
)*
@ -629,39 +625,33 @@ mod tests {
);
// Write data to partition p1, in the arbitrary table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to write initial data");
// Write a duplicate record with the same series key & timestamp, but a
// different temp value.
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},region=Asturias temp=12 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},region=Asturias temp=12 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to overwrite data");
@ -734,57 +724,48 @@ mod tests {
assert_eq!(buf.partitions().count(), 0);
// Write data to partition p1, in the arbitrary table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to write initial data");
assert_eq!(buf.partitions().count(), 1);
// Write data to partition p2, in the arbitrary table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to write initial data");
assert_eq!(buf.partitions().count(), 2);
// Write data to partition p3, in the second table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p3"),
ARBITRARY_NAMESPACE_ID,
TABLE2_NAME,
TABLE2_ID,
0,
&format!(r#"{},region=Asturias temp=35 4242424242"#, TABLE2_NAME),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p3"),
ARBITRARY_NAMESPACE_ID,
TABLE2_NAME,
TABLE2_ID,
0,
&format!(r#"{},region=Asturias temp=35 4242424242"#, TABLE2_NAME),
)))
.await
.expect("failed to write initial data");
@ -832,20 +813,17 @@ mod tests {
});
// Write data to partition p1, in the arbitrary table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Asturias temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to write data");
@ -911,20 +889,17 @@ mod tests {
);
// Write data to partition p1, in the arbitrary table
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Madrid temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!(
r#"{},region=Madrid temp=35 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to write initial data");
@ -938,39 +913,33 @@ mod tests {
// Perform a write concurrent to the consumption of the query stream
// that creates a new partition (p2) in the same table.
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},region=Asturias temp=20 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p2"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
1,
&format!(
r#"{},region=Asturias temp=20 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to perform concurrent write to new partition");
// Perform another write that hits the partition within the query
// results snapshot (p1) before the partition is read.
buf.apply(
DmlOperation::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
2,
&format!(
r#"{},region=Murcia temp=30 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
))
.into(),
)
buf.apply(IngestOp::Write(make_write_op(
&PartitionKey::from("p1"),
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
2,
&format!(
r#"{},region=Murcia temp=30 4242424242"#,
&*ARBITRARY_TABLE_NAME
),
)))
.await
.expect("failed to perform concurrent write to existing partition");

View File

@ -40,6 +40,13 @@ impl IngestOp {
Self::Write(w) => w.span_context.as_ref(),
}
}
/// Sets the tracing context associated with the [`IngestOp`].
///
/// Any span context already stored on the wrapped write is overwritten
/// with `ctx`.
pub fn set_span_context(&mut self, ctx: SpanContext) {
match self {
// Store the context on the inner write, replacing any existing value.
Self::Write(w) => w.span_context = Some(ctx),
}
}
}
/// A decoded representation of the data contained by an RPC write
@ -224,8 +231,13 @@ impl PartitionedData {
self.sequence_number
}
/// Consumes `self`, returning the data
pub fn data(self) -> MutableBatch {
/// Returns a shared reference to the [`MutableBatch`] held by `self`,
/// leaving `self` intact.
///
/// Use [`Self::into_data`] to take ownership of the batch instead.
pub fn data(&self) -> &MutableBatch {
&self.data
}
/// Consumes `self`, returning the owned [`MutableBatch`].
///
/// Prefer [`Self::data`] when a borrow is sufficient, since this method
/// gives up `self`.
pub fn into_data(self) -> MutableBatch {
self.data
}
}

View File

@ -74,7 +74,6 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
use dml::DmlOperation;
use lazy_static::lazy_static;
use metric::Attributes;
@ -123,14 +122,14 @@ mod tests {
let metrics = metric::Registry::default();
let decorator = DmlSinkInstrumentation::new(LAYER_NAME, mock, &metrics);
let op = DmlOperation::Write(make_write_op(
let op = IngestOp::Write(make_write_op(
&PARTITION_KEY,
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
42,
"banana-report,tag=1 v=2 42424242",
)).into();
));
// Call the decorator and assert the return value
let got = decorator

View File

@ -59,7 +59,6 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, PartitionKey, TableId};
use dml::{DmlMeta, DmlOperation};
use lazy_static::lazy_static;
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
@ -120,7 +119,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let mut op = DmlOperation::Write(make_write_op(
let mut op = IngestOp::Write(make_write_op(
&PARTITION_KEY,
NAMESPACE_ID,
TABLE_NAME,
@ -130,17 +129,11 @@ mod tests {
));
// Populate the metadata with a span context.
let meta = op.meta();
op.set_meta(DmlMeta::sequenced(
meta.sequence().unwrap(),
meta.producer_ts().unwrap(),
Some(span),
42,
));
op.set_span_context(span);
// Drive the trace wrapper
DmlSinkTracing::new(mock, "bananas")
.apply(op.into())
.apply(op)
.await
.expect("wrapper should not modify result");
@ -156,7 +149,7 @@ mod tests {
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
let mut op = DmlOperation::Write(make_write_op(
let mut op = IngestOp::Write(make_write_op(
&PARTITION_KEY,
NAMESPACE_ID,
TABLE_NAME,
@ -166,17 +159,11 @@ mod tests {
));
// Populate the metadata with a span context.
let meta = op.meta();
op.set_meta(DmlMeta::sequenced(
meta.sequence().unwrap(),
meta.producer_ts().unwrap(),
Some(span),
42,
));
op.set_span_context(span);
// Drive the trace wrapper
let got = DmlSinkTracing::new(mock, "bananas")
.apply(op.into())
.apply(op)
.await
.expect_err("wrapper should not modify result");
assert_matches!(got, DmlError::Wal(s) => {

View File

@ -350,12 +350,12 @@ mod tests {
// Apply the first op through the decorator
wal_sink
.apply(DmlOperation::Write(op1.clone()).into())
.apply(IngestOp::Write(op1.clone()))
.await
.expect("wal should not error");
// And the second op
wal_sink
.apply(DmlOperation::Write(op2.clone()).into())
.apply(IngestOp::Write(op2.clone()))
.await
.expect("wal should not error");
@ -364,7 +364,7 @@ mod tests {
// Write the third op
wal_sink
.apply(DmlOperation::Write(op3.clone()).into())
.apply(IngestOp::Write(op3.clone()))
.await
.expect("wal should not error");
@ -389,7 +389,13 @@ mod tests {
// Put at least one write into the buffer so it is a candidate for persistence
partition
.buffer_write(
op1.tables().next().unwrap().1.clone(),
op1.tables()
.next()
.unwrap()
.1
.partitioned_data()
.data()
.clone(),
SequenceNumber::new(1),
)
.unwrap();
@ -414,9 +420,9 @@ mod tests {
IngestOp::Write(ref w2),
IngestOp::Write(ref w3)
] => {
assert_dml_writes_eq(w1.into(), op1);
assert_dml_writes_eq(w2.into(), op2);
assert_dml_writes_eq(w3.into(), op3);
assert_dml_writes_eq(w1.clone(), op1);
assert_dml_writes_eq(w2.clone(), op2);
assert_dml_writes_eq(w3.clone(), op3);
}
);

View File

@ -475,7 +475,6 @@ mod tests {
use std::{sync::Arc, task::Poll, time::Duration};
use assert_matches::assert_matches;
use dml::DmlOperation;
use futures::Future;
use iox_catalog::mem::MemCatalog;
use object_store::memory::InMemory;
@ -492,6 +491,7 @@ mod tests {
post_write::mock::MockPostWriteObserver, BufferTree,
},
deferred_load::DeferredLoad,
dml_payload::IngestOp,
dml_sink::DmlSink,
ingest_state::IngestStateError,
persist::{
@ -523,17 +523,14 @@ mod tests {
);
buffer_tree
.apply(
DmlOperation::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!("{},good=yes level=1000 4242424242", &*ARBITRARY_TABLE_NAME),
))
.into(),
)
.apply(IngestOp::Write(make_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
0,
&format!("{},good=yes level=1000 4242424242", &*ARBITRARY_TABLE_NAME),
)))
.await
.expect("failed to write partition test dataa");

View File

@ -17,7 +17,6 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{CompactionLevel, ParquetFile};
use dml::DmlOperation;
use futures::TryStreamExt;
use iox_catalog::{
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@ -41,6 +40,7 @@ mod tests {
post_write::mock::MockPostWriteObserver,
BufferTree,
},
dml_payload::IngestOp,
dml_sink::DmlSink,
ingest_state::IngestState,
persist::handle::PersistHandle,
@ -98,7 +98,7 @@ mod tests {
validate_or_insert_schema(
write
.tables()
.map(|(_id, data)| (&***ARBITRARY_TABLE_NAME, data)),
.map(|(_id, data)| (&***ARBITRARY_TABLE_NAME, data.partitioned_data().data())),
&schema,
&mut *repos,
)
@ -108,7 +108,7 @@ mod tests {
drop(repos); // Don't you love this testing-only deadlock bug? #3859
// Apply the write
buf.apply(DmlOperation::Write(write).into())
buf.apply(IngestOp::Write(write))
.await
.expect("failed to apply write to buffer");

View File

@ -1,7 +1,6 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use data_types::{NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, TableId};
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
use lazy_static::lazy_static;
use mutable_batch_lp::lines_to_batches;
@ -20,6 +19,7 @@ use crate::{
},
},
deferred_load::DeferredLoad,
dml_payload::{PartitionedData, TableData, WriteOperation},
};
pub(crate) const ARBITRARY_PARTITION_ID: PartitionId = PartitionId::new(1);
@ -259,7 +259,6 @@ macro_rules! make_partition_stream {
/// # Panics
///
/// This method panics if `lines` contains data for more than one table.
// TODO(savage): Return a `dml_payload::WriteOperation` here.
#[track_caller]
pub(crate) fn make_write_op(
partition_key: &PartitionKey,
@ -268,7 +267,7 @@ pub(crate) fn make_write_op(
table_id: TableId,
sequence_number: i64,
lines: &str,
) -> DmlWrite {
) -> WriteOperation {
let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP");
assert_eq!(
tables_by_name.len(),
@ -283,19 +282,18 @@ pub(crate) fn make_write_op(
.expect("table_name does not exist in LP"),
)]
.into_iter()
.map(|(table_id, mutable_batch)| {
(
table_id,
TableData::new(
table_id,
PartitionedData::new(SequenceNumber::new(sequence_number), mutable_batch),
),
)
})
.collect();
DmlWrite::new(
namespace_id,
tables_by_id,
partition_key.clone(),
DmlMeta::sequenced(
SequenceNumber::new(sequence_number),
iox_time::Time::MIN,
None,
42,
),
)
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None)
}
pub(crate) async fn populate_catalog(
@ -318,17 +316,19 @@ pub(crate) async fn populate_catalog(
/// Assert `a` and `b` have identical metadata, and that when converting
/// them to Arrow batches they produce identical output.
#[track_caller]
pub(crate) fn assert_dml_writes_eq(a: DmlWrite, b: DmlWrite) {
assert_eq!(a.namespace_id(), b.namespace_id(), "namespace");
assert_eq!(a.table_count(), b.table_count(), "table count");
assert_eq!(a.min_timestamp(), b.min_timestamp(), "min timestamp");
assert_eq!(a.max_timestamp(), b.max_timestamp(), "max timestamp");
pub(crate) fn assert_dml_writes_eq(a: WriteOperation, b: WriteOperation) {
assert_eq!(a.namespace(), b.namespace(), "namespace");
assert_eq!(a.tables().count(), b.tables().count(), "table count");
assert_eq!(a.partition_key(), b.partition_key(), "partition key");
// Assert sequence numbers were reassigned
let seq_a = a.meta().sequence();
let seq_b = b.meta().sequence();
assert_eq!(seq_a, seq_b, "sequence numbers differ");
for (a_table, b_table) in a.tables().zip(b.tables()) {
assert_eq!(
a_table.1.partitioned_data().sequence_number(),
b_table.1.partitioned_data().sequence_number(),
"sequence number"
);
}
let a = a.into_tables().collect::<BTreeMap<_, _>>();
let b = b.into_tables().collect::<BTreeMap<_, _>>();
@ -336,9 +336,15 @@ pub(crate) fn assert_dml_writes_eq(a: DmlWrite, b: DmlWrite) {
a.into_iter().zip(b.into_iter()).for_each(|(a, b)| {
assert_eq!(a.0, b.0, "table IDs differ - a table is missing!");
assert_eq!(
a.1.to_arrow(Projection::All)
a.1.partitioned_data()
.data()
.clone()
.to_arrow(Projection::All)
.expect("failed projection for a"),
b.1.to_arrow(Projection::All)
b.1.partitioned_data()
.data()
.clone()
.to_arrow(Projection::All)
.expect("failed projection for b"),
"table data differs"
);

View File

@ -291,7 +291,7 @@ mod tests {
// Apply the op through the decorator, which should time out
let err = wal_sink
.apply(DmlOperation::Write(op.clone()).into())
.apply(IngestOp::Write(op.clone()))
.await
.expect_err("write should time out");