diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index 4db6fe1341..2eed3cf369 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -271,6 +271,7 @@ mod tests { use assert_matches::assert_matches; use async_trait::async_trait; + use lazy_static::lazy_static; use metric::{Attributes, Metric}; use parking_lot::Mutex; use wal::Wal; @@ -281,9 +282,9 @@ mod tests { dml_sink::mock_sink::MockDmlSink, persist::queue::mock::MockPersistQueue, test_util::{ - assert_dml_writes_eq, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, - ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, + assert_write_ops_eq, make_multi_table_write_op, make_write_op, PartitionDataBuilder, + ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, + ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, }, wal::wal_sink::WalSink, }; @@ -311,6 +312,10 @@ mod tests { } } + lazy_static! { + static ref ALTERNATIVE_TABLE_NAME: &'static str = "arán"; + } + #[tokio::test] async fn test_replay() { let dir = tempfile::tempdir().unwrap(); @@ -340,18 +345,30 @@ mod tests { ), None, ); - let op3 = make_write_op( + + // Add a write hitting multiple tables for good measure + let op3 = make_multi_table_write_op( &ARBITRARY_PARTITION_KEY, ARBITRARY_NAMESPACE_ID, - &ARBITRARY_TABLE_NAME, - ARBITRARY_TABLE_ID, - 42, + [ + ( + ARBITRARY_TABLE_NAME.to_string().as_str(), + ARBITRARY_TABLE_ID, + SequenceNumber::new(42), + ), + ( + &ALTERNATIVE_TABLE_NAME, + TableId::new(ARBITRARY_TABLE_ID.get() + 1), + SequenceNumber::new(43), + ), + ] + .into_iter(), // Overwrite op2 &format!( - r#"{},region=Asturias temp=15 4242424242"#, - &*ARBITRARY_TABLE_NAME + r#"{},region=Asturias temp=15 4242424242 + {},region=Mayo temp=12 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME, ), - None, ); // The write portion of this test. 
@@ -427,7 +444,7 @@ mod tests { .await .expect("failed to replay WAL"); - assert_eq!(max_sequence_number, Some(SequenceNumber::new(42))); + assert_eq!(max_sequence_number, Some(SequenceNumber::new(43))); // Assert the ops were pushed into the DmlSink exactly as generated. let ops = mock_iter.sink.get_calls(); @@ -438,9 +455,9 @@ mod tests { IngestOp::Write(ref w2), IngestOp::Write(ref w3) ] => { - assert_dml_writes_eq(w1.clone(), op1); - assert_dml_writes_eq(w2.clone(), op2); - assert_dml_writes_eq(w3.clone(), op3); + assert_write_ops_eq(w1.clone(), op1); + assert_write_ops_eq(w2.clone(), op2); + assert_write_ops_eq(w3.clone(), op3); } ); diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 7d3de046a5..be02dbd92a 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -227,17 +227,22 @@ mod tests { column::{SemanticType, Values}, Column, DatabaseBatch, TableBatch, }; - use std::sync::Arc; + use lazy_static::lazy_static; + use std::{collections::HashSet, sync::Arc}; use super::*; use crate::{ dml_payload::IngestOp, - dml_sink::mock_sink::MockDmlSink, - test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID}, + test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID}, }; + use crate::{dml_sink::mock_sink::MockDmlSink, test_util::ARBITRARY_PARTITION_KEY}; const PERSIST_QUEUE_DEPTH: usize = 42; + lazy_static! { + static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(76); + } + macro_rules! 
test_rpc_write { ( $name:ident, @@ -316,6 +321,75 @@ mod tests { } ); + test_rpc_write!( + apply_ok_independently_sequenced_partitions, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), + table_batches: vec![ + TableBatch { + table_id: ARBITRARY_TABLE_ID.get(), + columns: vec![Column { + column_name: String::from("time"), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count:1 , + }, + TableBatch { + table_id: ALTERNATIVE_TABLE_ID.get(), + columns: vec![Column { + column_name: String::from("time"), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![7676], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }, + ], + }), + }, + sink_ret = Ok(()), + want_err = false, + want_calls = [IngestOp::Write(w)] => { + // Assert the properties of the applied IngestOp match the expected + // values. Notably a sequence number should be assigned _per partition_. 
+ assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID); + assert_eq!(w.tables().count(), 2); + assert_eq!(*w.partition_key(), *ARBITRARY_PARTITION_KEY); + let sequence_numbers = w.tables().map(|t| t.1.partitioned_data().sequence_number()).collect::<HashSet<_>>(); + assert_eq!( + sequence_numbers, + [ + SequenceNumber::new(1), + SequenceNumber::new(2), + ] + .into_iter() + .collect::<HashSet<_>>(), + ); + } + ); + test_rpc_write!( no_payload, request = proto::WriteRequest { payload: None }, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 9a8a75cb1a..ec58d58c8c 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -4,6 +4,7 @@ use data_types::{ partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId, }; +use hashbrown::HashSet; use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; @@ -312,6 +313,52 @@ pub(crate) fn make_write_op( WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx) } +/// Construct a [`WriteOperation`] with the specified parameters for LP covering +/// multiple separately sequenced table writes. +/// +/// # Panics +/// +/// This method panics if `table_sequence_numbers` contains a different number +/// of tables to the batches derived from `lines` OR if a [`SequenceNumber`] +/// is re-used within the write. 
+#[track_caller] +pub(crate) fn make_multi_table_write_op< + 'a, + I: ExactSizeIterator<Item = (&'a str, TableId, SequenceNumber)>, +>( + partition_key: &PartitionKey, + namespace_id: NamespaceId, + table_sequence_numbers: I, + lines: &str, +) -> WriteOperation { + let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP"); + assert_eq!( + tables_by_name.len(), + table_sequence_numbers.len(), + "number of tables in LP does not match number of table_sequence_numbers" + ); + + let mut seen_sequence_numbers = HashSet::<SequenceNumber>::new(); + + let tables_by_id = table_sequence_numbers + .map(|(table_name, table_id, sequence_number)| { + let mb = tables_by_name + .remove(table_name) + .expect("table name does not exist in LP"); + assert!( + seen_sequence_numbers.insert(sequence_number), + "duplicate sequence number {sequence_number:?} observed" + ); + ( + table_id, + TableData::new(table_id, PartitionedData::new(sequence_number, mb)), + ) + }) + .collect(); + + WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None) +} + pub(crate) async fn populate_catalog( catalog: &dyn Catalog, namespace: &str, @@ -332,17 +379,18 @@ pub(crate) async fn populate_catalog( /// Assert `a` and `b` have identical metadata, and that when converting /// them to Arrow batches they produces identical output. 
#[track_caller] -pub(crate) fn assert_dml_writes_eq(a: WriteOperation, b: WriteOperation) { +pub(crate) fn assert_write_ops_eq(a: WriteOperation, b: WriteOperation) { assert_eq!(a.namespace(), b.namespace(), "namespace"); assert_eq!(a.tables().count(), b.tables().count(), "table count"); assert_eq!(a.partition_key(), b.partition_key(), "partition key"); // Assert sequence numbers were reassigned for (a_table, b_table) in a.tables().zip(b.tables()) { + assert_eq!(a_table.0, b_table.0, "table id mismatch"); assert_eq!( a_table.1.partitioned_data().sequence_number(), b_table.1.partitioned_data().sequence_number(), - "sequence number" + "sequence number mismatch" ); } diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 1c6fb8bbd1..ae0bfe8848 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -696,22 +696,25 @@ mod tests { let wal = Wal::new(&dir.path()).await.unwrap(); let w1 = test_data("m1,t=foo v=1i 1"); - let w2 = test_data("m1,t=foo v=2i 2"); + // Use multiple tables for a write to test per-partition sequencing is preserved + let w2 = test_data("m1,t=foo v=2i 2\nm2,u=bar v=1i 1"); let op1 = SequencedWalOp { table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(), op: WalOp::Write(w1), }; let op2 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), op: WalOp::Write(w2), }; let op3 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; @@ -732,7 +735,7 @@ mod tests { // Assert the set has recorded the 
op IDs. // // Note that one op has a duplicate sequence number above! - assert_eq!(ids.len(), 3); + assert_eq!(ids.len(), 4); // Assert the sequence number set contains the specified ops. let ids = ids.iter().collect::<Vec<_>>(); @@ -742,6 +745,7 @@ mod tests { SequenceNumber::new(0), SequenceNumber::new(1), SequenceNumber::new(2), + SequenceNumber::new(3), ] ); @@ -752,9 +756,11 @@ mod tests { .collect::<Vec<std::collections::HashMap<TableId, u64>>>(), [ [(TableId::new(0), 0)].into_iter().collect(), - [(TableId::new(0), 1)].into_iter().collect(), - [(TableId::new(0), 2)].into_iter().collect(), - [(TableId::new(0), 2)].into_iter().collect(), + [(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), + [(TableId::new(0), 3)].into_iter().collect(), + [(TableId::new(0), 3)].into_iter().collect(), ] .into_iter() .collect::<Vec<std::collections::HashMap<TableId, u64>>>(), @@ -807,7 +813,7 @@ mod tests { let wal = Wal::new(dir.path()).await.unwrap(); let w1 = test_data("m1,t=foo v=1i 1"); - let w2 = test_data("m2,u=foo w=2i 2"); + let w2 = test_data("m1,t=foo v=2i 2\nm2,u=foo w=2i 2"); let w3 = test_data("m1,t=foo v=3i 3"); let op1 = SequencedWalOp { @@ -815,20 +821,22 @@ mod tests { op: WalOp::Write(w1.to_owned()), }; let op2 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), op: WalOp::Write(w2.to_owned()), }; let op3 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; // A third write entry coming after a delete and persist entry must still be yielded let op5 = SequencedWalOp { - 
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 4)].into_iter().collect(), op: WalOp::Write(w3.to_owned()), }; diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index b2580ab910..331c9b49bf 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -22,16 +22,16 @@ async fn crud() { ); // Can write an entry to the open segment - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42, 43]); let summary = unwrap_summary(wal.write_op(op)).await; - assert_eq!(summary.total_bytes, 126); - assert_eq!(summary.bytes_written, 110); + assert_eq!(summary.total_bytes, 140); + assert_eq!(summary.bytes_written, 124); // Can write another entry; total_bytes accumulates - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([44, 45]); let summary = unwrap_summary(wal.write_op(op)).await; - assert_eq!(summary.total_bytes, 236); - assert_eq!(summary.bytes_written, 110); + assert_eq!(summary.total_bytes, 264); + assert_eq!(summary.bytes_written, 124); // Still no closed segments let closed = wal.closed_segments(); @@ -42,10 +42,15 @@ async fn crud() { // Can't read entries from the open segment; have to rotate first let (closed_segment_details, ids) = wal.rotate().unwrap(); - assert_eq!(closed_segment_details.size(), 236); + assert_eq!(closed_segment_details.size(), 264); assert_eq!( ids.iter().collect::<Vec<_>>(), - [SequenceNumber::new(42), SequenceNumber::new(43)] + [ + SequenceNumber::new(42), + SequenceNumber::new(43), + SequenceNumber::new(44), + SequenceNumber::new(45) + ] ); // There's one closed segment @@ -53,20 +58,25 @@ async fn crud() { let closed_segment_ids: Vec<_> = closed.iter().map(|c| c.id()).collect(); assert_eq!(closed_segment_ids, &[closed_segment_details.id()]); - // Can read the written entries from the closed segment, - // ensuring the per-partition sequence numbers match up to the current - // op-level 
sequence number while it is the source of truth. + // Can read the written entries from the closed segment, ensuring that the + // per-partition sequence numbers are preserved. let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 42)); - let op = reader.next().unwrap().unwrap(); - op[0] + .into_values() + .collect::<Vec<_>>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::<u64>::from([42, 43]),); + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 43)); + .into_values() + .collect::<Vec<_>>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::<u64>::from([44, 45]),); // Can delete a segment, leaving no closed segments again wal.delete(closed_segment_details.id()).await.unwrap(); @@ -85,10 +95,10 @@ async fn replay() { // WAL. { let wal = wal::Wal::new(dir.path()).await.unwrap(); - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42]); let _ = unwrap_summary(wal.write_op(op)).await; wal.rotate().unwrap(); - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([43, 44]); let _ = unwrap_summary(wal.write_op(op)).await; } @@ -102,22 +112,27 @@ async fn replay() { assert_eq!(closed_segment_ids.len(), 2); // Can read the written entries from the previously closed segment - // ensuring the per-partition sequence numbers match up to the current - // op-level sequence number while it is the source of truth. + // ensuring the per-partition sequence numbers are preserved. 
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 42)); + .into_values() + .collect::<Vec<_>>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::<u64>::from([42])); // Can read the written entries from the previously open segment let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 43)); + .into_values() + .collect::<Vec<_>>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::<u64>::from([43, 44])); } #[tokio::test] @@ -128,17 +143,20 @@ async fn ordering() { { let wal = wal::Wal::new(dir.path()).await.unwrap(); - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42, 43]); let _ = unwrap_summary(wal.write_op(op)).await; let (_, ids) = wal.rotate().unwrap(); - assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(42)]); + assert_eq!( + ids.iter().collect::<Vec<_>>(), + [SequenceNumber::new(42), SequenceNumber::new(43)] + ); - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([44]); let _ = unwrap_summary(wal.write_op(op)).await; let (_, ids) = wal.rotate().unwrap(); - assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(43)]); + assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(44)]); - let op = arbitrary_sequenced_wal_op(44); + let op = arbitrary_sequenced_wal_op([45]); let _ = unwrap_summary(wal.write_op(op)).await; } @@ -162,15 +180,21 @@ async fn ordering() { assert!(ids.is_empty()); } -// TODO(savage): This needs changing to generate multiple 
partitioned sequence numbers for each -// write. -fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp { - let w = test_data("m1,t=foo v=1i 1"); +fn arbitrary_sequenced_wal_op<I: IntoIterator<Item = u64>>(sequence_numbers: I) -> SequencedWalOp { + let sequence_numbers = sequence_numbers.into_iter().collect::<Vec<_>>(); + let lp = sequence_numbers + .iter() + .enumerate() + .fold(String::new(), |string, (idx, _)| { + string + &format!("m{},t=foo v=1i 1\n", idx) + }); + let w = test_data(lp.as_str()); SequencedWalOp { table_write_sequence_numbers: w .table_batches .iter() - .map(|table_batch| (TableId::new(table_batch.table_id), sequence_number)) + .zip(sequence_numbers.iter()) + .map(|(table_batch, &id)| (TableId::new(table_batch.table_id), id)) .collect(), op: WalOp::Write(w), }