feat: use u64 hash in buffer index instead of str literal (#25883)

* feat: use u64 hash in buffer index instead of str literal

* refactor: move hash of column after if branch and add docs
praveen/cli-jsonl-rename-format
Trevor Hilton 2025-01-21 09:09:25 -05:00 committed by GitHub
parent 447f66d9a7
commit d1fd155b21
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 74 additions and 29 deletions

14
Cargo.lock generated
View File

@ -3265,6 +3265,7 @@ dependencies = [
"test_helpers",
"thiserror 1.0.69",
"tokio",
"twox-hash 2.1.0",
"url",
"uuid",
]
@ -3739,7 +3740,7 @@ version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
"twox-hash 1.6.3",
]
[[package]]
@ -4272,7 +4273,7 @@ dependencies = [
"snap",
"thrift",
"tokio",
"twox-hash",
"twox-hash 1.6.3",
"zstd",
"zstd-sys",
]
@ -6853,6 +6854,15 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "twox-hash"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7b17f197b3050ba473acf9181f7b1d3b66d1cf7356c6cc57886662276e65908"
dependencies = [
"rand",
]
[[package]]
name = "typenum"
version = "1.17.0"

View File

@ -126,6 +126,7 @@ tonic-build = "0.11.0"
tonic-health = "0.11.0"
tonic-reflection = "0.11.0"
tower = "0.4.13"
twox-hash = "2.1.0"
unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"

View File

@ -61,6 +61,7 @@ serde_with.workspace = true
sha2.workspace = true
snap.workspace = true
thiserror.workspace = true
twox-hash.workspace = true
tokio.workspace = true
url.workspace = true
uuid.workspace = true

View File

