Merge pull request #7110 from influxdata/dom/record-wal-seqnum-sets

feat(wal): track set of write IDs in a segment file
pull/24376/head
Dom 2023-03-02 15:20:27 +00:00 committed by GitHub
commit c3d9219afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 33 deletions

View File

@ -73,6 +73,18 @@ impl TryFrom<&[u8]> for SequenceNumberSet {
}
}
impl Extend<SequenceNumber> for SequenceNumberSet {
fn extend<T: IntoIterator<Item = SequenceNumber>>(&mut self, iter: T) {
self.0.extend(iter.into_iter().map(|v| v.get() as _))
}
}
impl FromIterator<SequenceNumber> for SequenceNumberSet {
fn from_iter<T: IntoIterator<Item = SequenceNumber>>(iter: T) -> Self {
Self(iter.into_iter().map(|v| v.get() as _).collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -114,4 +126,33 @@ mod tests {
a.remove(SequenceNumber::new(1));
assert_eq!(a.len(), 0);
}
#[test]
fn test_extend() {
let mut a = SequenceNumberSet::default();
a.add(SequenceNumber::new(42));
let extend_set = [SequenceNumber::new(4), SequenceNumber::new(2)];
assert!(a.contains(SequenceNumber::new(42)));
assert!(!a.contains(SequenceNumber::new(4)));
assert!(!a.contains(SequenceNumber::new(2)));
a.extend(extend_set);
assert!(a.contains(SequenceNumber::new(42)));
assert!(a.contains(SequenceNumber::new(4)));
assert!(a.contains(SequenceNumber::new(2)));
}
#[test]
fn test_collect() {
let collect_set = [SequenceNumber::new(4), SequenceNumber::new(2)];
let a = collect_set.into_iter().collect::<SequenceNumberSet>();
assert!(!a.contains(SequenceNumber::new(42)));
assert!(a.contains(SequenceNumber::new(4)));
assert!(a.contains(SequenceNumber::new(2)));
}
}

View File

@ -26,10 +26,11 @@ pub(crate) async fn periodic_rotation<T, P>(
interval.tick().await;
info!("rotating wal file");
let stats = wal.rotate().expect("failed to rotate WAL");
let (stats, ids) = wal.rotate().expect("failed to rotate WAL");
debug!(
closed_id = %stats.id(),
segment_bytes = stats.size(),
n_ops = ids.len(),
"rotated wal"
);
@ -127,6 +128,8 @@ pub(crate) async fn periodic_rotation<T, P>(
info!(
closed_id = %stats.id(),
file_bytes = stats.size(),
n_ops = ids.len(),
"dropped persisted wal segment"
);
}

View File

@ -14,6 +14,7 @@
use crate::blocking::{
ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter,
};
use data_types::sequence_number_set::SequenceNumberSet;
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
@ -274,6 +275,7 @@ impl Wal {
segments: Arc::new(Mutex::new(Segments {
closed_segments,
open_segment,
open_segment_ids: SequenceNumberSet::default(),
})),
next_id_source,
buffer: Mutex::new(buffer),
@ -299,16 +301,18 @@ impl Wal {
ClosedSegmentFileReader::from_path(path)
}
/// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel for when the buffer
/// is flushed and fsync'd to disk.
/// Writes one [`SequencedWalOp`] to the buffer and returns a watch channel
/// for when the buffer is flushed and fsync'd to disk.
pub fn write_op(&self, op: SequencedWalOp) -> watch::Receiver<Option<WriteResult>> {
let mut b = self.buffer.lock();
b.ops.push(op);
b.flush_notification.clone()
}
/// Closes the currently open segment and opens a new one, returning the closed segment details.
pub fn rotate(&self) -> Result<ClosedSegment> {
/// Closes the currently open segment and opens a new one, returning the
/// closed segment details, including the [`SequenceNumberSet`] containing
/// the sequence numbers of the writes within the closed segment.
pub fn rotate(&self) -> Result<(ClosedSegment, SequenceNumberSet)> {
let new_open_segment =
OpenSegmentFileWriter::new_in_directory(&self.root, Arc::clone(&self.next_id_source))
.context(UnableToCreateSegmentFileSnafu)?;
@ -316,15 +320,16 @@ impl Wal {
let mut segments = self.segments.lock();
let closed = std::mem::replace(&mut segments.open_segment, new_open_segment);
let closed = closed.close().expect("should convert to closed segmet");
let seqnum_set = std::mem::take(&mut segments.open_segment_ids);
let closed = closed.close().expect("should convert to closed segment");
let previous_value = segments.closed_segments.insert(closed.id(), closed.clone());
assert!(
previous_value.is_none(),
"Should always add new closed segment entries, not replace"
"should always add new closed segment entries, not replace"
);
Ok(closed)
Ok((closed, seqnum_set))
}
async fn flush_buffer_background_task(&self) {
@ -402,6 +407,7 @@ impl std::fmt::Debug for Wal {
struct Segments {
closed_segments: BTreeMap<SegmentId, ClosedSegment>,
open_segment: OpenSegmentFileWriter,
open_segment_ids: SequenceNumberSet,
}
#[derive(Debug)]
@ -554,7 +560,7 @@ impl ClosedSegment {
#[cfg(test)]
mod tests {
use super::*;
use data_types::{NamespaceId, TableId};
use data_types::{NamespaceId, SequenceNumber, TableId};
use dml::DmlWrite;
use generated_types::influxdata::{
iox::{delete::v1::DeletePayload, wal::v1::PersistOp},
@ -592,7 +598,7 @@ mod tests {
wal.write_op(op3.clone());
wal.write_op(op4.clone()).changed().await.unwrap();
let closed = wal.rotate().unwrap();
let (closed, ids) = wal.rotate().unwrap();
let mut reader = wal.reader_for_segment(closed.id).unwrap();
@ -601,6 +607,22 @@ mod tests {
ops.append(&mut batch);
}
assert_eq!(vec![op1, op2, op3, op4], ops);
// Assert the set has recorded the op IDs.
//
// Note that one op has a duplicate sequence number above!
assert_eq!(ids.len(), 3);
// Assert the sequence number set contains the specified ops.
let ids = ids.iter().collect::<Vec<_>>();
assert_eq!(
ids,
[
SequenceNumber::new(0),
SequenceNumber::new(1),
SequenceNumber::new(2),
]
)
}
// open wal with files that aren't segments (should log and skip)
@ -621,8 +643,9 @@ mod tests {
);
// No writes, but rotating is totally fine
let closed_segment_details = wal.rotate().unwrap();
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.size(), 16);
assert!(ids.is_empty());
// There's one closed segment
let closed = wal.closed_segments();

View File

@ -1,5 +1,6 @@
use std::{sync::Arc, thread::JoinHandle};
use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber};
use generated_types::influxdata::iox::wal::v1 as proto;
use observability_deps::tracing::{debug, error};
use parking_lot::Mutex;
@ -115,12 +116,16 @@ impl WriterIoThread {
}
};
// Encode the batch into the proto types
let ops: Vec<_> = batch
// Encode the batch into the proto types, and extract the
// SequenceNumberSet for this batch.
let (ops, ids): (Vec<_>, SequenceNumberSet) = batch
.ops
.into_iter()
.map(proto::SequencedWalOp::from)
.collect();
.map(|v| {
let id = SequenceNumber::new(v.sequence_number as _);
(proto::SequencedWalOp::from(v), id)
})
.unzip();
let proto_batch = proto::WalOpBatch { ops };
// Generate the binary protobuf message, storing it into proto_data
@ -128,24 +133,33 @@ impl WriterIoThread {
.encode(&mut proto_data)
.expect("encoding batch into vec cannot fail");
// Write the serialised data to the current open segment file.
let res = {
// Obtain the segments lock - this prevents concurrent rotation, but
// has no impact on concurrent writers.
{
// Write the serialised data to the current open segment file.
let mut segments = self.segments.lock();
match segments.open_segment.write(&proto_data) {
Ok(summary) => WriteResult::Ok(summary),
Ok(summary) => {
// Broadcast the result to all writers to this batch.
//
// Do not panic if no thread is waiting for the flush
// notification - this may be the case if all writes
// disconnected before the WAL was flushed.
let _ = batch.notify_flush.send(Some(WriteResult::Ok(summary)));
// Now the blocked writers are making progress, union
// this batch sequence number set with the cumulative
// segment file set before releasing the segment lock.
segments.open_segment_ids.add_set(&ids);
}
Err(e) => {
error!(error=%e, "failed to write WAL batch");
WriteResult::Err(e.to_string())
let _ = batch
.notify_flush
.send(Some(WriteResult::Err(e.to_string())));
}
}
};
};
// Broadcast the result to all writers to this batch.
//
// Do not panic if no thread is waiting for the flush notification -
// this may be the case if all writes disconnected before the WAL
// was flushed.
let _ = batch.notify_flush.send(Some(res));
}
}
}

View File

@ -1,4 +1,4 @@
use data_types::{NamespaceId, TableId};
use data_types::{NamespaceId, SequenceNumber, TableId};
use dml::DmlWrite;
use generated_types::influxdata::{
iox::wal::v1::sequenced_wal_op::Op as WalOp,
@ -41,8 +41,12 @@ async fn crud() {
);
// Can't read entries from the open segment; have to rotate first
let closed_segment_details = wal.rotate().unwrap();
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.size(), 232);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
);
// There's one closed segment
let closed = wal.closed_segments();
@ -110,11 +114,13 @@ async fn ordering() {
let op = arbitrary_sequenced_wal_op(42);
let _ = unwrap_summary(wal.write_op(op)).await;
wal.rotate().unwrap();
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(42)]);
let op = arbitrary_sequenced_wal_op(43);
let _ = unwrap_summary(wal.write_op(op)).await;
wal.rotate().unwrap();
let (_, ids) = wal.rotate().unwrap();
assert_eq!(ids.iter().collect::<Vec<_>>(), [SequenceNumber::new(43)]);
let op = arbitrary_sequenced_wal_op(44);
let _ = unwrap_summary(wal.write_op(op)).await;
@ -130,12 +136,14 @@ async fn ordering() {
assert_eq!(closed_segment_ids, &[0, 1, 2]);
// The open segment is next in order
let closed_segment_details = wal.rotate().unwrap();
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.id().get(), 3);
assert!(ids.is_empty());
// Creating new files after replay are later in the ordering
let closed_segment_details = wal.rotate().unwrap();
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.id().get(), 4);
assert!(ids.is_empty());
}
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {