From 54a8f7d007475da5f7345b1f7b666d597d964ba4 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Mon, 26 Jun 2023 17:04:12 +0100 Subject: [PATCH 1/6] feat(data_types): Add `Extend` for `SequenceNumberSet` Although callers could manually extend the sequence number set by continually adding in an iterator loop or a fold expression, this enables other combinator patterns when dealing with collections of sequence number sets. --- data_types/src/sequence_number_set.rs | 31 +++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs index b357843fe0..c64206fbc8 100644 --- a/data_types/src/sequence_number_set.rs +++ b/data_types/src/sequence_number_set.rs @@ -99,6 +99,14 @@ impl Extend for SequenceNumberSet { } } +impl Extend for SequenceNumberSet { + fn extend>(&mut self, iter: T) { + for new_set in iter { + self.add_set(&new_set); + } + } +} + impl FromIterator for SequenceNumberSet { fn from_iter>(iter: T) -> Self { Self(iter.into_iter().map(|v| v.get() as _).collect()) @@ -174,6 +182,29 @@ mod tests { assert!(a.contains(SequenceNumber::new(2))); } + #[test] + fn test_extend_multiple_sets() { + let mut a = SequenceNumberSet::default(); + a.add(SequenceNumber::new(7)); + + let b = [SequenceNumber::new(13), SequenceNumber::new(76)]; + let c = [SequenceNumber::new(42), SequenceNumber::new(64)]; + + assert!(a.contains(SequenceNumber::new(7))); + for &num in [b, c].iter().flatten() { + assert!(!a.contains(num)); + } + + a.extend([ + SequenceNumberSet::from_iter(b), + SequenceNumberSet::from_iter(c), + ]); + assert!(a.contains(SequenceNumber::new(7))); + for &num in [b, c].iter().flatten() { + assert!(a.contains(num)); + } + } + #[test] fn test_collect() { let collect_set = [SequenceNumber::new(4), SequenceNumber::new(2)]; From e6e09d0c151a8cf034d2ce843c4ac8fb59c08278 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Mon, 26 Jun 2023 18:44:24 +0100 Subject: [PATCH 2/6] 
feat(ingester): Assign individual sequence numbers for writes per partition This commit asks the oracle for a new sequence number for each table batch of a write operation (and thus each partition of a write) when handling an RPC write operation before appending the operation to the WAL. The ingester now honours the sequence numbers per-partition when WAL replay is performed. --- ingester/src/init/wal_replay.rs | 37 +++++++++++++++++---------- ingester/src/server/grpc/rpc_write.rs | 13 +++++----- wal/src/lib.rs | 1 - wal/src/writer_thread.rs | 17 +++++------- wal/tests/end_to_end.rs | 4 +-- 5 files changed, 39 insertions(+), 33 deletions(-) diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index fd3ce91b2f..4db6fe1341 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -194,26 +194,18 @@ where for op in ops { let SequencedWalOp { - table_write_sequence_numbers, // TODO(savage): Use sequence numbers assigned per-partition + table_write_sequence_numbers, op, } = op; - let sequence_number = SequenceNumber::new( - *table_write_sequence_numbers - .values() - .next() - .expect("attempt to replay unsequenced wal entry"), - ); - - max_sequence = max_sequence.max(Some(sequence_number)); - let op = match op { Op::Write(w) => w, Op::Delete(_) => unreachable!(), Op::Persist(_) => unreachable!(), }; - debug!(?op, sequence_number = sequence_number.get(), "apply wal op"); + let mut op_min_sequence_number = None; + let mut op_max_sequence_number = None; // Reconstruct the ingest operation let batches = decode_database_batch(&op)?; @@ -226,10 +218,18 @@ where .into_iter() .map(|(k, v)| { let table_id = TableId::new(k); + let sequence_number = SequenceNumber::new( + *table_write_sequence_numbers + .get(&table_id) + .expect("attempt to apply unsequenced wal op"), + ); + + max_sequence = max_sequence.max(Some(sequence_number)); + op_min_sequence_number = op_min_sequence_number.min(Some(sequence_number)); + 
op_max_sequence_number = op_max_sequence_number.max(Some(sequence_number)); + ( table_id, - // TODO(savage): Use table-partitioned sequence - // numbers here TableData::new(table_id, PartitionedData::new(sequence_number, v)), ) }) @@ -239,6 +239,17 @@ where None, ); + debug!( + ?op, + op_min_sequence_number = op_min_sequence_number + .expect("attempt to apply unsequenced wal op") + .get(), + op_max_sequence_number = op_max_sequence_number + .expect("attempt to apply unsequenced wal op") + .get(), + "apply wal op" + ); + // Apply the operation to the provided DML sink sink.apply(IngestOp::Write(op)) .await diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 844e5e23bd..7d3de046a5 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -180,20 +180,21 @@ where "received rpc write" ); - let sequence_number = self.timestamp.next(); - - // Construct the corresponding ingester write operation for the RPC payload + // Construct the corresponding ingester write operation for the RPC payload, + // independently sequencing the data contained by the write per-partition let op = WriteOperation::new( namespace_id, batches .into_iter() .map(|(k, v)| { let table_id = TableId::new(k); + let partition_sequence_number = self.timestamp.next(); ( table_id, - // TODO(savage): Sequence partitioned data independently within a - // write. - TableData::new(table_id, PartitionedData::new(sequence_number, v)), + TableData::new( + table_id, + PartitionedData::new(partition_sequence_number, v), + ), ) }) .collect(), diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 7d314e104f..1c6fb8bbd1 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -720,7 +720,6 @@ mod tests { wal.write_op(op3.clone()); wal.write_op(op4.clone()).changed().await.unwrap(); - // TODO(savage): Returned SequenceNumberSet should reflect `partition_sequence_numbers` post-change. 
let (closed, ids) = wal.rotate().unwrap(); let ops: Vec = wal diff --git a/wal/src/writer_thread.rs b/wal/src/writer_thread.rs index 888616bd38..60ca60f34b 100644 --- a/wal/src/writer_thread.rs +++ b/wal/src/writer_thread.rs @@ -122,17 +122,12 @@ impl WriterIoThread { .ops .into_iter() .map(|v| { - // TODO(savage): Extract all [`SequenceNumber`] used for - // the op and include them in the batch once the tables - // sequenced independently. - let id = SequenceNumber::new( - *v.table_write_sequence_numbers - .values() - .next() - .expect("attempt to encode unsequence wal operation") - as _, - ); - (proto::SequencedWalOp::from(v), id) + let op_ids: SequenceNumberSet = v + .table_write_sequence_numbers + .values() + .map(|&id| SequenceNumber::new(id as _)) + .collect(); + (proto::SequencedWalOp::from(v), op_ids) }) .unzip(); let proto_batch = proto::WalOpBatch { ops }; diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index 62cc8d190a..b2580ab910 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -130,8 +130,6 @@ async fn ordering() { let op = arbitrary_sequenced_wal_op(42); let _ = unwrap_summary(wal.write_op(op)).await; - // TODO(savage): These will need to return the - // partition_sequence_numbers and be checked there. let (_, ids) = wal.rotate().unwrap(); assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(42)]); @@ -164,6 +162,8 @@ async fn ordering() { assert!(ids.is_empty()); } +// TODO(savage): This needs changing to generate multiple partitioned sequence numbers for each +// write. 
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp { let w = test_data("m1,t=foo v=1i 1"); SequencedWalOp { From e74a7a7dd4ecd853cce9486a0810a026d56a2358 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Tue, 27 Jun 2023 16:34:04 +0100 Subject: [PATCH 3/6] test(wal): Test correct assignment of write per-partition sequence numbers This adds extra test coverage for the ingester's WAL replay & RPC write paths, as well as the WAL E2E tests, to ensure that all sequence numbers present in a WriteOperation/WalOperation are encoded and present when decoded. --- ingester/src/init/wal_replay.rs | 45 +++++++---- ingester/src/server/grpc/rpc_write.rs | 80 ++++++++++++++++++- ingester/src/test_util.rs | 52 ++++++++++++- wal/src/lib.rs | 34 +++++---- wal/tests/end_to_end.rs | 106 ++++++++++++++++---------- 5 files changed, 244 insertions(+), 73 deletions(-) diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index 4db6fe1341..2eed3cf369 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -271,6 +271,7 @@ mod tests { use assert_matches::assert_matches; use async_trait::async_trait; + use lazy_static::lazy_static; use metric::{Attributes, Metric}; use parking_lot::Mutex; use wal::Wal; @@ -281,9 +282,9 @@ mod tests { dml_sink::mock_sink::MockDmlSink, persist::queue::mock::MockPersistQueue, test_util::{ - assert_dml_writes_eq, make_write_op, PartitionDataBuilder, ARBITRARY_NAMESPACE_ID, - ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID, - ARBITRARY_TABLE_NAME, + assert_write_ops_eq, make_multi_table_write_op, make_write_op, PartitionDataBuilder, + ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_PARTITION_KEY, + ARBITRARY_TABLE_ID, ARBITRARY_TABLE_NAME, }, wal::wal_sink::WalSink, }; @@ -311,6 +312,10 @@ mod tests { } } + lazy_static! 
{ + static ref ALTERNATIVE_TABLE_NAME: &'static str = "arán"; + } + #[tokio::test] async fn test_replay() { let dir = tempfile::tempdir().unwrap(); @@ -340,18 +345,30 @@ mod tests { ), None, ); - let op3 = make_write_op( + + // Add a write hitting multiple tables for good measure + let op3 = make_multi_table_write_op( &ARBITRARY_PARTITION_KEY, ARBITRARY_NAMESPACE_ID, - &ARBITRARY_TABLE_NAME, - ARBITRARY_TABLE_ID, - 42, + [ + ( + ARBITRARY_TABLE_NAME.to_string().as_str(), + ARBITRARY_TABLE_ID, + SequenceNumber::new(42), + ), + ( + &ALTERNATIVE_TABLE_NAME, + TableId::new(ARBITRARY_TABLE_ID.get() + 1), + SequenceNumber::new(43), + ), + ] + .into_iter(), // Overwrite op2 &format!( - r#"{},region=Asturias temp=15 4242424242"#, - &*ARBITRARY_TABLE_NAME + r#"{},region=Asturias temp=15 4242424242 + {},region=Mayo temp=12 4242424242"#, + &*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME, ), - None, ); // The write portion of this test. @@ -427,7 +444,7 @@ mod tests { .await .expect("failed to replay WAL"); - assert_eq!(max_sequence_number, Some(SequenceNumber::new(42))); + assert_eq!(max_sequence_number, Some(SequenceNumber::new(43))); // Assert the ops were pushed into the DmlSink exactly as generated. 
let ops = mock_iter.sink.get_calls(); @@ -438,9 +455,9 @@ mod tests { IngestOp::Write(ref w2), IngestOp::Write(ref w3) ] => { - assert_dml_writes_eq(w1.clone(), op1); - assert_dml_writes_eq(w2.clone(), op2); - assert_dml_writes_eq(w3.clone(), op3); + assert_write_ops_eq(w1.clone(), op1); + assert_write_ops_eq(w2.clone(), op2); + assert_write_ops_eq(w3.clone(), op3); } ); diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index 7d3de046a5..be02dbd92a 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -227,17 +227,22 @@ mod tests { column::{SemanticType, Values}, Column, DatabaseBatch, TableBatch, }; - use std::sync::Arc; + use lazy_static::lazy_static; + use std::{collections::HashSet, sync::Arc}; use super::*; use crate::{ dml_payload::IngestOp, - dml_sink::mock_sink::MockDmlSink, - test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_KEY, ARBITRARY_TABLE_ID}, + test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID}, }; + use crate::{dml_sink::mock_sink::MockDmlSink, test_util::ARBITRARY_PARTITION_KEY}; const PERSIST_QUEUE_DEPTH: usize = 42; + lazy_static! { + static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(76); + } + macro_rules! 
test_rpc_write { ( $name:ident, @@ -316,6 +321,75 @@ mod tests { } ); + test_rpc_write!( + apply_ok_independently_sequenced_partitions, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: ARBITRARY_NAMESPACE_ID.get(), + partition_key: ARBITRARY_PARTITION_KEY.to_string(), + table_batches: vec![ + TableBatch { + table_id: ARBITRARY_TABLE_ID.get(), + columns: vec![Column { + column_name: String::from("time"), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count:1 , + }, + TableBatch { + table_id: ALTERNATIVE_TABLE_ID.get(), + columns: vec![Column { + column_name: String::from("time"), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![7676], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }, + ], + }), + }, + sink_ret = Ok(()), + want_err = false, + want_calls = [IngestOp::Write(w)] => { + // Assert the properties of the applied IngestOp match the expected + // values. Notably a sequence number should be assigned _per partition_. 
+ assert_eq!(w.namespace(), ARBITRARY_NAMESPACE_ID); + assert_eq!(w.tables().count(), 2); + assert_eq!(*w.partition_key(), *ARBITRARY_PARTITION_KEY); + let sequence_numbers = w.tables().map(|t| t.1.partitioned_data().sequence_number()).collect::>(); + assert_eq!( + sequence_numbers, + [ + SequenceNumber::new(1), + SequenceNumber::new(2), + ] + .into_iter() + .collect::>(), + ); + } + ); + test_rpc_write!( no_payload, request = proto::WriteRequest { payload: None }, diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 9a8a75cb1a..ec58d58c8c 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -4,6 +4,7 @@ use data_types::{ partition_template::TablePartitionTemplateOverride, NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId, }; +use hashbrown::HashSet; use iox_catalog::{interface::Catalog, test_helpers::arbitrary_namespace}; use lazy_static::lazy_static; use mutable_batch_lp::lines_to_batches; @@ -312,6 +313,52 @@ pub(crate) fn make_write_op( WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), span_ctx) } +/// Construct a [`WriteOperation`] with the specified parameters for LP covering +/// multiple separately sequenced table writes. +/// +/// # Panics +/// +/// This method panics if `table_sequence_numbers` contains a different number +/// of tables to the batches derived from `lines` OR if a [`SequenceNumber`] +/// is re-used within the write. 
+#[track_caller] +pub(crate) fn make_multi_table_write_op< + 'a, + I: ExactSizeIterator, +>( + partition_key: &PartitionKey, + namespace_id: NamespaceId, + table_sequence_numbers: I, + lines: &str, +) -> WriteOperation { + let mut tables_by_name = lines_to_batches(lines, 0).expect("invalid LP"); + assert_eq!( + tables_by_name.len(), + table_sequence_numbers.len(), + "number of tables in LP does not match number of table_sequence_numbers" + ); + + let mut seen_sequence_numbers = HashSet::::new(); + + let tables_by_id = table_sequence_numbers + .map(|(table_name, table_id, sequence_number)| { + let mb = tables_by_name + .remove(table_name) + .expect("table name does not exist in LP"); + assert!( + seen_sequence_numbers.insert(sequence_number), + "duplicate sequence number {sequence_number:?} observed" + ); + ( + table_id, + TableData::new(table_id, PartitionedData::new(sequence_number, mb)), + ) + }) + .collect(); + + WriteOperation::new(namespace_id, tables_by_id, partition_key.clone(), None) +} + pub(crate) async fn populate_catalog( catalog: &dyn Catalog, namespace: &str, @@ -332,17 +379,18 @@ pub(crate) async fn populate_catalog( /// Assert `a` and `b` have identical metadata, and that when converting /// them to Arrow batches they produces identical output. 
#[track_caller] -pub(crate) fn assert_dml_writes_eq(a: WriteOperation, b: WriteOperation) { +pub(crate) fn assert_write_ops_eq(a: WriteOperation, b: WriteOperation) { assert_eq!(a.namespace(), b.namespace(), "namespace"); assert_eq!(a.tables().count(), b.tables().count(), "table count"); assert_eq!(a.partition_key(), b.partition_key(), "partition key"); // Assert sequence numbers were reassigned for (a_table, b_table) in a.tables().zip(b.tables()) { + assert_eq!(a_table.0, b_table.0, "table id mismatch"); assert_eq!( a_table.1.partitioned_data().sequence_number(), b_table.1.partitioned_data().sequence_number(), - "sequence number" + "sequence number mismatch" ); } diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 1c6fb8bbd1..ae0bfe8848 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -696,22 +696,25 @@ mod tests { let wal = Wal::new(&dir.path()).await.unwrap(); let w1 = test_data("m1,t=foo v=1i 1"); - let w2 = test_data("m1,t=foo v=2i 2"); + // Use multiple tables for a write to test per-partition sequencing is preserved + let w2 = test_data("m1,t=foo v=2i 2\nm2,u=bar v=1i 1"); let op1 = SequencedWalOp { table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(), op: WalOp::Write(w1), }; let op2 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), op: WalOp::Write(w2), }; let op3 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; @@ -732,7 +735,7 @@ mod tests { // Assert the set has recorded the 
op IDs. // // Note that one op has a duplicate sequence number above! - assert_eq!(ids.len(), 3); + assert_eq!(ids.len(), 4); // Assert the sequence number set contains the specified ops. let ids = ids.iter().collect::>(); @@ -742,6 +745,7 @@ mod tests { SequenceNumber::new(0), SequenceNumber::new(1), SequenceNumber::new(2), + SequenceNumber::new(3), ] ); @@ -752,9 +756,11 @@ mod tests { .collect::>>(), [ [(TableId::new(0), 0)].into_iter().collect(), - [(TableId::new(0), 1)].into_iter().collect(), - [(TableId::new(0), 2)].into_iter().collect(), - [(TableId::new(0), 2)].into_iter().collect(), + [(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), + [(TableId::new(0), 3)].into_iter().collect(), + [(TableId::new(0), 3)].into_iter().collect(), ] .into_iter() .collect::>>(), @@ -807,7 +813,7 @@ mod tests { let wal = Wal::new(dir.path()).await.unwrap(); let w1 = test_data("m1,t=foo v=1i 1"); - let w2 = test_data("m2,u=foo w=2i 2"); + let w2 = test_data("m1,t=foo v=2i 2\nm2,u=foo w=2i 2"); let w3 = test_data("m1,t=foo v=3i 3"); let op1 = SequencedWalOp { @@ -815,20 +821,22 @@ mod tests { op: WalOp::Write(w1.to_owned()), }; let op2 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1), (TableId::new(1), 2)] + .into_iter() + .collect(), op: WalOp::Write(w2.to_owned()), }; let op3 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { - table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; // A third write entry coming after a delete and persist entry must still be yielded let op5 = SequencedWalOp { - 
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 4)].into_iter().collect(), op: WalOp::Write(w3.to_owned()), }; diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index b2580ab910..331c9b49bf 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -22,16 +22,16 @@ async fn crud() { ); // Can write an entry to the open segment - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42, 43]); let summary = unwrap_summary(wal.write_op(op)).await; - assert_eq!(summary.total_bytes, 126); - assert_eq!(summary.bytes_written, 110); + assert_eq!(summary.total_bytes, 140); + assert_eq!(summary.bytes_written, 124); // Can write another entry; total_bytes accumulates - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([44, 45]); let summary = unwrap_summary(wal.write_op(op)).await; - assert_eq!(summary.total_bytes, 236); - assert_eq!(summary.bytes_written, 110); + assert_eq!(summary.total_bytes, 264); + assert_eq!(summary.bytes_written, 124); // Still no closed segments let closed = wal.closed_segments(); @@ -42,10 +42,15 @@ async fn crud() { // Can't read entries from the open segment; have to rotate first let (closed_segment_details, ids) = wal.rotate().unwrap(); - assert_eq!(closed_segment_details.size(), 236); + assert_eq!(closed_segment_details.size(), 264); assert_eq!( ids.iter().collect::>(), - [SequenceNumber::new(42), SequenceNumber::new(43)] + [ + SequenceNumber::new(42), + SequenceNumber::new(43), + SequenceNumber::new(44), + SequenceNumber::new(45) + ] ); // There's one closed segment @@ -53,20 +58,25 @@ async fn crud() { let closed_segment_ids: Vec<_> = closed.iter().map(|c| c.id()).collect(); assert_eq!(closed_segment_ids, &[closed_segment_details.id()]); - // Can read the written entries from the closed segment, - // ensuring the per-partition sequence numbers match up to the current - // op-level 
sequence number while it is the source of truth. + // Can read the written entries from the closed segment, ensuring that the + // per-partition sequence numbers are preserved. let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 42)); - let op = reader.next().unwrap().unwrap(); - op[0] + .into_values() + .collect::>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::::from([42, 43]),); + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 43)); + .into_values() + .collect::>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::::from([44, 45]),); // Can delete a segment, leaving no closed segments again wal.delete(closed_segment_details.id()).await.unwrap(); @@ -85,10 +95,10 @@ async fn replay() { // WAL. { let wal = wal::Wal::new(dir.path()).await.unwrap(); - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42]); let _ = unwrap_summary(wal.write_op(op)).await; wal.rotate().unwrap(); - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([43, 44]); let _ = unwrap_summary(wal.write_op(op)).await; } @@ -102,22 +112,27 @@ async fn replay() { assert_eq!(closed_segment_ids.len(), 2); // Can read the written entries from the previously closed segment - // ensuring the per-partition sequence numbers match up to the current - // op-level sequence number while it is the source of truth. + // ensuring the per-partition sequence numbers are preserved. 
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 42)); + .into_values() + .collect::>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::::from([42])); // Can read the written entries from the previously open segment let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap(); - let op = reader.next().unwrap().unwrap(); - op[0] + let mut op = reader.next().unwrap().unwrap(); + let mut got_sequence_numbers = op + .remove(0) .table_write_sequence_numbers - .values() - .for_each(|sequence_number| assert_eq!(*sequence_number, 43)); + .into_values() + .collect::>(); + got_sequence_numbers.sort(); + assert_eq!(got_sequence_numbers, Vec::::from([43, 44])); } #[tokio::test] @@ -128,17 +143,20 @@ async fn ordering() { { let wal = wal::Wal::new(dir.path()).await.unwrap(); - let op = arbitrary_sequenced_wal_op(42); + let op = arbitrary_sequenced_wal_op([42, 43]); let _ = unwrap_summary(wal.write_op(op)).await; let (_, ids) = wal.rotate().unwrap(); - assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(42)]); + assert_eq!( + ids.iter().collect::>(), + [SequenceNumber::new(42), SequenceNumber::new(43)] + ); - let op = arbitrary_sequenced_wal_op(43); + let op = arbitrary_sequenced_wal_op([44]); let _ = unwrap_summary(wal.write_op(op)).await; let (_, ids) = wal.rotate().unwrap(); - assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(43)]); + assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(44)]); - let op = arbitrary_sequenced_wal_op(44); + let op = arbitrary_sequenced_wal_op([45]); let _ = unwrap_summary(wal.write_op(op)).await; } @@ -162,15 +180,21 @@ async fn ordering() { assert!(ids.is_empty()); } -// TODO(savage): This needs changing to generate multiple 
partitioned sequence numbers for each -// write. -fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp { - let w = test_data("m1,t=foo v=1i 1"); +fn arbitrary_sequenced_wal_op>(sequence_numbers: I) -> SequencedWalOp { + let sequence_numbers = sequence_numbers.into_iter().collect::>(); + let lp = sequence_numbers + .iter() + .enumerate() + .fold(String::new(), |string, (idx, _)| { + string + &format!("m{},t=foo v=1i 1\n", idx) + }); + let w = test_data(lp.as_str()); SequencedWalOp { table_write_sequence_numbers: w .table_batches .iter() - .map(|table_batch| (TableId::new(table_batch.table_id), sequence_number)) + .zip(sequence_numbers.iter()) + .map(|(table_batch, &id)| (TableId::new(table_batch.table_id), id)) .collect(), op: WalOp::Write(w), } From 2da99f8032114b343e7cdd05568de515b99df2dc Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Fri, 30 Jun 2023 16:04:42 +0100 Subject: [PATCH 4/6] refactor: Use `const` instead of unnecessary lazy_static Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> --- ingester/src/init/wal_replay.rs | 9 +++------ ingester/src/server/grpc/rpc_write.rs | 5 +---- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index 2eed3cf369..b88e270539 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -271,7 +271,6 @@ mod tests { use assert_matches::assert_matches; use async_trait::async_trait; - use lazy_static::lazy_static; use metric::{Attributes, Metric}; use parking_lot::Mutex; use wal::Wal; @@ -312,9 +311,7 @@ mod tests { } } - lazy_static! 
{ - static ref ALTERNATIVE_TABLE_NAME: &'static str = "arán"; - } + const ALTERNATIVE_TABLE_NAME: &str = "arán"; #[tokio::test] async fn test_replay() { @@ -357,7 +354,7 @@ mod tests { SequenceNumber::new(42), ), ( - &ALTERNATIVE_TABLE_NAME, + ALTERNATIVE_TABLE_NAME, TableId::new(ARBITRARY_TABLE_ID.get() + 1), SequenceNumber::new(43), ), @@ -367,7 +364,7 @@ mod tests { &format!( r#"{},region=Asturias temp=15 4242424242 {},region=Mayo temp=12 4242424242"#, - &*ARBITRARY_TABLE_NAME, &*ALTERNATIVE_TABLE_NAME, + &*ARBITRARY_TABLE_NAME, ALTERNATIVE_TABLE_NAME, ), ); diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs index be02dbd92a..3a6ac27c95 100644 --- a/ingester/src/server/grpc/rpc_write.rs +++ b/ingester/src/server/grpc/rpc_write.rs @@ -227,7 +227,6 @@ mod tests { column::{SemanticType, Values}, Column, DatabaseBatch, TableBatch, }; - use lazy_static::lazy_static; use std::{collections::HashSet, sync::Arc}; use super::*; @@ -239,9 +238,7 @@ mod tests { const PERSIST_QUEUE_DEPTH: usize = 42; - lazy_static! { - static ref ALTERNATIVE_TABLE_ID: TableId = TableId::new(76); - } + const ALTERNATIVE_TABLE_ID: TableId = TableId::new(76); macro_rules! test_rpc_write { ( From 0978aa0551c088871bac7424843db717af0f481f Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Mon, 10 Jul 2023 15:13:37 +0100 Subject: [PATCH 5/6] fix(e2e): Add small busy-loop to debug::build_catalog test to assert only on non-empty results --- influxdb_iox/tests/end_to_end_cases/debug.rs | 30 ++++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/debug.rs b/influxdb_iox/tests/end_to_end_cases/debug.rs index 90e88a9d17..e02c73dff6 100644 --- a/influxdb_iox/tests/end_to_end_cases/debug.rs +++ b/influxdb_iox/tests/end_to_end_cases/debug.rs @@ -1,5 +1,5 @@ //! 
Tests the `influxdb_iox debug` commands -use std::path::Path; +use std::{path::Path, time::Duration}; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; @@ -7,6 +7,7 @@ use assert_cmd::Command; use futures::FutureExt; use predicates::prelude::*; use tempfile::TempDir; +use test_helpers::timeout::FutureTimeout; use test_helpers_end_to_end::{ maybe_skip_integration, run_sql, MiniCluster, ServerFixture, Step, StepTest, StepTestState, TestConfig, @@ -104,14 +105,20 @@ async fn build_catalog() { // We can build a catalog and start up the server and run a query let restarted = RestartedServer::build_catalog_and_start(&table_dir).await; - let batches = restarted.run_sql(sql, &namespace).await; + let batches = run_sql_until_non_empty(&restarted, sql, namespace.as_str()) + .with_timeout(Duration::from_secs(2)) + .await + .expect("timed out waiting for non-empty batches in result"); assert_batches_sorted_eq!(&expected, &batches); // We can also rebuild a catalog from just the parquet files let only_parquet_dir = copy_only_parquet_files(&table_dir); let restarted = RestartedServer::build_catalog_and_start(only_parquet_dir.path()).await; - let batches = restarted.run_sql(sql, &namespace).await; + let batches = run_sql_until_non_empty(&restarted, sql, namespace.as_str()) + .with_timeout(Duration::from_secs(2)) + .await + .expect("timed out waiting for non-empty batches in result"); assert_batches_sorted_eq!(&expected, &batches); } .boxed() @@ -122,6 +129,23 @@ async fn build_catalog() { .await } +/// Loops forever, running the SQL query against the [`RestartedServer`] given +/// until the result is non-empty. Callers are responsible for timing out the +/// function. 
+async fn run_sql_until_non_empty( + restarted: &RestartedServer, + sql: &str, + namespace: &str, +) -> Vec { + loop { + let batches = restarted.run_sql(sql, namespace).await; + if !batches.is_empty() { + return batches; + } + tokio::task::yield_now().await; + } +} + /// An all in one instance, with data directory of `data_dir` struct RestartedServer { all_in_one: ServerFixture, From dec0244bffcfdd1569edfb3c989009841e207377 Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Mon, 10 Jul 2023 15:27:30 +0100 Subject: [PATCH 6/6] refactor(e2e): Wait 100ms between queries in debug::build_catalog test --- influxdb_iox/tests/end_to_end_cases/debug.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_iox/tests/end_to_end_cases/debug.rs b/influxdb_iox/tests/end_to_end_cases/debug.rs index e02c73dff6..da50bc0c87 100644 --- a/influxdb_iox/tests/end_to_end_cases/debug.rs +++ b/influxdb_iox/tests/end_to_end_cases/debug.rs @@ -142,7 +142,7 @@ async fn run_sql_until_non_empty( if !batches.is_empty() { return batches; } - tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(100)).await; } }