From 6532fb752b6e6c0bb22c0471099aaa7f9f01f608 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 22 Feb 2023 14:13:32 +0100 Subject: [PATCH 1/4] feat: impl Extend for SequenceNumberSet Allow a SequenceNumberSet to be efficiently extended from any iterator of SequenceNumber instances. --- data_types/src/sequence_number_set.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs index 306b388fd4..3576ad86ab 100644 --- a/data_types/src/sequence_number_set.rs +++ b/data_types/src/sequence_number_set.rs @@ -73,6 +73,12 @@ impl TryFrom<&[u8]> for SequenceNumberSet { } } +impl Extend for SequenceNumberSet { + fn extend>(&mut self, iter: T) { + self.0.extend(iter.into_iter().map(|v| v.get() as _)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -114,4 +120,22 @@ mod tests { a.remove(SequenceNumber::new(1)); assert_eq!(a.len(), 0); } + + #[test] + fn test_extend() { + let mut a = SequenceNumberSet::default(); + a.add(SequenceNumber::new(42)); + + let extend_set = [SequenceNumber::new(4), SequenceNumber::new(2)]; + + assert!(a.contains(SequenceNumber::new(42))); + assert!(!a.contains(SequenceNumber::new(4))); + assert!(!a.contains(SequenceNumber::new(2))); + + a.extend(extend_set); + + assert!(a.contains(SequenceNumber::new(42))); + assert!(a.contains(SequenceNumber::new(4))); + assert!(a.contains(SequenceNumber::new(2))); + } } From 6aa33ef380c1be337b64030d769e903b1b1f010f Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 22 Feb 2023 14:19:53 +0100 Subject: [PATCH 2/4] feat: impl FromIterator for SequenceNumberSet Allow a SequenceNumberSet to be instantiated from an iterator of SequenceNumber. 
--- data_types/src/sequence_number_set.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs index 3576ad86ab..3c496f18c3 100644 --- a/data_types/src/sequence_number_set.rs +++ b/data_types/src/sequence_number_set.rs @@ -79,6 +79,12 @@ impl Extend for SequenceNumberSet { } } +impl FromIterator for SequenceNumberSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().map(|v| v.get() as _).collect()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -138,4 +144,15 @@ mod tests { assert!(a.contains(SequenceNumber::new(4))); assert!(a.contains(SequenceNumber::new(2))); } + + #[test] + fn test_collect() { + let collect_set = [SequenceNumber::new(4), SequenceNumber::new(2)]; + + let a = collect_set.into_iter().collect::(); + + assert!(!a.contains(SequenceNumber::new(42))); + assert!(a.contains(SequenceNumber::new(4))); + assert!(a.contains(SequenceNumber::new(2))); + } } From b22643350f897fddf0c2c34cac87482292e1f4e2 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 1 Mar 2023 17:22:46 +0100 Subject: [PATCH 3/4] refactor(wal): track segment sequence numbers Changes the WAL to maintain a SequenceNumberSet containing every ID written to the currently open segment file. The sets are derived from batched data for efficiency, rather than recorded per write, to prevent any overhead in the hot path. The batch set is merged with the file set off the hot path, in a separate I/O thread (not the async runtime). 
--- wal/src/lib.rs | 8 +++++-- wal/src/writer_thread.rs | 46 ++++++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 05e35c044d..5bf7a1da60 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -14,6 +14,7 @@ use crate::blocking::{ ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter, }; +use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; use generated_types::{ google::{FieldViolation, OptionalField}, influxdata::iox::wal::v1::{ @@ -274,6 +275,7 @@ impl Wal { segments: Arc::new(Mutex::new(Segments { closed_segments, open_segment, + open_segment_ids: SequenceNumberSet::default(), })), next_id_source, buffer: Mutex::new(buffer), @@ -316,12 +318,13 @@ impl Wal { let mut segments = self.segments.lock(); let closed = std::mem::replace(&mut segments.open_segment, new_open_segment); - let closed = closed.close().expect("should convert to closed segmet"); + let _seqnum_set = std::mem::take(&mut segments.open_segment_ids); + let closed = closed.close().expect("should convert to closed segment"); let previous_value = segments.closed_segments.insert(closed.id(), closed.clone()); assert!( previous_value.is_none(), - "Should always add new closed segment entries, not replace" + "should always add new closed segment entries, not replace" ); Ok(closed) @@ -381,6 +384,7 @@ impl std::fmt::Debug for Wal { struct Segments { closed_segments: BTreeMap, open_segment: OpenSegmentFileWriter, + open_segment_ids: SequenceNumberSet, } #[derive(Debug)] diff --git a/wal/src/writer_thread.rs b/wal/src/writer_thread.rs index 10a7e2291d..3a80a978ea 100644 --- a/wal/src/writer_thread.rs +++ b/wal/src/writer_thread.rs @@ -1,5 +1,6 @@ use std::{sync::Arc, thread::JoinHandle}; +use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; use generated_types::influxdata::iox::wal::v1 as proto; use observability_deps::tracing::{debug, error}; use 
parking_lot::Mutex; @@ -115,12 +116,16 @@ impl WriterIoThread { } }; - // Encode the batch into the proto types - let ops: Vec<_> = batch + // Encode the batch into the proto types, and extract the + // SequenceNumberSet for this batch. + let (ops, ids): (Vec<_>, SequenceNumberSet) = batch .ops .into_iter() - .map(proto::SequencedWalOp::from) - .collect(); + .map(|v| { + let id = SequenceNumber::new(v.sequence_number as _); + (proto::SequencedWalOp::from(v), id) + }) + .unzip(); let proto_batch = proto::WalOpBatch { ops }; // Generate the binary protobuf message, storing it into proto_data @@ -128,24 +133,33 @@ impl WriterIoThread { .encode(&mut proto_data) .expect("encoding batch into vec cannot fail"); - // Write the serialised data to the current open segment file. - let res = { + // Obtain the segments lock - this prevents concurrent rotation, but + // has no impact on concurrent writers. + { + // Write the serialised data to the current open segment file. let mut segments = self.segments.lock(); match segments.open_segment.write(&proto_data) { - Ok(summary) => WriteResult::Ok(summary), + Ok(summary) => { + // Broadcast the result to all writers to this batch. + // + // Do not panic if no thread is waiting for the flush + // notification - this may be the case if all writes + // disconnected before the WAL was flushed. + let _ = batch.notify_flush.send(Some(WriteResult::Ok(summary))); + + // Now the blocked writers are making progress, union + // this batch sequence number set with the cumulative + // segment file set before releasing the segment lock. + segments.open_segment_ids.add_set(&ids); + } Err(e) => { error!(error=%e, "failed to write WAL batch"); - WriteResult::Err(e.to_string()) + let _ = batch + .notify_flush + .send(Some(WriteResult::Err(e.to_string()))); } - } + }; }; - - // Broadcast the result to all writers to this batch. 
- // - // Do not panic if no thread is waiting for the flush notification - - // this may be the case if all writes disconnected before the WAL - // was flushed. - let _ = batch.notify_flush.send(Some(res)); } } } From 0b40e0d17cc498cf63eb14f626198b1819b1b059 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 1 Mar 2023 17:32:22 +0100 Subject: [PATCH 4/4] feat(wal): SequenceNumberSet for rotated file Changes Wal::rotate() to return the SequenceNumberSet containing the IDs of all writes in the segment file that is rotated out. --- ingester2/src/wal/rotate_task.rs | 5 +++- wal/src/lib.rs | 39 ++++++++++++++++++++++++-------- wal/tests/end_to_end.rs | 20 +++++++++++----- 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/ingester2/src/wal/rotate_task.rs b/ingester2/src/wal/rotate_task.rs index 19582f1269..6ac94a4857 100644 --- a/ingester2/src/wal/rotate_task.rs +++ b/ingester2/src/wal/rotate_task.rs @@ -26,10 +26,11 @@ pub(crate) async fn periodic_rotation( interval.tick().await; info!("rotating wal file"); - let stats = wal.rotate().expect("failed to rotate WAL"); + let (stats, ids) = wal.rotate().expect("failed to rotate WAL"); debug!( closed_id = %stats.id(), segment_bytes = stats.size(), + n_ops = ids.len(), "rotated wal" ); @@ -127,6 +128,8 @@ pub(crate) async fn periodic_rotation( info!( closed_id = %stats.id(), + file_bytes = stats.size(), + n_ops = ids.len(), "dropped persisted wal segment" ); } diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 5bf7a1da60..36a087a8b0 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -14,7 +14,7 @@ use crate::blocking::{ ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter, }; -use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; +use data_types::sequence_number_set::SequenceNumberSet; use generated_types::{ google::{FieldViolation, OptionalField}, influxdata::iox::wal::v1::{ @@ -301,16 +301,18 @@ impl Wal { ClosedSegmentFileReader::from_path(path) } - /// Writes one 
[`SequencedWalOp`] to the buffer and returns a watch channel for when the buffer - /// is flushed and fsync'd to disk. + /// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel + /// for when the buffer is flushed and fsync'd to disk. pub fn write_op(&self, op: SequencedWalOp) -> watch::Receiver> { let mut b = self.buffer.lock(); b.ops.push(op); b.flush_notification.clone() } - /// Closes the currently open segment and opens a new one, returning the closed segment details. - pub fn rotate(&self) -> Result { + /// Closes the currently open segment and opens a new one, returning the + /// closed segment details, including the [`SequenceNumberSet`] containing + /// the sequence numbers of the writes within the closed segment. + pub fn rotate(&self) -> Result<(ClosedSegment, SequenceNumberSet)> { let new_open_segment = OpenSegmentFileWriter::new_in_directory(&self.root, Arc::clone(&self.next_id_source)) .context(UnableToCreateSegmentFileSnafu)?; @@ -318,7 +320,7 @@ impl Wal { let mut segments = self.segments.lock(); let closed = std::mem::replace(&mut segments.open_segment, new_open_segment); - let _seqnum_set = std::mem::take(&mut segments.open_segment_ids); + let seqnum_set = std::mem::take(&mut segments.open_segment_ids); let closed = closed.close().expect("should convert to closed segment"); let previous_value = segments.closed_segments.insert(closed.id(), closed.clone()); @@ -327,7 +329,7 @@ impl Wal { "should always add new closed segment entries, not replace" ); - Ok(closed) + Ok((closed, seqnum_set)) } async fn flush_buffer_background_task(&self) { @@ -537,7 +539,7 @@ impl ClosedSegment { #[cfg(test)] mod tests { use super::*; - use data_types::{NamespaceId, TableId}; + use data_types::{NamespaceId, SequenceNumber, TableId}; use dml::DmlWrite; use generated_types::influxdata::{ iox::{delete::v1::DeletePayload, wal::v1::PersistOp}, @@ -575,7 +577,7 @@ mod tests { wal.write_op(op3.clone()); wal.write_op(op4.clone()).changed().await.unwrap(); - 
let closed = wal.rotate().unwrap(); + let (closed, ids) = wal.rotate().unwrap(); let mut reader = wal.reader_for_segment(closed.id).unwrap(); @@ -584,6 +586,22 @@ mod tests { ops.append(&mut batch); } assert_eq!(vec![op1, op2, op3, op4], ops); + + // Assert the set has recorded the op IDs. + // + // Note that one op has a duplicate sequence number above! + assert_eq!(ids.len(), 3); + + // Assert the sequence number set contains the specified ops. + let ids = ids.iter().collect::>(); + assert_eq!( + ids, + [ + SequenceNumber::new(0), + SequenceNumber::new(1), + SequenceNumber::new(2), + ] + ) } // open wal with files that aren't segments (should log and skip) @@ -604,8 +622,9 @@ mod tests { ); // No writes, but rotating is totally fine - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.size(), 16); + assert!(ids.is_empty()); // There's one closed segment let closed = wal.closed_segments(); diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index 5274142fd4..400ea0e306 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -1,4 +1,4 @@ -use data_types::{NamespaceId, TableId}; +use data_types::{NamespaceId, SequenceNumber, TableId}; use dml::DmlWrite; use generated_types::influxdata::{ iox::wal::v1::sequenced_wal_op::Op as WalOp, @@ -41,8 +41,12 @@ async fn crud() { ); // Can't read entries from the open segment; have to rotate first - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.size(), 232); + assert_eq!( + ids.iter().collect::>(), + [SequenceNumber::new(42), SequenceNumber::new(43)] + ); // There's one closed segment let closed = wal.closed_segments(); @@ -110,11 +114,13 @@ async fn ordering() { let op = arbitrary_sequenced_wal_op(42); let _ = unwrap_summary(wal.write_op(op)).await; - wal.rotate().unwrap(); + let (_, ids) = 
wal.rotate().unwrap(); + assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(42)]); let op = arbitrary_sequenced_wal_op(43); let _ = unwrap_summary(wal.write_op(op)).await; - wal.rotate().unwrap(); + let (_, ids) = wal.rotate().unwrap(); + assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(43)]); let op = arbitrary_sequenced_wal_op(44); let _ = unwrap_summary(wal.write_op(op)).await; @@ -130,12 +136,14 @@ async fn ordering() { assert_eq!(closed_segment_ids, &[0, 1, 2]); // The open segment is next in order - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.id().get(), 3); + assert!(ids.is_empty()); // Creating new files after replay are later in the ordering - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.id().get(), 4); + assert!(ids.is_empty()); } fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {