diff --git a/Cargo.lock b/Cargo.lock index 0880f7098d..5bbc84610a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1116,6 +1116,27 @@ dependencies = [ "itertools", ] +[[package]] +name = "croaring" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b637fa52bae8bfe59608b632329dd3bd606126ec8af04a5cbc628f9371a545d5" +dependencies = [ + "byteorder", + "croaring-sys", + "libc", +] + +[[package]] +name = "croaring-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f09181aa94e1b0b038a1ef581f816e1e303c1146c1504bf34ea73fe8079095f" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1278,6 +1299,7 @@ dependencies = [ name = "data_types" version = "0.1.0" dependencies = [ + "croaring", "influxdb_line_protocol", "iox_time", "observability_deps", diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 624ee15555..764bb4147f 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] +croaring = "0.7.0" influxdb_line_protocol = { path = "../influxdb_line_protocol" } iox_time = { path = "../iox_time" } observability_deps = { path = "../observability_deps" } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 3ffac44a47..1ed0ca352e 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -13,6 +13,8 @@ clippy::dbg_macro )] +pub mod sequence_number_set; + use influxdb_line_protocol::FieldValue; use observability_deps::tracing::warn; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs new file mode 100644 index 0000000000..97a332a8d3 --- /dev/null +++ b/data_types/src/sequence_number_set.rs @@ -0,0 +1,117 @@ +//! A set of [`SequenceNumber`] instances. 
+
+use crate::SequenceNumber;
+
+/// A space-efficient encoded set of [`SequenceNumber`].
+#[derive(Debug, Default, Clone)]
+pub struct SequenceNumberSet(croaring::Bitmap);
+
+impl SequenceNumberSet {
+    /// Add the specified [`SequenceNumber`] to the set.
+    pub fn add(&mut self, n: SequenceNumber) {
+        self.0.add(n.get() as _); // NOTE(review): `as _` may truncate to u32 - confirm range
+    }
+
+    /// Remove the specified [`SequenceNumber`] from the set, if present.
+    ///
+    /// This is a no-op if `n` was not part of `self`.
+    pub fn remove(&mut self, n: SequenceNumber) {
+        self.0.remove(n.get() as _);
+    }
+
+    /// Add all the [`SequenceNumber`] in `other` to `self`.
+    ///
+    /// The result of this operation is the set union of both input sets.
+    pub fn add_set(&mut self, other: &Self) {
+        self.0.or_inplace(&other.0)
+    }
+
+    /// Remove all the [`SequenceNumber`] in `other` from `self`.
+    pub fn remove_set(&mut self, other: &Self) {
+        self.0.andnot_inplace(&other.0)
+    }
+
+    /// Serialise `self` into a set of bytes.
+    ///
+    /// [This document][spec] describes the serialised format.
+    ///
+    /// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/
+    pub fn as_bytes(&self) -> Vec<u8> {
+        self.0.serialize()
+    }
+
+    /// Return true if the specified [`SequenceNumber`] has been added to
+    /// `self`.
+    pub fn contains(&self, n: SequenceNumber) -> bool {
+        self.0.contains(n.get() as _)
+    }
+
+    /// Returns the number of [`SequenceNumber`] in this set.
+    pub fn len(&self) -> u64 {
+        self.0.cardinality()
+    }
+
+    /// Return `true` if there are no [`SequenceNumber`] in this set.
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// Return an iterator of all [`SequenceNumber`] in this set.
+    pub fn iter(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
+        self.0.iter().map(|v| SequenceNumber::new(v as _))
+    }
+}
+
+/// Deserialisation method.
+impl TryFrom<&[u8]> for SequenceNumberSet {
+    type Error = String;
+
+    fn try_from(buffer: &[u8]) -> Result<Self, Self::Error> {
+        croaring::Bitmap::try_deserialize(buffer)
+            .map(SequenceNumberSet)
+            .ok_or_else(|| "invalid bitmap bytes".to_string())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_set_operations() {
+        let mut a = SequenceNumberSet::default();
+        let mut b = SequenceNumberSet::default();
+
+        // Add an element and check it is readable
+        a.add(SequenceNumber::new(1));
+        assert!(a.contains(SequenceNumber::new(1)));
+        assert_eq!(a.len(), 1);
+        assert_eq!(a.iter().collect::<Vec<_>>(), vec![SequenceNumber::new(1)]);
+        assert!(!a.contains(SequenceNumber::new(42)));
+
+        // Merging an empty set into a should not change a
+        a.add_set(&b);
+        assert_eq!(a.len(), 1);
+        assert!(a.contains(SequenceNumber::new(1)));
+
+        // Merging a non-empty set should add the new elements
+        b.add(SequenceNumber::new(2));
+        a.add_set(&b);
+        assert_eq!(a.len(), 2);
+        assert!(a.contains(SequenceNumber::new(1)));
+        assert!(a.contains(SequenceNumber::new(2)));
+
+        // Removing the set should return it to the pre-merged state.
+        a.remove_set(&b);
+        assert_eq!(a.len(), 1);
+        assert!(a.contains(SequenceNumber::new(1)));
+
+        // Removing a non-existent element should be a NOP
+        a.remove(SequenceNumber::new(42));
+        assert_eq!(a.len(), 1);
+
+        // Removing the last element should result in an empty set.
+        a.remove(SequenceNumber::new(1));
+        assert_eq!(a.len(), 0);
+    }
+}