diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs index 306b388fd4..3c496f18c3 100644 --- a/data_types/src/sequence_number_set.rs +++ b/data_types/src/sequence_number_set.rs @@ -73,6 +73,18 @@ impl TryFrom<&[u8]> for SequenceNumberSet { } } +impl Extend for SequenceNumberSet { + fn extend>(&mut self, iter: T) { + self.0.extend(iter.into_iter().map(|v| v.get() as _)) + } +} + +impl FromIterator for SequenceNumberSet { + fn from_iter>(iter: T) -> Self { + Self(iter.into_iter().map(|v| v.get() as _).collect()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -114,4 +126,33 @@ mod tests { a.remove(SequenceNumber::new(1)); assert_eq!(a.len(), 0); } + + #[test] + fn test_extend() { + let mut a = SequenceNumberSet::default(); + a.add(SequenceNumber::new(42)); + + let extend_set = [SequenceNumber::new(4), SequenceNumber::new(2)]; + + assert!(a.contains(SequenceNumber::new(42))); + assert!(!a.contains(SequenceNumber::new(4))); + assert!(!a.contains(SequenceNumber::new(2))); + + a.extend(extend_set); + + assert!(a.contains(SequenceNumber::new(42))); + assert!(a.contains(SequenceNumber::new(4))); + assert!(a.contains(SequenceNumber::new(2))); + } + + #[test] + fn test_collect() { + let collect_set = [SequenceNumber::new(4), SequenceNumber::new(2)]; + + let a = collect_set.into_iter().collect::(); + + assert!(!a.contains(SequenceNumber::new(42))); + assert!(a.contains(SequenceNumber::new(4))); + assert!(a.contains(SequenceNumber::new(2))); + } } diff --git a/ingester2/src/wal/rotate_task.rs b/ingester2/src/wal/rotate_task.rs index 19582f1269..6ac94a4857 100644 --- a/ingester2/src/wal/rotate_task.rs +++ b/ingester2/src/wal/rotate_task.rs @@ -26,10 +26,11 @@ pub(crate) async fn periodic_rotation( interval.tick().await; info!("rotating wal file"); - let stats = wal.rotate().expect("failed to rotate WAL"); + let (stats, ids) = wal.rotate().expect("failed to rotate WAL"); debug!( closed_id = %stats.id(), segment_bytes = stats.size(), + n_ops = ids.len(), "rotated wal" ); @@ -127,6 +128,8 @@ pub(crate) async fn periodic_rotation( info!( closed_id = %stats.id(), + file_bytes = stats.size(), + n_ops = ids.len(), "dropped persisted wal segment" ); } diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 2e0842751a..cdd0fb53a6 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -14,6 +14,7 @@ use crate::blocking::{ ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter, }; +use data_types::sequence_number_set::SequenceNumberSet; use generated_types::{ google::{FieldViolation, OptionalField}, influxdata::iox::wal::v1::{ @@ -274,6 +275,7 @@ impl Wal { segments: Arc::new(Mutex::new(Segments { closed_segments, open_segment, + open_segment_ids: SequenceNumberSet::default(), })), next_id_source, buffer: Mutex::new(buffer), @@ -299,16 +301,18 @@ impl Wal { ClosedSegmentFileReader::from_path(path) } - /// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel for when the buffer - /// is flushed and fsync'd to disk. + /// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel + /// for when the buffer is flushed and fsync'd to disk. pub fn write_op(&self, op: SequencedWalOp) -> watch::Receiver> { let mut b = self.buffer.lock(); b.ops.push(op); b.flush_notification.clone() } - /// Closes the currently open segment and opens a new one, returning the closed segment details. - pub fn rotate(&self) -> Result { + /// Closes the currently open segment and opens a new one, returning the + /// closed segment details, including the [`SequenceNumberSet`] containing + /// the sequence numbers of the writes within the closed segment. + pub fn rotate(&self) -> Result<(ClosedSegment, SequenceNumberSet)> { let new_open_segment = OpenSegmentFileWriter::new_in_directory(&self.root, Arc::clone(&self.next_id_source)) .context(UnableToCreateSegmentFileSnafu)?; @@ -316,15 +320,16 @@ impl Wal { let mut segments = self.segments.lock(); let closed = std::mem::replace(&mut segments.open_segment, new_open_segment); - let closed = closed.close().expect("should convert to closed segmet"); + let seqnum_set = std::mem::take(&mut segments.open_segment_ids); + let closed = closed.close().expect("should convert to closed segment"); let previous_value = segments.closed_segments.insert(closed.id(), closed.clone()); assert!( previous_value.is_none(), - "Should always add new closed segment entries, not replace" + "should always add new closed segment entries, not replace" ); - Ok(closed) + Ok((closed, seqnum_set)) } async fn flush_buffer_background_task(&self) { @@ -402,6 +407,7 @@ impl std::fmt::Debug for Wal { struct Segments { closed_segments: BTreeMap, open_segment: OpenSegmentFileWriter, + open_segment_ids: SequenceNumberSet, } #[derive(Debug)] @@ -554,7 +560,7 @@ impl ClosedSegment { #[cfg(test)] mod tests { use super::*; - use data_types::{NamespaceId, TableId}; + use data_types::{NamespaceId, SequenceNumber, TableId}; use dml::DmlWrite; use generated_types::influxdata::{ iox::{delete::v1::DeletePayload, wal::v1::PersistOp}, @@ -592,7 +598,7 @@ mod tests { wal.write_op(op3.clone()); wal.write_op(op4.clone()).changed().await.unwrap(); - let closed = wal.rotate().unwrap(); + let (closed, ids) = wal.rotate().unwrap(); let mut reader = wal.reader_for_segment(closed.id).unwrap(); @@ -601,6 +607,22 @@ mod tests { ops.append(&mut batch); } assert_eq!(vec![op1, op2, op3, op4], ops); + + // Assert the set has recorded the op IDs. + // + // Note that one op has a duplicate sequence number above! + assert_eq!(ids.len(), 3); + + // Assert the sequence number set contains the specified ops. + let ids = ids.iter().collect::>(); + assert_eq!( + ids, + [ + SequenceNumber::new(0), + SequenceNumber::new(1), + SequenceNumber::new(2), + ] + ) } // open wal with files that aren't segments (should log and skip) @@ -621,8 +643,9 @@ mod tests { ); // No writes, but rotating is totally fine - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.size(), 16); + assert!(ids.is_empty()); // There's one closed segment let closed = wal.closed_segments(); diff --git a/wal/src/writer_thread.rs b/wal/src/writer_thread.rs index 10a7e2291d..3a80a978ea 100644 --- a/wal/src/writer_thread.rs +++ b/wal/src/writer_thread.rs @@ -1,5 +1,6 @@ use std::{sync::Arc, thread::JoinHandle}; +use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; use generated_types::influxdata::iox::wal::v1 as proto; use observability_deps::tracing::{debug, error}; use parking_lot::Mutex; @@ -115,12 +116,16 @@ impl WriterIoThread { } }; - // Encode the batch into the proto types - let ops: Vec<_> = batch + // Encode the batch into the proto types, and extract the + // SequenceNumberSet for this batch. + let (ops, ids): (Vec<_>, SequenceNumberSet) = batch .ops .into_iter() - .map(proto::SequencedWalOp::from) - .collect(); + .map(|v| { + let id = SequenceNumber::new(v.sequence_number as _); + (proto::SequencedWalOp::from(v), id) + }) + .unzip(); let proto_batch = proto::WalOpBatch { ops }; // Generate the binary protobuf message, storing it into proto_data @@ -128,24 +133,33 @@ impl WriterIoThread { .encode(&mut proto_data) .expect("encoding batch into vec cannot fail"); - // Write the serialised data to the current open segment file. - let res = { + // Obtain the segments lock - this prevents concurrent rotation, but + // has no impact on concurrent writers. + { + // Write the serialised data to the current open segment file. let mut segments = self.segments.lock(); match segments.open_segment.write(&proto_data) { - Ok(summary) => WriteResult::Ok(summary), + Ok(summary) => { + // Broadcast the result to all writers to this batch. + // + // Do not panic if no thread is waiting for the flush + // notification - this may be the case if all writes + // disconnected before the WAL was flushed. + let _ = batch.notify_flush.send(Some(WriteResult::Ok(summary))); + + // Now the blocked writers are making progress, union + // this batch sequence number set with the cumulative + // segment file set before releasing the segment lock. + segments.open_segment_ids.add_set(&ids); + } Err(e) => { error!(error=%e, "failed to write WAL batch"); - WriteResult::Err(e.to_string()) + let _ = batch + .notify_flush + .send(Some(WriteResult::Err(e.to_string()))); } - } + }; }; - - // Broadcast the result to all writers to this batch. - // - // Do not panic if no thread is waiting for the flush notification - - // this may be the case if all writes disconnected before the WAL - // was flushed. - let _ = batch.notify_flush.send(Some(res)); } } } diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index 5274142fd4..400ea0e306 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -1,4 +1,4 @@ -use data_types::{NamespaceId, TableId}; +use data_types::{NamespaceId, SequenceNumber, TableId}; use dml::DmlWrite; use generated_types::influxdata::{ iox::wal::v1::sequenced_wal_op::Op as WalOp, @@ -41,8 +41,12 @@ async fn crud() { ); // Can't read entries from the open segment; have to rotate first - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.size(), 232); + assert_eq!( + ids.iter().collect::>(), + [SequenceNumber::new(42), SequenceNumber::new(43)] + ); // There's one closed segment let closed = wal.closed_segments(); @@ -110,11 +114,13 @@ async fn ordering() { let op = arbitrary_sequenced_wal_op(42); let _ = unwrap_summary(wal.write_op(op)).await; - wal.rotate().unwrap(); + let (_, ids) = wal.rotate().unwrap(); + assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(42)]); let op = arbitrary_sequenced_wal_op(43); let _ = unwrap_summary(wal.write_op(op)).await; - wal.rotate().unwrap(); + let (_, ids) = wal.rotate().unwrap(); + assert_eq!(ids.iter().collect::>(), [SequenceNumber::new(43)]); let op = arbitrary_sequenced_wal_op(44); let _ = unwrap_summary(wal.write_op(op)).await; @@ -130,12 +136,14 @@ async fn ordering() { assert_eq!(closed_segment_ids, &[0, 1, 2]); // The open segment is next in order - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.id().get(), 3); + assert!(ids.is_empty()); // Creating new files after replay are later in the ordering - let closed_segment_details = wal.rotate().unwrap(); + let (closed_segment_details, ids) = wal.rotate().unwrap(); assert_eq!(closed_segment_details.id().get(), 4); + assert!(ids.is_empty()); } fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {