diff --git a/ingester/src/data/partition/buffer/always_some.rs b/ingester/src/data/partition/buffer/always_some.rs new file mode 100644 index 0000000000..2ae59380e0 --- /dev/null +++ b/ingester/src/data/partition/buffer/always_some.rs @@ -0,0 +1,99 @@ +//! A helper type that ensures an `Option` is always `Some` once the guard is +//! dropped. + +/// A guard through which a value can be placed back into the [`AlwaysSome`]. +#[derive(Debug)] +#[must_use = "Guard must be used to restore the value"] +pub(super) struct Guard<'a, T>(&'a mut Option); + +impl<'a, T> Guard<'a, T> { + /// Store `value` in the [`AlwaysSome`] for subsequent + /// [`AlwaysSome::take()`] calls. + pub(super) fn store(self, value: T) { + assert!(self.0.is_none()); + *self.0 = Some(value); + } +} + +/// A helper type that aims to ease working with an [`Option`] that must always +/// be restored in a given scope. +/// +/// Accessing the value within an [`AlwaysSome`] returns a [`Guard`], which MUST +/// be used to store the value before going out of scope. Failure to store a +/// value cause a subsequent [`Self::take()`] call to panic. +/// +/// Failing to store a value in the [`Guard`] causes a compiler warning, however +/// this does not prevent failing to return a value to the [`AlwaysSome`] as the +/// warning can be falsely silenced by using it within one conditional code path +/// and not the other. +#[derive(Debug)] +pub(super) struct AlwaysSome(Option); + +impl Default for AlwaysSome +where + T: Default, +{ + fn default() -> Self { + Self::new(T::default()) + } +} + +impl std::ops::Deref for AlwaysSome { + type Target = T; + + fn deref(&self) -> &Self::Target { + self.0.as_ref().unwrap() + } +} + +impl AlwaysSome { + /// Wrap `value` in an [`AlwaysSome`]. + pub(super) fn new(value: T) -> Self { + Self(Some(value)) + } + + /// Read the value. + pub(super) fn take(&mut self) -> (Guard<'_, T>, T) { + let value = std::mem::take(&mut self.0); + + ( + Guard(&mut self.0), + value.expect("AlwaysSome value is None!"), + ) + } + + /// Deconstruct `self`, returning the inner value. + pub(crate) fn into_inner(self) -> T { + self.0.unwrap() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_always_some() { + let mut a = AlwaysSome::::default(); + + let (guard, value) = a.take(); + assert_eq!(value, 0); + guard.store(42); + + let (guard, value) = a.take(); + assert_eq!(value, 42); + guard.store(24); + + assert_eq!(a.into_inner(), 24); + } + + #[test] + #[should_panic = "AlwaysSome value is None!"] + fn test_drops_guard() { + let mut a = AlwaysSome::::default(); + { + let _ = a.take(); + } + let _ = a.take(); + } +} diff --git a/ingester/src/data/partition/buffer/mutable_buffer.rs b/ingester/src/data/partition/buffer/mutable_buffer.rs new file mode 100644 index 0000000000..cd607a37bd --- /dev/null +++ b/ingester/src/data/partition/buffer/mutable_buffer.rs @@ -0,0 +1,53 @@ +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use mutable_batch::MutableBatch; +use schema::selection::Selection; + +/// A [`Buffer`] is an internal mutable buffer wrapper over a [`MutableBatch`] +/// for the [`BufferState`] FSM. +/// +/// A [`Buffer`] can contain no writes. +/// +/// [`BufferState`]: super::super::BufferState +#[derive(Debug, Default)] +pub(super) struct Buffer { + buffer: Option, +} + +impl Buffer { + /// Apply `batch` to the in-memory buffer. + /// + /// # Data Loss + /// + /// If this method returns an error, the data in `batch` is problematic and + /// has been discarded. + pub(super) fn buffer_write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> { + match self.buffer { + Some(ref mut b) => b.extend_from(&batch)?, + None => self.buffer = Some(batch), + }; + + Ok(()) + } + + /// Generates a [`RecordBatch`] from the data in this [`Buffer`]. + /// + /// If this [`Buffer`] is empty when this method is called, the call is a + /// NOP and [`None`] is returned. + /// + /// # Panics + /// + /// If generating the snapshot fails, this method panics. + pub(super) fn snapshot(self) -> Option> { + Some(Arc::new( + self.buffer? + .to_arrow(Selection::All) + .expect("failed to snapshot buffer data"), + )) + } + + pub(super) fn is_empty(&self) -> bool { + self.buffer.is_none() + } +} diff --git a/ingester/src/data/partition/buffer/state_machine.rs b/ingester/src/data/partition/buffer/state_machine.rs new file mode 100644 index 0000000000..24c73a1538 --- /dev/null +++ b/ingester/src/data/partition/buffer/state_machine.rs @@ -0,0 +1,289 @@ +#![allow(dead_code)] +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use data_types::SequenceNumber; +use mutable_batch::MutableBatch; + +mod buffering; +mod buffering_with_snapshot; +mod persisting; +mod snapshot; + +pub(in crate::data::partition::buffer) use buffering::*; +pub(in crate::data::partition::buffer) use buffering_with_snapshot::*; +pub(crate) use persisting::*; +pub(in crate::data::partition::buffer) use snapshot::*; + +use crate::data::SequenceNumberRange; + +use super::traits::{Queryable, Writeable}; + +/// A result type for fallible transitions. +/// +/// The type system ensures the state machine is always returned to the caller, +/// regardless of the transition outcome. +#[derive(Debug)] +pub(crate) enum Transition { + /// The transition succeeded, and the new state is contained within. + Ok(BufferState), + /// The state machine failed to transition due to an invariant not being + /// upheld, and the original state is contained within. + Unchanged(BufferState), +} + +impl Transition { + /// A helper function to construct [`Self::Ok`] variants. + pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Transition { + Self::Ok(BufferState { + state: v, + sequence_range, + }) + } + + /// A helper function to construct [`Self::Unchanged`] variants. + pub(super) fn unchanged(v: BufferState) -> Transition { + Self::Unchanged(v) + } +} + +/// A finite state machine for buffering writes, and converting them into a +/// queryable data format on-demand. +/// +/// This FSM is used to provide explicit states for each stage of the data +/// lifecycle within a partition buffer: +/// +/// ```text +/// ┌──────────────┐ +/// │ Buffering │ +/// └───────┬──────┘ +/// │ +/// ▼ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ +/// ┌─────▶ Snapshot ├─────▶ Persisting │ +/// │ └ ─ ─ ─ ┬ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ +/// │ │ +/// │ ▼ +/// │ ┌───────────────────────┐ +/// └──│ BufferingWithSnapshot │ +/// └───────────────────────┘ +/// ``` +/// +/// Boxes with dashed lines indicate immutable, queryable states that contain +/// data in an efficient data format for query execution ([`RecordBatch`]). +/// +/// Boxes with solid lines indicate a mutable state to which further writes can +/// be applied. +/// +/// A [`BufferState`] tracks the bounding [`SequenceNumber`] values it has +/// observed, and enforces monotonic writes (w.r.t their [`SequenceNumber`]). +#[derive(Debug)] +pub(crate) struct BufferState { + state: T, + sequence_range: SequenceNumberRange, +} + +impl BufferState { + /// Initialise a new buffer state machine. + pub(super) fn new() -> Self { + Self { + state: Buffering::default(), + sequence_range: SequenceNumberRange::default(), + } + } +} + +impl BufferState { + pub fn sequence_number_range(&self) -> &SequenceNumberRange { + &self.sequence_range + } +} + +/// A [`BufferState`] in a mutable state can accept writes and record their +/// [`SequenceNumber`]. +impl BufferState +where + T: Writeable, +{ + /// The provided [`SequenceNumber`] MUST be for the given [`MutableBatch`]. + /// + /// # Panics + /// + /// This method panics if it is called non-monotonic writes/sequence + /// numbers. + pub(crate) fn write( + &mut self, + batch: MutableBatch, + n: SequenceNumber, + ) -> Result<(), mutable_batch::Error> { + self.state.write(batch)?; + self.sequence_range.observe(n); + Ok(()) + } +} + +/// A [`BufferState`] in a queryable state delegates the read to the current +/// state machine state. +impl Queryable for BufferState +where + T: Queryable, +{ + /// Returns the current buffer data. + /// + /// This is always a cheap method call. + fn get_query_data(&self) -> &[Arc] { + self.state.get_query_data() + } +} + +#[cfg(test)] +mod tests { + use std::ops::Deref; + + use arrow_util::assert_batches_eq; + use mutable_batch_lp::test_helpers::lp_to_mutable_batch; + use schema::selection::Selection; + + use super::*; + + #[test] + fn test_buffer_lifecycle() { + // Initialise a buffer in the base state. + let mut buffer: BufferState = BufferState::new(); + + // Validate the sequence number ranges are not populated. + assert!(buffer.sequence_number_range().inclusive_min().is_none()); + assert!(buffer.sequence_number_range().inclusive_max().is_none()); + + // Write some data to a buffer. + buffer + .write( + lp_to_mutable_batch(r#"bananas,tag=platanos great=true 668563242000000042"#).1, + SequenceNumber::new(0), + ) + .expect("write to empty buffer should succeed"); + + // Snapshot the buffer into an immutable, queryable data format. + let buffer: BufferState = match buffer.snapshot() { + Transition::Ok(v) => v, + Transition::Unchanged(_) => panic!("did not transition to snapshot state"), + }; + + // Extract the queryable data from the buffer and validate it. + // + // Keep the data to validate they are ref-counted copies after further + // writes below. Note this construct allows the caller to decide when/if + // to allocate. + let w1_data = buffer.get_query_data().to_owned(); + + let expected = vec![ + "+-------+----------+--------------------------------+", + "| great | tag | time |", + "+-------+----------+--------------------------------+", + "| true | platanos | 1991-03-10T00:00:42.000000042Z |", + "+-------+----------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[w1_data[0].deref().clone()]); + + // Transition the buffer into a mutable state in which it can accept + // writes. + let mut buffer: BufferState = buffer.into_buffering(); + + // Apply another write. + buffer + .write( + lp_to_mutable_batch( + r#"bananas,tag=platanos great=true,how_much=1000 668563242000000042"#, + ) + .1, + SequenceNumber::new(1), + ) + .expect("write to empty buffer should succeed"); + + // Snapshot the buffer into an immutable, queryable data format. + let buffer: BufferState = buffer.snapshot(); + + // Verify the second write was buffered. + let w2_data = buffer.get_query_data().to_owned(); + let expected = vec![ + "+-------+----------+----------+--------------------------------+", + "| great | how_much | tag | time |", + "+-------+----------+----------+--------------------------------+", + "| true | 1000 | platanos | 1991-03-10T00:00:42.000000042Z |", + "+-------+----------+----------+--------------------------------+", + ]; + assert_batches_eq!(&expected, &[w2_data[1].deref().clone()]); + + // Verify the first write has not changed, and has not been + // re-ordered. + assert_eq!(w1_data, w2_data[..1]); + // Furthermore, ensure no data was actually copied + assert!(Arc::ptr_eq(&w1_data[0], &w2_data[0])); + + // Ensure the same data is returned for a second read. + { + let second_read = buffer.get_query_data(); + assert_eq!(w2_data, second_read); + + // And that no data was actually copied. + let same_arcs = w2_data + .iter() + .zip(second_read.iter()) + .all(|(a, b)| Arc::ptr_eq(a, &b)); + assert!(same_arcs); + } + + // Finally transition into the terminal persisting state. + let buffer: BufferState = buffer.into_persisting(); + + // Validate the sequence number ranges were updated as writes occurred. + assert_eq!( + buffer.sequence_number_range().inclusive_min(), + Some(SequenceNumber::new(0)) + ); + assert_eq!( + buffer.sequence_number_range().inclusive_max(), + Some(SequenceNumber::new(1)) + ); + + // Extract the final buffered result + let final_data = buffer.into_data(); + + // And once again verify no data was changed, copied or re-ordered. + assert_eq!(w2_data, final_data); + let same_arcs = w2_data + .into_iter() + .zip(final_data.into_iter()) + .all(|(a, b)| Arc::ptr_eq(&a, &b)); + assert!(same_arcs); + } + + #[test] + fn test_snapshot_buffer_different_but_compatible_schemas() { + let mut buffer = BufferState::new(); + + // Missing tag `t1` + let (_, mut mb1) = lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#); + buffer.state.write(mb1.clone()).unwrap(); + + // Missing field `iv` + let (_, mb2) = lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#); + buffer.state.write(mb2.clone()).unwrap(); + + let buffer: BufferState = match buffer.snapshot() { + Transition::Ok(v) => v, + Transition::Unchanged(_) => panic!("failed to transition"), + }; + + assert_eq!(buffer.get_query_data().len(), 1); + + let snapshot = &buffer.get_query_data()[0]; + + // Generate the combined buffer from the original inputs to compare + // against. + mb1.extend_from(&mb2).unwrap(); + let want = mb1.to_arrow(Selection::All).unwrap(); + + assert_eq!(&**snapshot, &want); + } +} diff --git a/ingester/src/data/partition/buffer/state_machine/buffering.rs b/ingester/src/data/partition/buffer/state_machine/buffering.rs new file mode 100644 index 0000000000..85ab01bbcd --- /dev/null +++ b/ingester/src/data/partition/buffer/state_machine/buffering.rs @@ -0,0 +1,61 @@ +//! A write buffer. + +use mutable_batch::MutableBatch; + +use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable}; + +use super::{snapshot::Snapshot, BufferState, Transition}; + +/// The FSM starting ingest state - a mutable buffer collecting writes. +#[derive(Debug, Default)] +pub(crate) struct Buffering { + /// The buffer for incoming writes. + /// + /// This buffer MAY be empty when no writes have occured since transitioning + /// to this state. + buffer: Buffer, +} + +impl Writeable for Buffering { + fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> { + self.buffer.buffer_write(batch) + } +} + +impl BufferState { + /// Attempt to generate a snapshot from the data in this buffer. + /// + /// This returns [`Transition::Unchanged`] if this buffer contains no data. + pub(crate) fn snapshot(self) -> Transition { + if self.state.buffer.is_empty() { + // It is a logical error to snapshot an empty buffer. + return Transition::unchanged(self); + } + + // Generate a snapshot from the buffer. + let snap = self + .state + .buffer + .snapshot() + .expect("snapshot of non-empty buffer should succeed"); + + // And transition to the WithSnapshot state. + Transition::ok(Snapshot::new(vec![snap]), self.sequence_range) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_empty_buffer_does_not_snapshot() { + let b = BufferState::new(); + match b.snapshot() { + Transition::Ok(_) => panic!("empty buffer should not transition to snapshot state"), + Transition::Unchanged(_) => { + // OK! + } + } + } +} diff --git a/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs b/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs new file mode 100644 index 0000000000..00a7f855d4 --- /dev/null +++ b/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs @@ -0,0 +1,62 @@ +//! A writfield1 buffer, with one or more snapshots. + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; +use mutable_batch::MutableBatch; + +use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable}; + +use super::{snapshot::Snapshot, BufferState}; + +/// A mutable state that buffers incoming writes while holding at least one +/// previously generated buffer snapshot. +#[derive(Debug)] +pub(crate) struct BufferingWithSnapshot { + /// The buffer for incoming writes. + /// + /// NOTE: this buffer MAY be empty. + buffer: Buffer, + + /// Snapshots generated from previous buffer contents. + /// + /// INVARIANT: this array is always non-empty. + snapshots: Vec>, +} + +impl BufferingWithSnapshot { + pub(super) fn new(snapshots: Vec>) -> Self { + Self { + buffer: Buffer::default(), + snapshots, + } + } +} + +impl Writeable for BufferingWithSnapshot { + fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> { + // TODO(5806): assert schema compatibility with existing snapshots + self.buffer.buffer_write(batch) + } +} + +impl BufferState { + /// Snapshot the current buffer contents and transition to an immutable, + /// queryable state containing only snapshots. + /// + /// This call MAY be a NOP if the buffer has accrued no writes. + pub(crate) fn snapshot(self) -> BufferState { + assert!(!self.state.snapshots.is_empty()); + + BufferState { + state: Snapshot::new( + self.state + .snapshots + .into_iter() + .chain(self.state.buffer.snapshot()) + .collect(), + ), + sequence_range: self.sequence_range, + } + } +} diff --git a/ingester/src/data/partition/buffer/state_machine/persisting.rs b/ingester/src/data/partition/buffer/state_machine/persisting.rs new file mode 100644 index 0000000000..27c8d33603 --- /dev/null +++ b/ingester/src/data/partition/buffer/state_machine/persisting.rs @@ -0,0 +1,37 @@ +//! A writfield1 buffer, with one or more snapshots. + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; + +use crate::data::partition::buffer::traits::Queryable; + +use super::BufferState; + +/// An immutable set of [`RecordBatch`] in the process of being persisted. +#[derive(Debug)] +pub(crate) struct Persisting { + /// Snapshots generated from previous buffer contents to be persisted. + /// + /// INVARIANT: this array is always non-empty. + snapshots: Vec>, +} + +impl Persisting { + pub(super) fn new(snapshots: Vec>) -> Self { + Self { snapshots } + } +} + +impl Queryable for Persisting { + fn get_query_data(&self) -> &[Arc] { + &self.snapshots + } +} + +impl BufferState { + /// Consume `self`, returning the data it holds as a set of [`RecordBatch`]. + pub(super) fn into_data(self) -> Vec> { + self.state.snapshots + } +} diff --git a/ingester/src/data/partition/buffer/state_machine/snapshot.rs b/ingester/src/data/partition/buffer/state_machine/snapshot.rs new file mode 100644 index 0000000000..6c5b8988e9 --- /dev/null +++ b/ingester/src/data/partition/buffer/state_machine/snapshot.rs @@ -0,0 +1,49 @@ +//! A writfield1 buffer, with one or more snapshots. + +use std::sync::Arc; + +use arrow::record_batch::RecordBatch; + +use crate::data::partition::buffer::{state_machine::persisting::Persisting, traits::Queryable}; + +use super::{buffering_with_snapshot::BufferingWithSnapshot, BufferState}; + +/// An immutable, queryable FSM state containing at least one buffer snapshot. +#[derive(Debug)] +pub(crate) struct Snapshot { + /// Snapshots generated from previous buffer contents. + /// + /// INVARIANT: this array is always non-empty. + snapshots: Vec>, +} + +impl Snapshot { + pub(super) fn new(snapshots: Vec>) -> Self { + assert!(!snapshots.is_empty()); + Self { snapshots } + } +} + +impl Queryable for Snapshot { + fn get_query_data(&self) -> &[Arc] { + &self.snapshots + } +} + +impl BufferState { + pub(crate) fn into_buffering(self) -> BufferState { + assert!(!self.state.snapshots.is_empty()); + BufferState { + state: BufferingWithSnapshot::new(self.state.snapshots), + sequence_range: self.sequence_range, + } + } + + pub(crate) fn into_persisting(self) -> BufferState { + assert!(!self.state.snapshots.is_empty()); + BufferState { + state: Persisting::new(self.state.snapshots), + sequence_range: self.sequence_range, + } + } +} diff --git a/ingester/src/data/partition/buffer/traits.rs b/ingester/src/data/partition/buffer/traits.rs new file mode 100644 index 0000000000..bff6dfda8f --- /dev/null +++ b/ingester/src/data/partition/buffer/traits.rs @@ -0,0 +1,17 @@ +//! Private traits for state machine states. + +use std::{fmt::Debug, sync::Arc}; + +use arrow::record_batch::RecordBatch; +use mutable_batch::MutableBatch; + +/// A state that can accept writes. +pub(crate) trait Writeable: Debug { + fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error>; +} + +/// A state that can return the contents of the buffer as one or more +/// [`RecordBatch`] instances. +pub(crate) trait Queryable: Debug { + fn get_query_data(&self) -> &[Arc]; +} diff --git a/ingester/src/data/sequence_range.rs b/ingester/src/data/sequence_range.rs new file mode 100644 index 0000000000..d8e2bb2033 --- /dev/null +++ b/ingester/src/data/sequence_range.rs @@ -0,0 +1,141 @@ +use data_types::SequenceNumber; + +/// A range of sequence numbers, both inclusive [min, max]. +#[derive(Debug, Default, PartialEq, Eq, Clone)] +pub(crate) struct SequenceNumberRange { + range: Option<(SequenceNumber, SequenceNumber)>, +} + +impl SequenceNumberRange { + pub(crate) fn observe(&mut self, n: SequenceNumber) { + self.range = Some(match self.range { + Some((min, max)) => { + assert!(n > max, "monotonicity violation"); + (min, n) + } + None => (n, n), + }); + } + + /// Returns the inclusive lower bound on [`SequenceNumber`] values observed. + pub(crate) fn inclusive_min(&self) -> Option { + self.range.map(|v| v.0) + } + + /// Returns the inclusive upper bound on [`SequenceNumber`] values observed. + pub(crate) fn inclusive_max(&self) -> Option { + self.range.map(|v| v.1) + } + + /// Merge two [`SequenceNumberRange`] instances, returning a new, merged + /// instance. + /// + /// The merge result contains the minimum of [`Self::inclusive_min()`] from + /// each instance, and the maximum of [`Self::inclusive_max()`]. + /// + /// If both `self` and `other` contain no [`SequenceNumber`] observations, + /// the returned instance contains no observations. + pub(crate) fn merge(&self, other: &Self) -> Self { + let merged_range = self + .range + .into_iter() + .chain(other.range.clone()) + .reduce(|a, b| (a.0.min(b.0), a.1.max(b.1))); + + Self { + range: merged_range, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ranges() { + let mut r = SequenceNumberRange::default(); + + r.observe(SequenceNumber::new(0)); + r.observe(SequenceNumber::new(2)); + r.observe(SequenceNumber::new(3)); + + assert_eq!(r.inclusive_min(), Some(SequenceNumber::new(0))); + assert_eq!(r.inclusive_max(), Some(SequenceNumber::new(3))); + } + + #[test] + #[should_panic = "monotonicity violation"] + fn test_monotonicity() { + let mut r = SequenceNumberRange::default(); + + r.observe(SequenceNumber::new(1)); + r.observe(SequenceNumber::new(3)); + r.observe(SequenceNumber::new(2)); + } + + #[test] + #[should_panic = "monotonicity violation"] + fn test_exactly_once() { + let mut r = SequenceNumberRange::default(); + + r.observe(SequenceNumber::new(1)); + r.observe(SequenceNumber::new(1)); + } + + #[test] + fn test_merge() { + let mut a = SequenceNumberRange::default(); + let mut b = SequenceNumberRange::default(); + + a.observe(SequenceNumber::new(4)); + b.observe(SequenceNumber::new(2)); + + let a_b = a.merge(&b); + assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(2))); + assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4))); + + let b_a = b.merge(&a); + assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(2))); + assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4))); + + assert_eq!(a_b, b_a); + } + + #[test] + fn test_merge_half_empty() { + let mut a = SequenceNumberRange::default(); + let b = SequenceNumberRange::default(); + + a.observe(SequenceNumber::new(4)); + // B observes nothing + + let a_b = a.merge(&b); + assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(4))); + assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4))); + + let b_a = b.merge(&a); + assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(4))); + assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4))); + + assert_eq!(a_b, b_a); + } + + #[test] + fn test_merge_both_empty() { + let a = SequenceNumberRange::default(); + let b = SequenceNumberRange::default(); + + // Neither observe anything + + let a_b = a.merge(&b); + assert_eq!(a_b.inclusive_min(), None); + assert_eq!(a_b.inclusive_max(), None); + + let b_a = b.merge(&a); + assert_eq!(b_a.inclusive_min(), None); + assert_eq!(b_a.inclusive_max(), None); + + assert_eq!(a_b, b_a); + } +}