feat(data_types): SequenceNumberSet

Adds a space-efficient encoding of a set of SequenceNumber, backed by
roaring bitmaps.

Memory utilisation will change as the number of elements changes,
according to the underlying roaring bitmap design, but is intended to be
"relatively" cheap.
pull/24376/head
Dom Dwyer 2023-01-05 18:13:58 +01:00
parent e1d0bcdaad
commit 3b672c223c
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
4 changed files with 142 additions and 0 deletions

22
Cargo.lock generated
View File

@ -1116,6 +1116,27 @@ dependencies = [
"itertools",
]
[[package]]
name = "croaring"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b637fa52bae8bfe59608b632329dd3bd606126ec8af04a5cbc628f9371a545d5"
dependencies = [
"byteorder",
"croaring-sys",
"libc",
]
[[package]]
name = "croaring-sys"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f09181aa94e1b0b038a1ef581f816e1e303c1146c1504bf34ea73fe8079095f"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -1278,6 +1299,7 @@ dependencies = [
name = "data_types"
version = "0.1.0"
dependencies = [
"croaring",
"influxdb_line_protocol",
"iox_time",
"observability_deps",

View File

@ -7,6 +7,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
croaring = "0.7.0"
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
iox_time = { path = "../iox_time" }
observability_deps = { path = "../observability_deps" }

View File

@ -13,6 +13,8 @@
clippy::dbg_macro
)]
pub mod sequence_number_set;
use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::warn;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};

View File

@ -0,0 +1,117 @@
//! A set of [`SequenceNumber`] instances.
use crate::SequenceNumber;
/// A space-efficient encoded set of [`SequenceNumber`].
#[derive(Debug, Default, Clone)]
pub struct SequenceNumberSet(croaring::Bitmap);
impl SequenceNumberSet {
/// Add the specified [`SequenceNumber`] to the set.
pub fn add(&mut self, n: SequenceNumber) {
self.0.add(n.get() as _);
}
/// Remove the specified [`SequenceNumber`] to the set, if present.
///
/// This is a no-op if `n` was not part of `self`.
pub fn remove(&mut self, n: SequenceNumber) {
self.0.remove(n.get() as _);
}
/// Add all the [`SequenceNumber`] in `other` to `self`.
///
/// The result of this operation is the set union of both input sets.
pub fn add_set(&mut self, other: &Self) {
self.0.or_inplace(&other.0)
}
/// Remove all the [`SequenceNumber`] in `other` from `self`.
pub fn remove_set(&mut self, other: &Self) {
self.0.andnot_inplace(&other.0)
}
/// Serialise `self` into a set of bytes.
///
/// [This document][spec] describes the serialised format.
///
/// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/
pub fn as_bytes(&self) -> Vec<u8> {
self.0.serialize()
}
/// Return true if the specified [`SequenceNumber`] has been added to
/// `self`.
pub fn contains(&self, n: SequenceNumber) -> bool {
self.0.contains(n.get() as _)
}
/// Returns the number of [`SequenceNumber`] in this set.
pub fn len(&self) -> u64 {
self.0.cardinality()
}
/// Return `true` if there are no [`SequenceNumber`] in this set.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// Return an iterator of all [`SequenceNumber`] in this set.
pub fn iter(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
self.0.iter().map(|v| SequenceNumber::new(v as _))
}
}
/// Deserialisation method.
impl TryFrom<&[u8]> for SequenceNumberSet {
type Error = String;
fn try_from(buffer: &[u8]) -> Result<Self, Self::Error> {
croaring::Bitmap::try_deserialize(buffer)
.map(SequenceNumberSet)
.ok_or_else(|| "invalid bitmap bytes".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_set_operations() {
let mut a = SequenceNumberSet::default();
let mut b = SequenceNumberSet::default();
// Add an element and check it is readable
a.add(SequenceNumber::new(1));
assert!(a.contains(SequenceNumber::new(1)));
assert_eq!(a.len(), 1);
assert_eq!(a.iter().collect::<Vec<_>>(), vec![SequenceNumber::new(1)]);
assert!(!a.contains(SequenceNumber::new(42)));
// Merging an empty set into a should not change a
a.add_set(&b);
assert_eq!(a.len(), 1);
assert!(a.contains(SequenceNumber::new(1)));
// Merging a non-empty set should add the new elements
b.add(SequenceNumber::new(2));
a.add_set(&b);
assert_eq!(a.len(), 2);
assert!(a.contains(SequenceNumber::new(1)));
assert!(a.contains(SequenceNumber::new(2)));
// Removing the set should return it to the pre-merged state.
a.remove_set(&b);
assert_eq!(a.len(), 1);
assert!(a.contains(SequenceNumber::new(1)));
// Removing a non-existant element should be a NOP
a.remove(SequenceNumber::new(42));
assert_eq!(a.len(), 1);
// Removing the last element should result in an empty set.
a.remove(SequenceNumber::new(1));
assert_eq!(a.len(), 0);
}
}