From 3b672c223c0e298df11a85d06b588919609cfbad Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 5 Jan 2023 18:13:58 +0100 Subject: [PATCH] feat(data_types): SequenceNumberSet Adds a space-efficient encoding of a set of SequenceNumber, backed by roaring bitmaps. Memory utilisation will change as the number of elements changes, according to the underlying roaring bitmap design, but is intended to be "relatively" cheap. --- Cargo.lock | 22 +++++ data_types/Cargo.toml | 1 + data_types/src/lib.rs | 2 + data_types/src/sequence_number_set.rs | 117 ++++++++++++++++++++++++++ 4 files changed, 142 insertions(+) create mode 100644 data_types/src/sequence_number_set.rs diff --git a/Cargo.lock b/Cargo.lock index 0880f7098d..5bbc84610a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1116,6 +1116,27 @@ dependencies = [ "itertools", ] +[[package]] +name = "croaring" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b637fa52bae8bfe59608b632329dd3bd606126ec8af04a5cbc628f9371a545d5" +dependencies = [ + "byteorder", + "croaring-sys", + "libc", +] + +[[package]] +name = "croaring-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f09181aa94e1b0b038a1ef581f816e1e303c1146c1504bf34ea73fe8079095f" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -1278,6 +1299,7 @@ dependencies = [ name = "data_types" version = "0.1.0" dependencies = [ + "croaring", "influxdb_line_protocol", "iox_time", "observability_deps", diff --git a/data_types/Cargo.toml b/data_types/Cargo.toml index 624ee15555..764bb4147f 100644 --- a/data_types/Cargo.toml +++ b/data_types/Cargo.toml @@ -7,6 +7,7 @@ edition.workspace = true license.workspace = true [dependencies] +croaring = "0.7.0" influxdb_line_protocol = { path = "../influxdb_line_protocol" } iox_time = { path = "../iox_time" } observability_deps = { path = "../observability_deps" } diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 3ffac44a47..1ed0ca352e 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -13,6 +13,8 @@ clippy::dbg_macro )] +pub mod sequence_number_set; + use influxdb_line_protocol::FieldValue; use observability_deps::tracing::warn; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; diff --git a/data_types/src/sequence_number_set.rs b/data_types/src/sequence_number_set.rs new file mode 100644 index 0000000000..97a332a8d3 --- /dev/null +++ b/data_types/src/sequence_number_set.rs @@ -0,0 +1,117 @@ +//! A set of [`SequenceNumber`] instances. + +use crate::SequenceNumber; + +/// A space-efficient encoded set of [`SequenceNumber`]. +#[derive(Debug, Default, Clone)] +pub struct SequenceNumberSet(croaring::Bitmap); + +impl SequenceNumberSet { + /// Add the specified [`SequenceNumber`] to the set. + pub fn add(&mut self, n: SequenceNumber) { + self.0.add(n.get() as _); + } + + /// Remove the specified [`SequenceNumber`] to the set, if present. + /// + /// This is a no-op if `n` was not part of `self`. + pub fn remove(&mut self, n: SequenceNumber) { + self.0.remove(n.get() as _); + } + + /// Add all the [`SequenceNumber`] in `other` to `self`. + /// + /// The result of this operation is the set union of both input sets. + pub fn add_set(&mut self, other: &Self) { + self.0.or_inplace(&other.0) + } + + /// Remove all the [`SequenceNumber`] in `other` from `self`. + pub fn remove_set(&mut self, other: &Self) { + self.0.andnot_inplace(&other.0) + } + + /// Serialise `self` into a set of bytes. + /// + /// [This document][spec] describes the serialised format. + /// + /// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/ + pub fn as_bytes(&self) -> Vec { + self.0.serialize() + } + + /// Return true if the specified [`SequenceNumber`] has been added to + /// `self`. + pub fn contains(&self, n: SequenceNumber) -> bool { + self.0.contains(n.get() as _) + } + + /// Returns the number of [`SequenceNumber`] in this set. + pub fn len(&self) -> u64 { + self.0.cardinality() + } + + /// Return `true` if there are no [`SequenceNumber`] in this set. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Return an iterator of all [`SequenceNumber`] in this set. + pub fn iter(&self) -> impl Iterator + '_ { + self.0.iter().map(|v| SequenceNumber::new(v as _)) + } +} + +/// Deserialisation method. +impl TryFrom<&[u8]> for SequenceNumberSet { + type Error = String; + + fn try_from(buffer: &[u8]) -> Result { + croaring::Bitmap::try_deserialize(buffer) + .map(SequenceNumberSet) + .ok_or_else(|| "invalid bitmap bytes".to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_set_operations() { + let mut a = SequenceNumberSet::default(); + let mut b = SequenceNumberSet::default(); + + // Add an element and check it is readable + a.add(SequenceNumber::new(1)); + assert!(a.contains(SequenceNumber::new(1))); + assert_eq!(a.len(), 1); + assert_eq!(a.iter().collect::>(), vec![SequenceNumber::new(1)]); + assert!(!a.contains(SequenceNumber::new(42))); + + // Merging an empty set into a should not change a + a.add_set(&b); + assert_eq!(a.len(), 1); + assert!(a.contains(SequenceNumber::new(1))); + + // Merging a non-empty set should add the new elements + b.add(SequenceNumber::new(2)); + a.add_set(&b); + assert_eq!(a.len(), 2); + assert!(a.contains(SequenceNumber::new(1))); + assert!(a.contains(SequenceNumber::new(2))); + + // Removing the set should return it to the pre-merged state. + a.remove_set(&b); + assert_eq!(a.len(), 1); + assert!(a.contains(SequenceNumber::new(1))); + + // Removing a non-existant element should be a NOP + a.remove(SequenceNumber::new(42)); + assert_eq!(a.len(), 1); + + // Removing the last element should result in an empty set. + a.remove(SequenceNumber::new(1)); + assert_eq!(a.len(), 0); + } +}