@ -44,6 +44,8 @@ use schema::{InfluxColumnType, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
use twox_hash::XxHash64;
use write_buffer::INDEX_HASH_SEED;
/// Used to determine if writes are older than what we can accept or query
pub const THREE_DAYS: Duration = Duration::from_secs(60 * 60 * 24 * 3);
@ -503,7 +505,7 @@ pub struct BufferFilter {
#[derive(Debug)]
pub struct BufferGuarantee {
pub guarantee: Guarantee,
pub literals: HashSet<Arc<str>>,
pub literal_hashes: HashSet<u64>,
}
impl BufferFilter {
@ -599,12 +601,11 @@ impl BufferFilter {
let literals = literals
.into_iter()
.filter_map(|l| match l {
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
Some(Arc::<str>::from(s.as_str()))
}
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => Some(s),
_ => None,
})
.collect::<HashSet<Arc<str>>>();
.map(|s| XxHash64::oneshot(INDEX_HASH_SEED, s.as_bytes()))
.collect::<HashSet<u64>>();
if literals.is_empty() {
continue;
@ -619,19 +620,19 @@ impl BufferFilter {
use Guarantee::*;
match (e.guarantee, guarantee) {
(In, In) | (NotIn, NotIn) => {
e.literals = e.literals.union(&literals).cloned().collect()
e.literal_hashes = e.literal_hashes.union(&literals).cloned().collect()
}
(In, NotIn) => {
e.literals = e.literals.difference(&literals).cloned().collect()
e.literal_hashes = e.literal_hashes.difference(&literals).cloned().collect()
}
(NotIn, In) => {
e.literals = literals.difference(&e.literals).cloned().collect()
e.literal_hashes = literals.difference(&e.literal_hashes).cloned().collect()
}
}
})
.or_insert(BufferGuarantee {
guarantee,
literals,
literal_hashes: literals,
});
debug!(?guarantees, ">>> updated guarantees");
}

View File

@ -4,6 +4,7 @@ mod metrics;
pub mod persisted_files;
pub mod queryable_buffer;
mod table_buffer;
pub(crate) use table_buffer::INDEX_HASH_SEED;
pub mod validator;
use crate::persister::Persister;

View File

@ -21,9 +21,12 @@ use std::collections::BTreeMap;
use std::mem::size_of;
use std::sync::Arc;
use thiserror::Error;
use twox_hash::XxHash64;
use crate::{BufferFilter, BufferGuarantee};
pub const INDEX_HASH_SEED: u64 = 0;
#[derive(Debug, Error)]
pub enum Error {
#[error("Field not found in table buffer: {0}")]
@ -554,10 +557,39 @@ impl std::fmt::Debug for MutableTableChunk {
}
}
/// Tracks which rows in a [`TableBuffer`] have a given unique value on a per column basis
///
/// The index maps a [`ColumnId`] to a `u64` hash of the original string literal, which in turn
/// maps to a set of row indexes (`HashSet<usize>`).
///
/// # Example
///
/// Given the following writes to the `foo` table:
///
/// ```text
/// foo,tag=a val=1 1
/// foo,tag=b val=1 2
/// foo,tag=c val=1 3
/// foo,tag=a val=1 4
/// ```
///
/// The `BufferIndex`, after these rows have been buffered, and assuming that the index is on the
/// `tag` column, would have the following structure:
///
/// ```text
/// "tag"
/// / | \
/// "a" "b" "c"
/// | | |
/// [0,3] [1] [2]
/// ```
///
/// Though, instead of the `"tag"` string, there would be the column ID for the `"tag"` column, and
/// instead of the literals `a`, `b`, and `c`, there would be the 64-bit hash of each value,
/// respectively.
#[derive(Debug, Clone)]
struct BufferIndex {
// column id -> string value -> row indexes
columns: HashMap<ColumnId, HashMap<String, HashSet<usize>>>,
// column id -> hashed string value as u64 -> row indexes
columns: HashMap<ColumnId, HashMap<u64, HashSet<usize>>>,
}
impl BufferIndex {
@ -573,8 +605,9 @@ impl BufferIndex {
fn add_row_if_indexed_column(&mut self, row_index: usize, column_id: ColumnId, value: &str) {
if let Some(column) = self.columns.get_mut(&column_id) {
let hashed_value = XxHash64::oneshot(INDEX_HASH_SEED, value.as_bytes());
column
.entry_ref(value)
.entry(hashed_value)
.and_modify(|c| {
c.insert(row_index);
})
@ -583,20 +616,20 @@ impl BufferIndex {
}
fn get_rows_from_index_for_filter(&self, filter: &BufferFilter) -> Option<HashSet<usize>> {
debug!(?filter, "processing filter");
debug!(?filter, ">>> processing filter");
let mut row_ids = RowIndexSet::new();
for (
col_id,
BufferGuarantee {
guarantee,
literals,
literal_hashes,
},
) in filter.guarantees()
{
debug!(
?col_id,
?guarantee,
?literals,
?literal_hashes,
current = ?row_ids,
">>> processing buffer guarantee"
);
@ -609,12 +642,12 @@ impl BufferIndex {
// rows where the tag is in nothing. That should yield no rows, so we give back
// an empty set...
// NOTE: tags cannot be NULL. They are given a value of "" if omitted in writes
if literals.is_empty() {
if literal_hashes.is_empty() {
return Some(Default::default());
}
row_ids.start_in();
for literal in literals {
if let Some(row) = row_map.get(literal.as_ref()) {
for literal_hash in literal_hashes {
if let Some(row) = row_map.get(literal_hash) {
row_ids.update_in(row);
}
}
@ -627,8 +660,8 @@ impl BufferIndex {
.copied()
.collect::<HashSet<usize>>();
row_ids.start_not_in(in_rows);
for literal in literals {
if let Some(row) = row_map.get(literal.as_ref()) {
for literal_hash in literal_hashes {
if let Some(row) = row_map.get(literal_hash) {
row_ids.update_not_in(row);
};
}
@ -642,15 +675,13 @@ impl BufferIndex {
row_ids.finish()
}
#[allow(dead_code)]
fn size(&self) -> usize {
let mut size = size_of::<Self>();
for (_, v) in &self.columns {
size += size_of::<ColumnId>()
+ size_of::<String>()
+ size_of::<HashMap<String, Vec<usize>>>();
for (k, v) in v {
size += k.len() + size_of::<String>() + size_of::<Vec<usize>>();
size +=
size_of::<ColumnId>() + size_of::<u64>() + size_of::<HashMap<u64, Vec<usize>>>();
for (_, v) in v {
size += size_of::<u64>() + size_of::<Vec<usize>>();
size += v.len() * size_of::<usize>();
}
}
@ -1138,7 +1169,7 @@ mod tests {
table_buffer.buffer_chunk(0, &rows);
let size = table_buffer.computed_size();
assert_eq!(size, 18120);
assert_eq!(size, 18021);
}
#[test]