Merge pull request #6513 from influxdata/dom/seqnum-set

feat(data_types): SequenceNumberSet
pull/24376/head
Dom 2023-01-09 10:45:53 +00:00 committed by GitHub
commit 13f2f36a9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 142 additions and 0 deletions

22
Cargo.lock generated
View File

@ -1115,6 +1115,27 @@ dependencies = [
"itertools", "itertools",
] ]
[[package]]
name = "croaring"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b637fa52bae8bfe59608b632329dd3bd606126ec8af04a5cbc628f9371a545d5"
dependencies = [
"byteorder",
"croaring-sys",
"libc",
]
[[package]]
name = "croaring-sys"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f09181aa94e1b0b038a1ef581f816e1e303c1146c1504bf34ea73fe8079095f"
dependencies = [
"cc",
"libc",
]
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.6" version = "0.5.6"
@ -1277,6 +1298,7 @@ dependencies = [
name = "data_types" name = "data_types"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"croaring",
"influxdb_line_protocol", "influxdb_line_protocol",
"iox_time", "iox_time",
"observability_deps", "observability_deps",

View File

@ -7,6 +7,7 @@ edition.workspace = true
license.workspace = true license.workspace = true
[dependencies] [dependencies]
croaring = "0.7.0"
influxdb_line_protocol = { path = "../influxdb_line_protocol" } influxdb_line_protocol = { path = "../influxdb_line_protocol" }
iox_time = { path = "../iox_time" } iox_time = { path = "../iox_time" }
observability_deps = { path = "../observability_deps" } observability_deps = { path = "../observability_deps" }

View File

@ -13,6 +13,8 @@
clippy::dbg_macro clippy::dbg_macro
)] )]
pub mod sequence_number_set;
use influxdb_line_protocol::FieldValue; use influxdb_line_protocol::FieldValue;
use observability_deps::tracing::warn; use observability_deps::tracing::warn;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};

View File

@ -0,0 +1,117 @@
//! A set of [`SequenceNumber`] instances.
use crate::SequenceNumber;
/// A space-efficient encoded set of [`SequenceNumber`].
#[derive(Debug, Default, Clone)]
pub struct SequenceNumberSet(croaring::Bitmap);
impl SequenceNumberSet {
/// Add the specified [`SequenceNumber`] to the set.
pub fn add(&mut self, n: SequenceNumber) {
self.0.add(n.get() as _);
}
/// Remove the specified [`SequenceNumber`] to the set, if present.
///
/// This is a no-op if `n` was not part of `self`.
pub fn remove(&mut self, n: SequenceNumber) {
self.0.remove(n.get() as _);
}
/// Add all the [`SequenceNumber`] in `other` to `self`.
///
/// The result of this operation is the set union of both input sets.
pub fn add_set(&mut self, other: &Self) {
self.0.or_inplace(&other.0)
}
/// Remove all the [`SequenceNumber`] in `other` from `self`.
pub fn remove_set(&mut self, other: &Self) {
self.0.andnot_inplace(&other.0)
}
/// Serialise `self` into a set of bytes.
///
/// [This document][spec] describes the serialised format.
///
/// [spec]: https://github.com/RoaringBitmap/RoaringFormatSpec/
pub fn as_bytes(&self) -> Vec<u8> {
self.0.serialize()
}
/// Return true if the specified [`SequenceNumber`] has been added to
/// `self`.
pub fn contains(&self, n: SequenceNumber) -> bool {
self.0.contains(n.get() as _)
}
/// Returns the number of [`SequenceNumber`] in this set.
pub fn len(&self) -> u64 {
self.0.cardinality()
}
/// Return `true` if there are no [`SequenceNumber`] in this set.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// Return an iterator of all [`SequenceNumber`] in this set.
pub fn iter(&self) -> impl Iterator<Item = SequenceNumber> + '_ {
self.0.iter().map(|v| SequenceNumber::new(v as _))
}
}
/// Deserialisation method.
impl TryFrom<&[u8]> for SequenceNumberSet {
type Error = String;
fn try_from(buffer: &[u8]) -> Result<Self, Self::Error> {
croaring::Bitmap::try_deserialize(buffer)
.map(SequenceNumberSet)
.ok_or_else(|| "invalid bitmap bytes".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_set_operations() {
let mut a = SequenceNumberSet::default();
let mut b = SequenceNumberSet::default();
// Add an element and check it is readable
a.add(SequenceNumber::new(1));
assert!(a.contains(SequenceNumber::new(1)));
assert_eq!(a.len(), 1);
assert_eq!(a.iter().collect::<Vec<_>>(), vec![SequenceNumber::new(1)]);
assert!(!a.contains(SequenceNumber::new(42)));
// Merging an empty set into a should not change a
a.add_set(&b);
assert_eq!(a.len(), 1);
assert!(a.contains(SequenceNumber::new(1)));
// Merging a non-empty set should add the new elements
b.add(SequenceNumber::new(2));
a.add_set(&b);
assert_eq!(a.len(), 2);
assert!(a.contains(SequenceNumber::new(1)));
assert!(a.contains(SequenceNumber::new(2)));
// Removing the set should return it to the pre-merged state.
a.remove_set(&b);
assert_eq!(a.len(), 1);
assert!(a.contains(SequenceNumber::new(1)));
// Removing a non-existant element should be a NOP
a.remove(SequenceNumber::new(42));
assert_eq!(a.len(), 1);
// Removing the last element should result in an empty set.
a.remove(SequenceNumber::new(1));
assert_eq!(a.len(), 0);
}
}