test(wal): Test correct assignment of write per-partition sequence numbers

This adds extra test coverage for the ingester's WAL replay & RPC write
paths, as well as the WAL E2E tests, to ensure that all sequence numbers
present in a WriteOperation/WalOperation are encoded and present when
decoded.
pull/24376/head
Fraser Savage 2023-06-27 16:34:04 +01:00
parent e6e09d0c15
commit e74a7a7dd4
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
5 changed files with 244 additions and 73 deletions

View File

@ -271,6 +271,7 @@ mod tests {
use assert_matches::assert_matches;
use async_trait::async_trait;
use lazy_static::lazy_static;
use metric::{Attributes, Metric};
use parking_lot::Mutex;
use wal::Wal;
@ -281,9 +282,9 @@ mod tests {
dml_sink::mock_sink::MockDmlSink,
persist::queue::mock::MockPersistQueue,
test_util::{
assert_dml_writes_eq, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID,
ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID,
ARBITRARY_TABLE_NAME,
assert_write_ops_eq, make_multi_table_write_op, make_write_op, PartitionDataBuilder,
ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY,
ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME,
},
wal::wal_sink::WalSink,
};
@ -311,6 +312,10 @@ mod tests {
}
}
lazy_static! {
/// Name of the second table targeted by the multi-table write built in
/// `test_replay`, used alongside `ARBITRARY_TABLE_NAME`.
static ref ALTERNATIVE_TABLE_NAME: &'static str = "arán";
}
#[tokio::test]
async fn test_replay() {
let dir = tempfile::tempdir().unwrap();
@ -340,18 +345,30 @@ mod tests {
),
None,
);
let op3 = make_write_op(
// Add a write hitting multiple tables for good measure
let op3 = make_multi_table_write_op(
&ARBITRARY_PARTITION_KEY,
ARBITRARY_NAMESPACE_ID,
&ARBITRARY_TABLE_NAME,
ARBITRARY_TABLE_ID,
42,
[
(
ARBITRARY_TABLE_NAME.to_string().as_str(),
ARBITRARY_TABLE_ID,
SequenceNumber::new(42),
),
(
&ALTERNATIVE_TABLE_NAME,
TableId::new(ARBITRARY_TABLE_ID.get() + 1),
SequenceNumber::new(43),
),
]
.into_iter(),
// Overwrite op2
&format!(
r#"{},region=Asturias temp=15 4242424242"#,
&*ARBITRARY_TABLE_NAME
r#"{},region=Asturias temp=15 4242424242
{},region=Mayo temp=12 4242424242"#,
&*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME,
),
None,
);
// The write portion of this test.
@ -427,7 +444,7 @@ mod tests {
.await
.expect("failed to replay WAL");
assert_eq!(max_sequence_number, Some(SequenceNumber::new(42)));
assert_eq!(max_sequence_number, Some(SequenceNumber::new(43)));
// Assert the ops were pushed into the DmlSink exactly as generated.
let ops = mock_iter.sink.get_calls();
@ -438,9 +455,9 @@ mod tests {
IngestOp::Write(ref w2),
IngestOp::Write(ref w3)
] => {
assert_dml_writes_eq(w1.clone(), op1);
assert_dml_writes_eq(w2.clone(), op2);
assert_dml_writes_eq(w3.clone(), op3);
assert_write_ops_eq(w1.clone(), op1);
assert_write_ops_eq(w2.clone(), op2);
assert_write_ops_eq(w3.clone(), op3);
}
);

View File

@ -227,17 +227,22 @@ mod tests {
column::{SemanticType, Values},
Column, DatabaseBatch, TableBatch,
};
use std::sync::Arc;
use lazy_static::lazy_static;
use std::{collections::HashSet, sync::Arc};
use super::*;
use crate::{
dml_payload::IngestOp,
dml_sink::mock_sink::MockDmlSink,
test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID},
test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID},
};
use crate::{dml_sink::mock_sink::MockDmlSink, test_util::ARBITRARY_PARTITION_KEY};
const PERSIST_QUEUE_DEPTH: usize = 42;
lazy_static! {
/// ID of the second table batch in the request sent by the
/// `apply_ok_independently_sequenced_partitions` test, used alongside
/// `ARBITRARY_TABLE_ID`.
static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(76);
}
macro_rules! test_rpc_write {
(
$name:ident,
@ -316,6 +321,75 @@ mod tests {
}
);
// A single request carrying batches for two different tables: the resulting
// write must hold one entry per table, each with its own sequence number.
test_rpc_write!(
    apply_ok_independently_sequenced_partitions,
    request = proto::WriteRequest {
        payload: Some(DatabaseBatch {
            database_id: ARBITRARY_NAMESPACE_ID.get(),
            partition_key: ARBITRARY_PARTITION_KEY.to_string(),
            table_batches: vec![
                // First table: one row containing only a "time" column.
                TableBatch {
                    table_id: ARBITRARY_TABLE_ID.get(),
                    columns: vec![Column {
                        column_name: "time".to_string(),
                        semantic_type: SemanticType::Time.into(),
                        values: Some(Values {
                            i64_values: vec![4242],
                            f64_values: vec![],
                            u64_values: vec![],
                            string_values: vec![],
                            bool_values: vec![],
                            bytes_values: vec![],
                            packed_string_values: None,
                            interned_string_values: None,
                        }),
                        null_mask: vec![0],
                    }],
                    row_count: 1,
                },
                // Second table: same shape, different table ID and timestamp.
                TableBatch {
                    table_id: ALTERNATIVE_TABLE_ID.get(),
                    columns: vec![Column {
                        column_name: "time".to_string(),
                        semantic_type: SemanticType::Time.into(),
                        values: Some(Values {
                            i64_values: vec![7676],
                            f64_values: vec![],
                            u64_values: vec![],
                            string_values: vec![],
                            bool_values: vec![],
                            bytes_values: vec![],
                            packed_string_values: None,
                            interned_string_values: None,
                        }),
                        null_mask: vec![0],
                    }],
                    row_count: 1,
                },
            ],
        }),
    },
    sink_ret = Ok(()),
    want_err = false,
    want_calls = [IngestOp::Write(write_op)] => {
        // The applied IngestOp must carry the request's metadata through
        // unchanged. Notably a sequence number should be assigned
        // _per partition_.
        assert_eq!(write_op.namespace(), ARBITRARY_NAMESPACE_ID);
        assert_eq!(write_op.tables().count(), 2);
        assert_eq!(*write_op.partition_key(), *ARBITRARY_PARTITION_KEY);

        // Compared as sets: the check does not depend on which table was
        // given which sequence number.
        let want_sequence_numbers =
            HashSet::from([SequenceNumber::new(1), SequenceNumber::new(2)]);
        let got_sequence_numbers = write_op
            .tables()
            .map(|entry| entry.1.partitioned_data().sequence_number())
            .collect::<HashSet<_>>();
        assert_eq!(got_sequence_numbers, want_sequence_numbers);
    }
);
test_rpc_write!(
no_payload,
request = proto::WriteRequest { payload: None },

View File

@ -4,6 +4,7 @@ use data_types::{
partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey,
SequenceNumber, TableId,
};
use hashbrown::HashSet;
use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace};
use lazy_static::lazy_static;
use mutable_batch_lp::lines_to_batches;
@ -312,6 +313,52 @@ pub(crate) fn make_write_op(
WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx)
}
/// Build a [`WriteOperation`] from line protocol spanning several tables,
/// with each table's write carrying its own [`SequenceNumber`].
///
/// Every item of `table_sequence_numbers` is a `(table name, table ID,
/// sequence number)` triple: the batch parsed from `lines` for that table
/// name is placed in the returned operation under the given [`TableId`] and
/// tagged with the given [`SequenceNumber`].
///
/// # Panics
///
/// Panics if any of the following hold:
///
/// * `lines` cannot be parsed as LP;
/// * the number of tables parsed from `lines` differs from the number of
///   entries in `table_sequence_numbers`;
/// * a table name in `table_sequence_numbers` has no batch in the parsed LP;
/// * a [`SequenceNumber`] is re-used within the write.
#[track_caller]
pub(crate) fn make_multi_table_write_op<'a, I>(
    partition_key: &PartitionKey,
    namespace_id: NamespaceId,
    table_sequence_numbers: I,
    lines: &str,
) -> WriteOperation
where
    I: ExactSizeIterator<Item = (&'a str, TableId, SequenceNumber)>,
{
    let mut batches_by_name = lines_to_batches(lines, 0).expect("invalid LP");
    assert_eq!(
        batches_by_name.len(),
        table_sequence_numbers.len(),
        "number of tables in LP does not match number of table_sequence_numbers"
    );

    // Every sequence number consumed so far, so that re-use can be detected.
    let mut used_sequence_numbers = HashSet::<SequenceNumber>::new();

    // Turns one (name, id, sequence number) triple into the keyed table entry,
    // taking the matching batch out of the parsed LP.
    let to_table_entry = |(table_name, table_id, sequence_number): (
        &'a str,
        TableId,
        SequenceNumber,
    )| {
        let batch = batches_by_name
            .remove(table_name)
            .expect("table name does not exist in LP");

        let first_use = used_sequence_numbers.insert(sequence_number);
        assert!(
            first_use,
            "duplicate sequence number {sequence_number:?} observed"
        );

        let data = PartitionedData::new(sequence_number, batch);
        (table_id, TableData::new(table_id, data))
    };

    let tables_by_id = table_sequence_numbers.map(to_table_entry).collect();

    WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None)
}
pub(crate) async fn populate_catalog(
catalog: &dyn Catalog,
namespace: &str,
@ -332,17 +379,18 @@ pub(crate) async fn populate_catalog(
/// Assert `a` and `b` have identical metadata, and that when converting
/// them to Arrow batches they produces identical output.
#[track_caller]
pub(crate) fn assert_dml_writes_eq(a: WriteOperation, b: WriteOperation) {
pub(crate) fn assert_write_ops_eq(a: WriteOperation, b: WriteOperation) {
assert_eq!(a.namespace(), b.namespace(), "namespace");
assert_eq!(a.tables().count(), b.tables().count(), "table count");
assert_eq!(a.partition_key(), b.partition_key(), "partition key");
// Assert sequence numbers were reassigned
for (a_table, b_table) in a.tables().zip(b.tables()) {
assert_eq!(a_table.0, b_table.0, "table id mismatch");
assert_eq!(
a_table.1.partitioned_data().sequence_number(),
b_table.1.partitioned_data().sequence_number(),
"sequence number"
"sequence number mismatch"
);
}

View File

@ -696,22 +696,25 @@ mod tests {
let wal = Wal::new(&dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m1,t=foo v=2i 2");
// Use multiple tables for a write to test per-partition sequencing is preserved
let w2 = test_data("m1,t=foo v=2i 2\nm2,u=bar v=1i 1");
let op1 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1),
};
let op2 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
op: WalOp::Write(w2),
};
let op3 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
@ -732,7 +735,7 @@ mod tests {
// Assert the set has recorded the op IDs.
//
// Note that one op has a duplicate sequence number above!
assert_eq!(ids.len(), 3);
assert_eq!(ids.len(), 4);
// Assert the sequence number set contains the specified ops.
let ids = ids.iter().collect::<Vec<_>>();
@ -742,6 +745,7 @@ mod tests {
SequenceNumber::new(0),
SequenceNumber::new(1),
SequenceNumber::new(2),
SequenceNumber::new(3),
]
);
@ -752,9 +756,11 @@ mod tests {
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
[
[(TableId::new(0), 0)].into_iter().collect(),
[(TableId::new(0), 1)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
[(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
[(TableId::new(0), 3)].into_iter().collect(),
[(TableId::new(0), 3)].into_iter().collect(),
]
.into_iter()
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
@ -807,7 +813,7 @@ mod tests {
let wal = Wal::new(dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m2,u=foo w=2i 2");
let w2 = test_data("m1,t=foo v=2i 2\nm2,u=foo w=2i 2");
let w3 = test_data("m1,t=foo v=3i 3");
let op1 = SequencedWalOp {
@ -815,20 +821,22 @@ mod tests {
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)]
.into_iter()
.collect(),
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 4)].into_iter().collect(),
op: WalOp::Write(w3.to_owned()),
};

View File

@ -22,16 +22,16 @@ async fn crud() {
);
// Can write an entry to the open segment
let op = arbitrary_sequenced_wal_op(42);
let op = arbitrary_sequenced_wal_op([42, 43]);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 126);
assert_eq!(summary.bytes_written, 110);
assert_eq!(summary.total_bytes, 140);
assert_eq!(summary.bytes_written, 124);
// Can write another entry; total_bytes accumulates
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([44, 45]);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 236);
assert_eq!(summary.bytes_written, 110);
assert_eq!(summary.total_bytes, 264);
assert_eq!(summary.bytes_written, 124);
// Still no closed segments
let closed = wal.closed_segments();
@ -42,10 +42,15 @@ async fn crud() {
// Can't read entries from the open segment; have to rotate first
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.size(), 236);
assert_eq!(closed_segment_details.size(), 264);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
[
SequenceNumber::new(42),
SequenceNumber::new(43),
SequenceNumber::new(44),
SequenceNumber::new(45)
]
);
// There's one closed segment
@ -53,20 +58,25 @@ async fn crud() {
let closed_segment_ids: Vec<_> = closed.iter().map(|c| c.id()).collect();
assert_eq!(closed_segment_ids, &[closed_segment_details.id()]);
// Can read the written entries from the closed segment,
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
// Can read the written entries from the closed segment, ensuring that the
// per-partition sequence numbers are preserved.
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
let op = reader.next().unwrap().unwrap();
op[0]
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([42, 43]),);
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([44, 45]),);
// Can delete a segment, leaving no closed segments again
wal.delete(closed_segment_details.id()).await.unwrap();
@ -85,10 +95,10 @@ async fn replay() {
// WAL.
{
let wal = wal::Wal::new(dir.path()).await.unwrap();
let op = arbitrary_sequenced_wal_op(42);
let op = arbitrary_sequenced_wal_op([42]);
let _ = unwrap_summary(wal.write_op(op)).await;
wal.rotate().unwrap();
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([43, 44]);
let _ = unwrap_summary(wal.write_op(op)).await;
}
@ -102,22 +112,27 @@ async fn replay() {
assert_eq!(closed_segment_ids.len(), 2);
// Can read the written entries from the previously closed segment
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
// ensuring the per-partition sequence numbers are preserved.
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([42]));
// Can read the written entries from the previously open segment
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
let op = reader.next().unwrap().unwrap();
op[0]
let mut op = reader.next().unwrap().unwrap();
let mut got_sequence_numbers = op
.remove(0)
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
.into_values()
.collect::<Vec<_>>();
got_sequence_numbers.sort();
assert_eq!(got_sequence_numbers, Vec::<u64>::from([43, 44]));
}
#[tokio::test]
@ -128,17 +143,20 @@ async fn ordering() {
{
let wal = wal::Wal::new(dir.path()).await.unwrap();
let op = arbitrary_sequenced_wal_op(42);
let op = arbitrary_sequenced_wal_op([42, 43]);
let _ = unwrap_summary(wal.write_op(op)).await;
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(42)]);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
);
let op = arbitrary_sequenced_wal_op(43);
let op = arbitrary_sequenced_wal_op([44]);
let _ = unwrap_summary(wal.write_op(op)).await;
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(43)]);
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(44)]);
let op = arbitrary_sequenced_wal_op(44);
let op = arbitrary_sequenced_wal_op([45]);
let _ = unwrap_summary(wal.write_op(op)).await;
}
@ -162,15 +180,21 @@ async fn ordering() {
assert!(ids.is_empty());
}
// TODO(savage): This needs changing to generate multiple partitioned sequence numbers for each
// write.
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {
let w = test_data("m1,t=foo v=1i 1");
fn arbitrary_sequenced_wal_op<I: IntoIterator<Item = u64>>(sequence_numbers: I) -> SequencedWalOp {
let sequence_numbers = sequence_numbers.into_iter().collect::<Vec<_>>();
let lp = sequence_numbers
.iter()
.enumerate()
.fold(String::new(), |string, (idx, _)| {
string + &format!("m{},t=foo v=1i 1\n", idx)
});
let w = test_data(lp.as_str());
SequencedWalOp {
table_write_sequence_numbers: w
.table_batches
.iter()
.map(|table_batch| (TableId::new(table_batch.table_id), sequence_number))
.zip(sequence_numbers.iter())
.map(|(table_batch, &id)| (TableId::new(table_batch.table_id), id))
.collect(),
op: WalOp::Write(w),
}