diff --git a/Cargo.lock b/Cargo.lock
index 4ac0a72ce2..7b7ab35a47 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2749,6 +2749,21 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "influxdb3_cache"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "arrow",
+ "arrow_util",
+ "influxdb3_catalog",
+ "influxdb3_id",
+ "influxdb3_wal",
+ "influxdb3_write",
+ "iox_time",
+ "schema",
+]
+
 [[package]]
 name = "influxdb3_catalog"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index 2339ee77bd..38ea96b3c0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,6 +2,7 @@
 # In alphabetical order
 members = [
     "influxdb3",
+    "influxdb3_cache",
     "influxdb3_catalog",
     "influxdb3_client",
     "influxdb3_id",
diff --git a/influxdb3_cache/Cargo.toml b/influxdb3_cache/Cargo.toml
new file mode 100644
index 0000000000..4b7fb0f0de
--- /dev/null
+++ b/influxdb3_cache/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "influxdb3_cache"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+# Core Crates
+iox_time.workspace = true
+schema.workspace = true
+
+# Local deps
+influxdb3_catalog = { path = "../influxdb3_catalog" }
+influxdb3_id = { path = "../influxdb3_id" }
+influxdb3_wal = { path = "../influxdb3_wal" }
+
+# crates.io dependencies
+anyhow.workspace = true
+arrow.workspace = true
+
+[dev-dependencies]
+# Core Crates
+arrow_util.workspace = true
+# Local deps
+influxdb3_write = { path = "../influxdb3_write" }
+
+[lints]
+workspace = true
diff --git a/influxdb3_cache/src/lib.rs b/influxdb3_cache/src/lib.rs
new file mode 100644
index 0000000000..a55f022b01
--- /dev/null
+++ b/influxdb3_cache/src/lib.rs
@@ -0,0 +1,3 @@
+//! Crate holding the various cache implementations used by InfluxDB 3
+
+pub mod meta_cache;
diff --git a/influxdb3_cache/src/meta_cache/mod.rs b/influxdb3_cache/src/meta_cache/mod.rs
new file mode 100644
index 0000000000..67f9eeba9c
--- /dev/null
+++ b/influxdb3_cache/src/meta_cache/mod.rs
@@ -0,0 +1,854 @@
+//! The Metadata Cache holds the distinct values for a column or set of columns on a table
+
+use std::{
+    cmp::Eq,
+    collections::{BTreeMap, BinaryHeap, HashSet},
+    num::NonZeroUsize,
+    sync::Arc,
+    time::Duration,
+};
+
+use anyhow::{bail, Context};
+use arrow::{
+    array::{ArrayRef, RecordBatch, StringViewBuilder},
+    datatypes::{DataType, Field, SchemaBuilder, SchemaRef},
+    error::ArrowError,
+};
+use influxdb3_catalog::catalog::TableDefinition;
+use influxdb3_id::ColumnId;
+use influxdb3_wal::{FieldData, Row};
+use iox_time::TimeProvider;
+use schema::{InfluxColumnType, InfluxFieldType};
+
+/// A metadata cache for storing distinct values for a set of columns in a table
+#[derive(Debug)]
+pub struct MetaCache {
+    time_provider: Arc<dyn TimeProvider>,
+    /// The maximum number of unique value combinations in the cache
+    max_cardinality: usize,
+    /// The maximum age for entries in the cache
+    max_age: Duration,
+    /// The fixed Arrow schema used to produce record batches from the cache
+    schema: SchemaRef,
+    /// Holds the current state of the cache
+    state: MetaCacheState,
+    /// The identifiers of the columns used in the cache
+    column_ids: Vec<ColumnId>,
+    /// The cache data, stored in a tree
+    data: Node,
+}
+
+/// Type for tracking the current state of a [`MetaCache`]
+#[derive(Debug, Default)]
+struct MetaCacheState {
+    /// The current number of unique value combinations in the cache
+    cardinality: usize,
+}
+
+/// Arguments to create a new [`MetaCache`]
+#[derive(Debug)]
+pub struct CreateMetaCacheArgs {
+    pub time_provider: Arc<dyn TimeProvider>,
+    pub table_def: Arc<TableDefinition>,
+    pub max_cardinality: MaxCardinality,
+    pub max_age: MaxAge,
+    pub column_ids: Vec<ColumnId>,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct MaxCardinality(NonZeroUsize);
+
+#[cfg(test)]
+impl MaxCardinality {
+    fn try_new(size: usize) -> Result<Self, anyhow::Error> {
+        Ok(Self(
+            NonZeroUsize::try_from(size).context("invalid size provided")?,
+        ))
+    }
+}
+
+impl Default for MaxCardinality {
+    fn default() -> Self {
+        Self(NonZeroUsize::new(100_000).unwrap())
+    }
+}
+
+impl From<MaxCardinality> for usize {
+    fn from(value: MaxCardinality) -> Self {
+        value.0.into()
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct MaxAge(Duration);
+
+impl Default for MaxAge {
+    fn default() -> Self {
+        Self(Duration::from_secs(24 * 60 * 60))
+    }
+}
+
+impl From<MaxAge> for Duration {
+    fn from(value: MaxAge) -> Self {
+        value.0
+    }
+}
+
+impl From<Duration> for MaxAge {
+    fn from(duration: Duration) -> Self {
+        Self(duration)
+    }
+}
+
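// Illustrative sketch, not part of this diff: one way a caller could assemble
// `CreateMetaCacheArgs` from the types defined above. The `table_def` and
// `column_ids` arguments are assumed to come from an existing catalog, as in the
// tests at the bottom of this module; `SystemProvider` is the wall-clock
// `TimeProvider` implementation from `iox_time`.
fn example_build_cache(
    table_def: std::sync::Arc<influxdb3_catalog::catalog::TableDefinition>,
    column_ids: Vec<influxdb3_id::ColumnId>,
) -> Result<MetaCache, anyhow::Error> {
    use std::{sync::Arc, time::Duration};
    MetaCache::new(CreateMetaCacheArgs {
        time_provider: Arc::new(iox_time::SystemProvider::new()),
        table_def,
        // keep the default limit of 100,000 unique value combinations
        max_cardinality: MaxCardinality::default(),
        // expire entries not seen for an hour, instead of the 24 hour default
        max_age: MaxAge::from(Duration::from_secs(60 * 60)),
        column_ids,
    })
}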
+impl MetaCache {
+    /// Create a new [`MetaCache`]
+    ///
+    /// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided
+    /// [`TableDefinition`].
+    pub fn new(
+        CreateMetaCacheArgs {
+            time_provider,
+            table_def,
+            max_cardinality,
+            max_age,
+            column_ids,
+        }: CreateMetaCacheArgs,
+    ) -> Result<Self, anyhow::Error> {
+        if column_ids.is_empty() {
+            bail!("must pass a non-empty set of column ids");
+        }
+        let mut builder = SchemaBuilder::new();
+        for id in &column_ids {
+            let col = table_def.columns.get(id).with_context(|| {
+                format!("invalid column id ({id}) encountered while creating metadata cache")
+            })?;
+            let data_type = match col.data_type {
+                InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => {
+                    DataType::Utf8View
+                }
+                other => bail!(
+                    "cannot use a column of type {other} in a metadata cache, only \
+                    tags and string fields can be used"
+                ),
+            };
+
+            builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false)));
+        }
+        Ok(Self {
+            time_provider,
+            max_cardinality: max_cardinality.into(),
+            max_age: max_age.into(),
+            state: MetaCacheState::default(),
+            schema: Arc::new(builder.finish()),
+            column_ids,
+            data: Node::default(),
+        })
+    }
+
+    /// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns.
+    pub fn push(&mut self, row: &Row) {
+        let mut values = Vec::with_capacity(self.column_ids.len());
+        for id in &self.column_ids {
+            let Some(value) = row
+                .fields
+                .iter()
+                .find(|f| &f.id == id)
+                .map(|f| Value::from(&f.value))
+            else {
+                // ignore the row if it does not contain all columns in the cache:
+                return;
+            };
+            values.push(value);
+        }
+        let mut target = &mut self.data;
+        let mut val_iter = values.into_iter().peekable();
+        let mut is_new = false;
+        while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) {
+            let (last_seen, node) = target.0.entry(value).or_insert_with(|| {
+                is_new = true;
+                (row.time, peek.is_some().then(Node::default))
+            });
+            *last_seen = row.time;
+            if let Some(node) = node {
+                target = node;
+            } else {
+                break;
+            }
+        }
+        if is_new {
+            self.state.cardinality += 1;
+        }
+    }
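// Reviewer note, not part of this diff: for a cache on (region, host), pushing the rows
// ("us-east", "a") and ("us-east", "b") at time `t` yields a tree shaped like
//
//     "us-east" -> (t, Some(Node{ "a" -> (t, None), "b" -> (t, None) }))
//
// i.e. one `Node` level per cached column, with `None` marking the final column. Because
// `last_seen` is updated at every level a row passes through, a branch always carries the
// greatest `last_seen` of its descendants, which `remove_before` below relies on.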
+
+    /// Gather a record batch from the cache given the set of predicates
+    ///
+    /// This assumes the predicates are well behaved and validated before being passed in. For example,
+    /// there cannot be multiple predicates on a single column; the caller needs to validate/transform
+    /// the incoming predicates from DataFusion before calling.
+    ///
+    /// Entries in the cache that have not been seen since before the `max_age` of the cache will be
+    /// filtered out of the result.
+    pub fn to_record_batch(&self, predicates: &[Predicate]) -> Result<RecordBatch, ArrowError> {
+        // Predicates may not be provided for all columns in the cache, or may not be provided in the
+        // order of columns in the cache. This re-orders them to the correct order, and fills in any
+        // gaps with None.
+        let predicates: Vec<Option<&Predicate>> = self
+            .column_ids
+            .iter()
+            .map(|id| predicates.iter().find(|p| &p.column_id == id))
+            .collect();
+
+        // Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for
+        // the sake of nested caches, where a predicate on a higher branch in the cache will need to have
+        // its value in the output batches duplicated.
+        let mut builders: Vec<StringViewBuilder> = (0..self.column_ids.len())
+            .map(|_| StringViewBuilder::new())
+            .collect();
+
+        let expired_time_ns = self.expired_time_ns();
+        let _ = self
+            .data
+            .evaluate_predicates(expired_time_ns, &predicates, &mut builders);
+
+        RecordBatch::try_new(
+            Arc::clone(&self.schema),
+            builders
+                .into_iter()
+                .map(|mut builder| Arc::new(builder.finish()) as ArrayRef)
+                .collect(),
+        )
+    }
+
+    /// Prune nodes from within the cache
+    ///
+    /// This first prunes entries that are older than the `max_age` of the cache. If the cardinality
+    /// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache
+    /// size down.
+    pub fn prune(&mut self) {
+        let before_time_ns = self.expired_time_ns();
+        let _ = self.data.remove_before(before_time_ns);
+        self.state.cardinality = self.data.cardinality();
+        if self.state.cardinality > self.max_cardinality {
+            let n_to_remove = self.state.cardinality - self.max_cardinality;
+            self.data.remove_n_oldest(n_to_remove);
+            self.state.cardinality = self.data.cardinality();
+        }
+    }
+
+    /// Get the nanosecond timestamp as an `i64`; entries that have not been seen since before this
+    /// time are considered expired.
+    fn expired_time_ns(&self) -> i64 {
+        self.time_provider
+            .now()
+            .checked_sub(self.max_age)
+            .expect("max age on cache should not cause an overflow")
+            .timestamp_nanos()
+    }
+}
+
+/// A node in the `data` tree of a [`MetaCache`]
+///
+/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and
+/// whose values hold the last seen time of each value as an [`i64`], and an optional reference to
+/// the node in the next level of the tree.
+#[derive(Debug, Default)]
+struct Node(BTreeMap<Value, (i64, Option<Node>)>);
+
+impl Node {
+    /// Remove all elements before the given nanosecond timestamp, returning `true` if the resulting
+    /// node is empty.
+    fn remove_before(&mut self, time_ns: i64) -> bool {
+        self.0.retain(|_, (last_seen, node)| {
+            // Note that for a branch node, the `last_seen` time will be the greatest of
+            // all its nodes, so an entire branch can be removed if its value has not been seen since
+            // before `time_ns`, hence the short-circuit here:
+            *last_seen > time_ns
+                || node
+                    .as_mut()
+                    .is_some_and(|node| node.remove_before(time_ns))
+        });
+        self.0.is_empty()
+    }
+
+    /// Remove the `n_to_remove` oldest entries from the cache
+    fn remove_n_oldest(&mut self, n_to_remove: usize) {
+        let mut times = BinaryHeap::with_capacity(n_to_remove);
+        self.find_n_oldest(n_to_remove, &mut times);
+        self.remove_before(*times.peek().unwrap());
+    }
+
+    /// Use a binary heap to find the time before which all nodes should be removed
+    fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap<i64>) {
+        self.0
+            .values()
+            // do not need to add the last_seen time for a branch node to the
+            // heap since it will be equal to the greatest of that of its leaves
+            .for_each(|(last_seen, node)| {
+                if let Some(node) = node {
+                    node.find_n_oldest(n_to_remove, times_heap)
+                } else if times_heap.len() < n_to_remove {
+                    times_heap.push(*last_seen);
+                } else if times_heap.peek().is_some_and(|newest| last_seen < newest) {
+                    times_heap.pop();
+                    times_heap.push(*last_seen);
+                }
+            });
+    }
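// Illustrative sketch, not part of this diff: the bounded max-heap pattern that
// `find_n_oldest` uses above. Capping the heap at `n` of the smallest timestamps means
// its peek is the cutoff that `remove_n_oldest` hands to `remove_before`.
fn example_n_oldest_cutoff(times: impl IntoIterator<Item = i64>, n: usize) -> Option<i64> {
    let mut heap = std::collections::BinaryHeap::with_capacity(n);
    for t in times {
        if heap.len() < n {
            heap.push(t);
        } else if heap.peek().is_some_and(|newest| t < *newest) {
            // `t` is older than the newest of the current candidates, so swap it in
            heap.pop();
            heap.push(t);
        }
    }
    heap.peek().copied()
}
// e.g. example_n_oldest_cutoff([40, 10, 30, 20], 2) == Some(20): entries last seen at or
// before 20 would then be pruned.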
+
+    /// Get the total count of unique value combinations nested under this node
+    ///
+    /// Note that this includes expired elements, which still contribute to the total size of the
+    /// cache until they are pruned.
+    fn cardinality(&self) -> usize {
+        self.0
+            .values()
+            .map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality()))
+            .sum()
+    }
+
+    /// Evaluate the set of provided predicates against this node, adding values to the provided
+    /// [`StringViewBuilder`]s. Predicates and builders are provided as slices, as this is called
+    /// recursively down the cache tree.
+    ///
+    /// Returns the number of values that were added to the arrow builders.
+    ///
+    /// # Panics
+    ///
+    /// This will panic if incorrectly sized `predicates` and `builders` slices are passed in. When
+    /// called from the root [`Node`], their size should be that of the depth of the cache, i.e.,
+    /// the number of columns in the cache.
+    fn evaluate_predicates(
+        &self,
+        expired_time_ns: i64,
+        predicates: &[Option<&Predicate>],
+        builders: &mut [StringViewBuilder],
+    ) -> usize {
+        let mut total_count = 0;
+        let (predicate, next_predicates) = predicates
+            .split_first()
+            .expect("predicates should not be empty");
+        // if there is a predicate, evaluate it; otherwise, just grab everything from the node:
+        let values_and_nodes = if let Some(predicate) = predicate {
+            self.evaluate_predicate(expired_time_ns, predicate)
+        } else {
+            self.0
+                .iter()
+                .filter(|&(_, (t, _))| (t > &expired_time_ns))
+                .map(|(v, (_, n))| (v.clone(), n.as_ref()))
+                .collect()
+        };
+        let (builder, next_builders) = builders
+            .split_first_mut()
+            .expect("builders should not be empty");
+        // iterate over the resulting set of values and next nodes (if this is a branch), and add
+        // the values to the arrow builders:
+        for (value, node) in values_and_nodes {
+            if let Some(node) = node {
+                let count =
+                    node.evaluate_predicates(expired_time_ns, next_predicates, next_builders);
+                if count > 0 {
+                    // we are not on a terminal node in the cache, so create a block, as this value
+                    // is repeated `count` times, i.e., depending on how many values come out of
+                    // subsequent nodes:
+                    let block = builder.append_block(value.0.as_bytes().into());
+                    for _ in 0..count {
+                        builder
+                            .try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
+                            .expect("append view for known valid block, offset and length");
+                    }
+                    total_count += count;
+                }
+            } else {
+                builder.append_value(value.0);
+                total_count += 1;
+            }
+        }
+        total_count
+    }
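// Reviewer note, not part of this diff: the `append_block`/`try_append_view` pair above is
// what duplicates a branch value across every row produced beneath it. For a cache on
// (region, host) with no predicates, a "us-east" branch with leaves "a" and "b" returns a
// count of 2, so the region builder gets the "us-east" view appended twice (one block, two
// views) while the host builder gets "a" and "b", yielding the rows ("us-east", "a") and
// ("us-east", "b").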
+
+    /// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a
+    /// branch node in the cache tree, a reference to the next [`Node`].
+    fn evaluate_predicate(
+        &self,
+        expired_time_ns: i64,
+        predicate: &Predicate,
+    ) -> Vec<(Value, Option<&Node>)> {
+        match &predicate.kind {
+            PredicateKind::Eq(rhs) => self
+                .0
+                .get_key_value(rhs)
+                .and_then(|(v, (t, n))| {
+                    (t > &expired_time_ns).then(|| vec![(v.clone(), n.as_ref())])
+                })
+                .unwrap_or_default(),
+            PredicateKind::NotEq(rhs) => self
+                .0
+                .iter()
+                .filter(|(v, (t, _))| t > &expired_time_ns && *v != rhs)
+                .map(|(v, (_, n))| (v.clone(), n.as_ref()))
+                .collect(),
+            PredicateKind::In(in_list) => in_list
+                .iter()
+                .filter_map(|v| {
+                    self.0.get_key_value(v).and_then(|(v, (t, n))| {
+                        (t > &expired_time_ns).then(|| (v.clone(), n.as_ref()))
+                    })
+                })
+                .collect(),
+            PredicateKind::NotIn(not_in_set) => self
+                .0
+                .iter()
+                .filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v))
+                .map(|(v, (_, n))| (v.clone(), n.as_ref()))
+                .collect(),
+        }
+    }
+}
+
+/// A cache value, which for now only holds strings
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)]
+struct Value(Arc<str>);
+
+impl From<&FieldData> for Value {
+    fn from(field: &FieldData) -> Self {
+        match field {
+            FieldData::Key(s) => Self(Arc::from(s.as_str())),
+            FieldData::Tag(s) => Self(Arc::from(s.as_str())),
+            FieldData::String(s) => Self(Arc::from(s.as_str())),
+            FieldData::Timestamp(_)
+            | FieldData::Integer(_)
+            | FieldData::UInteger(_)
+            | FieldData::Float(_)
+            | FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"),
+        }
+    }
+}
+
+/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`]
+///
+/// This is intended to be derived from a set of filter expressions in DataFusion
+#[derive(Debug)]
+pub struct Predicate {
+    column_id: ColumnId,
+    kind: PredicateKind,
+}
+
+#[cfg(test)]
+impl Predicate {
+    fn new_eq(column_id: ColumnId, rhs: impl Into<Arc<str>>) -> Self {
+        Self {
+            column_id,
+            kind: PredicateKind::Eq(Value(rhs.into())),
+        }
+    }
+
+    fn new_not_eq(column_id: ColumnId, rhs: impl Into<Arc<str>>) -> Self {
+        Self {
+            column_id,
+            kind: PredicateKind::NotEq(Value(rhs.into())),
+        }
+    }
+
+    fn new_in(column_id: ColumnId, in_vals: impl IntoIterator<Item = impl Into<Arc<str>>>) -> Self {
+        Self {
+            column_id,
+            kind: PredicateKind::In(in_vals.into_iter().map(Into::into).map(Value).collect()),
+        }
+    }
+
+    fn new_not_in(column_id: ColumnId, in_vals: impl IntoIterator<Item = impl Into<Arc<str>>>) -> Self {
+        Self {
+            column_id,
+            kind: PredicateKind::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect()),
+        }
+    }
+}
+
+// TODO: remove this when fully implemented, for now just suppressing compiler warnings
+#[allow(dead_code)]
+#[derive(Debug)]
+enum PredicateKind {
+    Eq(Value),
+    NotEq(Value),
+    In(Vec<Value>),
+    NotIn(HashSet<Value>),
+}
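// Reviewer note, not part of this diff: each variant corresponds to the filter shape it is
// meant to be derived from, e.g. for a cached `region` column:
//
//     Eq(Value)        <- region = 'us-east'
//     NotEq(Value)     <- region != 'us-east'
//     In(Vec<Value>)   <- region IN ('us-east', 'us-west')
//     NotIn(HashSet)   <- region NOT IN ('us-east', 'us-west')
//
// The conversion from DataFusion filter expressions is not implemented in this diff (hence
// the `#[allow(dead_code)]`); the test-only constructors above exercise each variant.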
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, time::Duration};
+
+    use arrow_util::assert_batches_sorted_eq;
+    use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
+    use influxdb3_id::ColumnId;
+    use influxdb3_wal::Row;
+    use influxdb3_write::write_buffer::validator::WriteValidator;
+    use iox_time::{MockProvider, Time, TimeProvider};
+
+    use crate::meta_cache::Predicate;
+
+    use super::{CreateMetaCacheArgs, MaxAge, MaxCardinality, MetaCache};
+
+    struct TestWriter {
+        catalog: Arc<Catalog>,
+    }
+
+    impl TestWriter {
+        const DB_NAME: &str = "test_db";
+
+        fn new() -> Self {
+            Self {
+                catalog: Arc::new(Catalog::new("test-host".into(), "test-instance".into())),
+            }
+        }
+        fn write_lp(&self, lp: impl AsRef<str>, time_ns: i64) -> Vec<Row> {
+            let lines_parsed = WriteValidator::initialize(
+                Self::DB_NAME.try_into().unwrap(),
+                Arc::clone(&self.catalog),
+                time_ns,
+            )
+            .expect("initialize write validator")
+            .v1_parse_lines_and_update_schema(
+                lp.as_ref(),
+                false,
+                Time::from_timestamp_nanos(time_ns),
+                influxdb3_write::Precision::Nanosecond,
+            )
+            .expect("parse and validate v1 line protocol");
+
+            lines_parsed.into_inner().to_rows()
+        }
+
+        fn db_schema(&self) -> Arc<DatabaseSchema> {
+            self.catalog
+                .db_schema(Self::DB_NAME)
+                .expect("db schema should be initialized")
+        }
+    }
+
+    #[test]
+    fn evaluate_predicates() {
+        let writer = TestWriter::new();
+        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
+        // write some data to get a set of rows destined for the WAL, and an updated catalog:
+        let rows = writer.write_lp(
+            "\
+            cpu,region=us-east,host=a usage=100\n\
+            cpu,region=us-east,host=b usage=100\n\
+            cpu,region=us-west,host=c usage=100\n\
+            cpu,region=us-west,host=d usage=100\n\
+            cpu,region=ca-east,host=e usage=100\n\
+            cpu,region=ca-east,host=f usage=100\n\
+            cpu,region=ca-cent,host=g usage=100\n\
+            cpu,region=ca-cent,host=h usage=100\n\
+            cpu,region=eu-east,host=i usage=100\n\
+            cpu,region=eu-east,host=j usage=100\n\
+            cpu,region=eu-cent,host=k usage=100\n\
+            cpu,region=eu-cent,host=l usage=100\n\
+            ",
+            0,
+        );
+        // grab the table definition for the table written to:
+        let table_def = writer.db_schema().table_definition("cpu").unwrap();
+        // use the two tags, in order, to create the cache:
+        let column_ids: Vec<ColumnId> = ["region", "host"]
+            .into_iter()
+            .map(|name| table_def.column_name_to_id_unchecked(name))
+            .collect();
+        let region_col_id = column_ids[0];
+        let host_col_id = column_ids[1];
+        // create the cache:
+        let mut cache = MetaCache::new(CreateMetaCacheArgs {
+            time_provider,
+            table_def,
+            max_cardinality: MaxCardinality::default(),
+            max_age: MaxAge::default(),
+            column_ids,
+        })
+        .expect("create cache");
+        // push the row data into the cache:
+        for row in rows {
+            cache.push(&row);
+        }
+
+        // run a series of test cases with varying predicates:
+        struct TestCase<'a> {
+            desc: &'a str,
+            predicates: &'a [Predicate],
+            expected: &'a [&'a str],
+        }
+
+        let test_cases = [
+            TestCase {
+                desc: "no predicates",
+                predicates: &[],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-cent | g    |",
+                    "| ca-cent | h    |",
+                    "| ca-east | e    |",
+                    "| ca-east | f    |",
+                    "| eu-cent | k    |",
+                    "| eu-cent | l    |",
+                    "| eu-east | i    |",
+                    "| eu-east | j    |",
+                    "| us-east | a    |",
+                    "| us-east | b    |",
+                    "| us-west | c    |",
+                    "| us-west | d    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "eq predicate on region",
+                predicates: &[Predicate::new_eq(region_col_id, "us-east")],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| us-east | a    |",
+                    "| us-east | b    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "eq predicate on host",
+                predicates: &[Predicate::new_eq(host_col_id, "h")],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-cent | h    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "eq predicate on region and host",
+                predicates: &[
+                    Predicate::new_eq(region_col_id, "ca-cent"),
+                    Predicate::new_eq(host_col_id, "h"),
+                ],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-cent | h    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "not eq predicate on region",
+                predicates: &[Predicate::new_not_eq(region_col_id, "ca-cent")],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-east | e    |",
+                    "| ca-east | f    |",
+                    "| eu-cent | k    |",
+                    "| eu-cent | l    |",
+                    "| eu-east | i    |",
+                    "| eu-east | j    |",
+                    "| us-east | a    |",
+                    "| us-east | b    |",
+                    "| us-west | c    |",
+                    "| us-west | d    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "not eq predicate on region and host",
+                predicates: &[
+                    Predicate::new_not_eq(region_col_id, "ca-cent"),
+                    Predicate::new_not_eq(host_col_id, "a"),
+                ],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-east | e    |",
+                    "| ca-east | f    |",
+                    "| eu-cent | k    |",
+                    "| eu-cent | l    |",
+                    "| eu-east | i    |",
+                    "| eu-east | j    |",
+                    "| us-east | b    |",
+                    "| us-west | c    |",
+                    "| us-west | d    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "in predicate on region",
+                predicates: &[Predicate::new_in(region_col_id, ["ca-cent", "ca-east"])],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-cent | g    |",
+                    "| ca-cent | h    |",
+                    "| ca-east | e    |",
+                    "| ca-east | f    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "in predicate on region and host",
+                predicates: &[
+                    Predicate::new_in(region_col_id, ["ca-cent", "ca-east"]),
+                    Predicate::new_in(host_col_id, ["g", "e"]),
+                ],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| ca-cent | g    |",
+                    "| ca-east | e    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "not in predicate on region",
+                predicates: &[Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"])],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| eu-cent | k    |",
+                    "| eu-cent | l    |",
+                    "| eu-east | i    |",
+                    "| eu-east | j    |",
+                    "| us-east | a    |",
+                    "| us-east | b    |",
+                    "| us-west | c    |",
+                    "| us-west | d    |",
+                    "+---------+------+",
+                ],
+            },
+            TestCase {
+                desc: "not in predicate on region and host",
+                predicates: &[
+                    Predicate::new_not_in(region_col_id, ["ca-cent", "ca-east"]),
+                    Predicate::new_not_in(host_col_id, ["j", "k"]),
+                ],
+                expected: &[
+                    "+---------+------+",
+                    "| region  | host |",
+                    "+---------+------+",
+                    "| eu-cent | l    |",
+                    "| eu-east | i    |",
+                    "| us-east | a    |",
+                    "| us-east | b    |",
+                    "| us-west | c    |",
+                    "| us-west | d    |",
+                    "+---------+------+",
+                ],
+            },
+        ];
+
+        for tc in test_cases {
+            let records = cache
+                .to_record_batch(tc.predicates)
+                .expect("get record batches");
+            println!("{}", tc.desc);
+            assert_batches_sorted_eq!(tc.expected, &[records]);
+        }
+    }
+
+    #[test]
+    fn cache_pruning() {
+        let writer = TestWriter::new();
+        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
+        // write some data to update the catalog:
+        let _ = writer.write_lp(
+            "\
+            cpu,region=us-east,host=a usage=100\n\
+            ",
+            time_provider.now().timestamp_nanos(),
+        );
+        // grab the table definition for the table written to:
+        let table_def = writer.db_schema().table_definition("cpu").unwrap();
+        // use the two tags, in order, to create the cache:
+        let column_ids: Vec<ColumnId> = ["region", "host"]
+            .into_iter()
+            .map(|name| table_def.column_name_to_id_unchecked(name))
+            .collect();
+        // create a cache with some cardinality and age limits:
+        let mut cache = MetaCache::new(CreateMetaCacheArgs {
+            time_provider: Arc::clone(&time_provider) as _,
+            table_def,
+            max_cardinality: MaxCardinality::try_new(10).unwrap(),
+            max_age: MaxAge::from(Duration::from_nanos(100)),
+            column_ids,
+        })
+        .expect("create cache");
+        // push a bunch of rows with incrementing times and varying tag values:
+        (0..10).for_each(|mult| {
+            time_provider.set(Time::from_timestamp_nanos(mult * 20));
+            let rows = writer.write_lp(
+                format!(
+                    "\
+                    cpu,region=us-east-{mult},host=a-{mult} usage=100\n\
+                    cpu,region=us-west-{mult},host=b-{mult} usage=100\n\
+                    cpu,region=us-cent-{mult},host=c-{mult} usage=100\n\
+                    "
+                ),
+                time_provider.now().timestamp_nanos(),
+            );
+            // push the initial row data into the cache:
+            for row in rows {
+                cache.push(&row);
+            }
+        });
+        // check the cache before prune:
+        // NOTE: this does not include entries that have surpassed the max_age of the cache; however,
+        // there are still more entries than the cache's max cardinality, as it has not yet been pruned.
+        let records = cache.to_record_batch(&[]).unwrap();
+        assert_batches_sorted_eq!(
+            [
+                "+-----------+------+",
+                "| region    | host |",
+                "+-----------+------+",
+                "| us-cent-5 | c-5  |",
+                "| us-cent-6 | c-6  |",
+                "| us-cent-7 | c-7  |",
+                "| us-cent-8 | c-8  |",
+                "| us-cent-9 | c-9  |",
+                "| us-east-5 | a-5  |",
+                "| us-east-6 | a-6  |",
+                "| us-east-7 | a-7  |",
+                "| us-east-8 | a-8  |",
+                "| us-east-9 | a-9  |",
+                "| us-west-5 | b-5  |",
+                "| us-west-6 | b-6  |",
+                "| us-west-7 | b-7  |",
+                "| us-west-8 | b-8  |",
+                "| us-west-9 | b-9  |",
+                "+-----------+------+",
+            ],
+            &[records]
+        );
+        cache.prune();
+        let records = cache.to_record_batch(&[]).unwrap();
+        assert_batches_sorted_eq!(
+            [
+                "+-----------+------+",
+                "| region    | host |",
+                "+-----------+------+",
+                "| us-cent-7 | c-7  |",
+                "| us-cent-8 | c-8  |",
+                "| us-cent-9 | c-9  |",
+                "| us-east-7 | a-7  |",
+                "| us-east-8 | a-8  |",
+                "| us-east-9 | a-9  |",
+                "| us-west-7 | b-7  |",
+                "| us-west-8 | b-8  |",
+                "| us-west-9 | b-9  |",
+                "+-----------+------+",
+            ],
+            &[records]
+        );
+    }
+}
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index 6d3ec713b3..b0c9f0bfbf 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -941,10 +941,10 @@ impl TableDefinition {
         self.column_map.get_by_left(id).cloned()
     }
 
-    pub fn column_name_to_id_unchecked(&self, name: Arc<str>) -> ColumnId {
+    pub fn column_name_to_id_unchecked(&self, name: impl Into<Arc<str>>) -> ColumnId {
         *self
             .column_map
-            .get_by_right(&name)
+            .get_by_right(&name.into())
             .expect("Column exists in mapping")
     }
 
@@ -1262,7 +1262,7 @@ mod tests {
         println!("table: {table:#?}");
         assert_eq!(table.column_map.len(), 2);
-        assert_eq!(table.column_name_to_id_unchecked("test2".into()), 1.into());
+        assert_eq!(table.column_name_to_id_unchecked("test2"), 1.into());
     }
 
     #[test]
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index 3bd7163206..e6cd75f57e 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -3,7 +3,7 @@
 pub mod persisted_files;
 pub mod queryable_buffer;
 mod table_buffer;
-pub(crate) mod validator;
+pub mod validator;
 
 use crate::chunk::ParquetChunk;
 use crate::last_cache::{self, CreateCacheArguments, LastCacheProvider};
diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs
index fccbaa8792..e063278161 100644
--- a/influxdb3_write/src/write_buffer/validator.rs
+++ b/influxdb3_write/src/write_buffer/validator.rs
@@ -20,7 +20,7 @@ use super::Error;
 
 /// Type state for the [`WriteValidator`] after it has been initialized
 /// with the catalog.
-pub(crate) struct WithCatalog {
+pub struct WithCatalog {
     catalog: Arc<Catalog>,
     db_schema: Arc<DatabaseSchema>,
     time_now_ns: i64,
@@ -28,23 +28,33 @@
 /// Type state for the [`WriteValidator`] after it has parsed v1 or v3
 /// line protocol.
-pub(crate) struct LinesParsed {
+pub struct LinesParsed {
     catalog: WithCatalog,
     lines: Vec<QualifiedLine>,
     catalog_batch: Option<CatalogBatch>,
     errors: Vec<WriteLineError>,
 }
 
+impl LinesParsed {
+    /// Convert this set of parsed and qualified lines into a set of rows
+    ///
+    /// This is useful for testing when you need to use the write validator to parse line protocol
+    /// and get the raw row data for the WAL.
+    pub fn to_rows(self) -> Vec<Row> {
+        self.lines.into_iter().map(|line| line.row).collect()
+    }
+}
+
 /// A state machine for validating v1 or v3 line protocol and updating
 /// the [`Catalog`] with new tables or schema changes.
-pub(crate) struct WriteValidator<State> {
+pub struct WriteValidator<State> {
     state: State,
 }
 
 impl WriteValidator<WithCatalog> {
     /// Initialize the [`WriteValidator`] by getting a handle to, or creating
     /// a handle to the [`DatabaseSchema`] for the given namespace name `db_name`.
-    pub(crate) fn initialize(
+    pub fn initialize(
         db_name: NamespaceName<'static>,
         catalog: Arc<Catalog>,
         time_now_ns: i64,
@@ -69,7 +79,7 @@ impl WriteValidator<WithCatalog> {
     ///
     /// If this function succeeds, then the catalog will receive an update, so
     /// steps following this should be infallible.
-    pub(crate) fn v3_parse_lines_and_update_schema(
+    pub fn v3_parse_lines_and_update_schema(
         self,
         lp: &str,
         accept_partial: bool,
@@ -150,7 +160,7 @@ impl WriteValidator<WithCatalog> {
     ///
     /// If this function succeeds, then the catalog will receive an update, so
     /// steps following this should be infallible.
-    pub(crate) fn v1_parse_lines_and_update_schema(
+    pub fn v1_parse_lines_and_update_schema(
        self,
         lp: &str,
         accept_partial: bool,
@@ -729,6 +739,13 @@ pub(crate) struct ValidatedLines {
 }
 
 impl WriteValidator<LinesParsed> {
+    /// Convert this into the inner [`LinesParsed`]
+    ///
+    /// This is mainly used for testing
+    pub fn into_inner(self) -> LinesParsed {
+        self.state
+    }
+
     /// Convert a set of valid parsed `v3` lines to a [`ValidatedLines`] which will
     /// be buffered and written to the WAL, if configured.
     ///
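// Reviewer note, not part of this diff: with the validator now public, code outside of
// `influxdb3_write` (such as the meta_cache tests above) can obtain WAL `Row`s roughly as
// follows; `db_name`, `catalog`, `lp`, and `time_ns` are assumed to be set up by the caller.
//
//     let rows = WriteValidator::initialize(db_name, catalog, time_ns)
//         .expect("initialize write validator")
//         .v1_parse_lines_and_update_schema(
//             lp,
//             false,
//             Time::from_timestamp_nanos(time_ns),
//             Precision::Nanosecond,
//         )
//         .expect("parse and validate v1 line protocol")
//         .into_inner()
//         .to_rows();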