feat: add metadata cache provider with APIs for write and query (#25566)

This adds the MetaCacheProvider for managing metadata caches in the
influxdb3 instance. This includes APIs to create caches through the WAL
as well as from a catalog on initialization, to write data into the
managed caches, and to query data out of them.
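
As a rough usage sketch of the write side (the module paths, the example
limits, and the surrounding catalog/WAL setup are assumptions for
illustration):

```rust
use std::{sync::Arc, time::Duration};

// Sketch only: `time_provider`, `catalog`, `table_def`, `column_ids`, and
// `wal_contents` are assumed to come from the running instance.
fn create_and_fill_cache(
    time_provider: Arc<dyn iox_time::TimeProvider>,
    catalog: Arc<influxdb3_catalog::catalog::Catalog>,
    db_id: influxdb3_id::DbId,
    table_def: Arc<influxdb3_catalog::catalog::TableDefinition>,
    column_ids: Vec<influxdb3_id::ColumnId>,
    wal_contents: &influxdb3_wal::WalContents,
) -> anyhow::Result<()> {
    let provider = MetaCacheProvider::new_from_catalog(time_provider, catalog)?;
    // passing `None` derives a cache name of the form `<table>_<cols>_meta_cache`
    let _definition = provider.create_cache(
        db_id,
        None,
        CreateMetaCacheArgs {
            table_def,
            max_cardinality: MaxCardinality::try_from(10_000usize)?,
            max_age: MaxAge::from(Duration::from_secs(60 * 60)),
            column_ids,
        },
    )?;
    // rows written through the WAL are pushed into every matching cache
    provider.write_wal_contents_to_cache(wal_contents);
    Ok(())
}
```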

The query side is fairly involved, relying on DataFusion's TableFunctionImpl
and TableProvider traits so that the cache can be queried through a
user-defined table function (UDTF).
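
On the read side, the intended shape of a query looks roughly like the
following (the table and column names, and the use of DataFusion's
`register_udtf` hook, are assumptions for illustration):

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;

// Sketch only: `db_id` and `provider` are assumed to come from the running instance.
async fn query_meta_cache(
    ctx: &SessionContext,
    db_id: influxdb3_id::DbId,
    provider: Arc<MetaCacheProvider>,
) -> datafusion::error::Result<()> {
    // register the UDTF so that `meta_cache(...)` can be called from SQL
    ctx.register_udtf(
        META_CACHE_UDTF_NAME,
        Arc::new(MetaCacheFunction::new(db_id, provider)),
    );
    // an optional second argument names a specific cache on the table
    let batches = ctx
        .sql("SELECT * FROM meta_cache('cpu') WHERE region IN ('us-east', 'us-west')")
        .await?
        .collect()
        .await?;
    println!("got {} record batches from the cache", batches.len());
    Ok(())
}
```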

The predicate code was modified to support only two kinds of predicates,
IN and NOT IN, which simplifies the code and maps nicely onto DataFusion's
LiteralGuarantee, which we leverage to derive the predicates from the
incoming queries.
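
In other words, a filter such as `WHERE region IN ('us-east', 'us-west')`
should distill, via the literal guarantees, into the same predicate as one
built directly against the crate-internal API (the column values below are
made up for illustration):

```rust
// Sketch of the only two predicate forms the cache supports.
fn example_predicates() -> (Predicate, Predicate) {
    // WHERE region IN ('us-east', 'us-west')
    let in_pred = Predicate::new_in(["us-east", "us-west"]);
    // WHERE region NOT IN ('eu-central')
    let not_in_pred = Predicate::new_not_in(["eu-central"]);
    (in_pred, not_in_pred)
}
```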

A custom ExecutionPlan implementation was added specifically for the
metadata cache that can report the predicates that are pushed down to
the cache during query planning/execution.

A large set of tests was added to check that queries work and that
predicates are pushed down properly.
Trevor Hilton 2024-11-22 10:57:26 -05:00 committed by GitHub
parent 3cde24feb4
commit 8e23032ceb
10 changed files with 1762 additions and 597 deletions

Cargo.lock generated

@ -2755,13 +2755,18 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arrow",
"arrow_util",
"async-trait",
"datafusion",
"indexmap 2.6.0",
"influxdb3_catalog",
"influxdb3_id",
"influxdb3_wal",
"influxdb3_write",
"iox_time",
"parking_lot",
"schema",
"thiserror 1.0.69",
"tokio",
]
[[package]]


@ -18,10 +18,14 @@ influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
anyhow.workspace = true
arrow.workspace = true
async-trait.workspace = true
datafusion.workspace = true
indexmap.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tokio.workspace = true
[dev-dependencies]
# Core Crates
arrow_util.workspace = true
# Local deps
influxdb3_write = { path = "../influxdb3_write" }


@ -0,0 +1,513 @@
use std::{
collections::{BTreeMap, BTreeSet, BinaryHeap},
num::NonZeroUsize,
sync::Arc,
time::Duration,
};
use anyhow::{bail, Context};
use arrow::{
array::{ArrayRef, RecordBatch, StringViewBuilder},
datatypes::{DataType, Field, SchemaBuilder, SchemaRef},
error::ArrowError,
};
use indexmap::IndexMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::ColumnId;
use influxdb3_wal::{FieldData, Row};
use iox_time::TimeProvider;
use schema::{InfluxColumnType, InfluxFieldType};
/// A metadata cache for storing distinct values for a set of columns in a table
#[derive(Debug)]
pub(crate) struct MetaCache {
time_provider: Arc<dyn TimeProvider>,
/// The maximum number of unique value combinations in the cache
max_cardinality: usize,
/// The maximum age for entries in the cache
max_age: Duration,
/// The fixed Arrow schema used to produce record batches from the cache
schema: SchemaRef,
/// Holds current state of the cache
state: MetaCacheState,
/// The identifiers of the columns used in the cache
column_ids: Vec<ColumnId>,
/// The cache data, stored in a tree
data: Node,
}
/// Type for tracking the current state of a [`MetaCache`]
#[derive(Debug, Default)]
struct MetaCacheState {
/// The current number of unique value combinations in the cache
cardinality: usize,
}
/// Arguments to create a new [`MetaCache`]
#[derive(Debug)]
pub struct CreateMetaCacheArgs {
pub table_def: Arc<TableDefinition>,
pub max_cardinality: MaxCardinality,
pub max_age: MaxAge,
pub column_ids: Vec<ColumnId>,
}
#[derive(Debug, Clone, Copy)]
pub struct MaxCardinality(NonZeroUsize);
impl TryFrom<usize> for MaxCardinality {
type Error = anyhow::Error;
fn try_from(value: usize) -> Result<Self, Self::Error> {
Ok(Self(
NonZeroUsize::try_from(value).context("invalid size provided")?,
))
}
}
impl Default for MaxCardinality {
fn default() -> Self {
Self(NonZeroUsize::new(100_000).unwrap())
}
}
impl From<MaxCardinality> for usize {
fn from(value: MaxCardinality) -> Self {
value.0.into()
}
}
#[derive(Debug, Clone, Copy)]
pub struct MaxAge(Duration);
impl Default for MaxAge {
fn default() -> Self {
Self(Duration::from_secs(24 * 60 * 60))
}
}
impl From<MaxAge> for Duration {
fn from(value: MaxAge) -> Self {
value.0
}
}
impl From<Duration> for MaxAge {
fn from(duration: Duration) -> Self {
Self(duration)
}
}
impl MaxAge {
pub(crate) fn as_seconds(&self) -> u64 {
self.0.as_secs()
}
}
impl MetaCache {
/// Create a new [`MetaCache`]
///
/// Must pass a non-empty set of [`ColumnId`]s which correspond to valid columns in the provided
/// [`TableDefinition`].
pub(crate) fn new(
time_provider: Arc<dyn TimeProvider>,
CreateMetaCacheArgs {
table_def,
max_cardinality,
max_age,
column_ids,
}: CreateMetaCacheArgs,
) -> Result<Self, anyhow::Error> {
if column_ids.is_empty() {
bail!("must pass a non-empty set of column ids");
}
let mut builder = SchemaBuilder::new();
for id in &column_ids {
let col = table_def.columns.get(id).with_context(|| {
format!("invalid column id ({id}) encountered while creating metadata cache")
})?;
let data_type = match col.data_type {
InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String) => {
DataType::Utf8View
}
other => bail!(
"cannot use a column of type {other} in a metadata cache, only \
tags and string fields can be used"
),
};
builder.push(Arc::new(Field::new(col.name.as_ref(), data_type, false)));
}
Ok(Self {
time_provider,
max_cardinality: max_cardinality.into(),
max_age: max_age.into(),
state: MetaCacheState::default(),
schema: Arc::new(builder.finish()),
column_ids,
data: Node::default(),
})
}
/// Push a [`Row`] from the WAL into the cache, if the row contains all of the cached columns.
pub(crate) fn push(&mut self, row: &Row) {
let mut values = Vec::with_capacity(self.column_ids.len());
for id in &self.column_ids {
let Some(value) = row
.fields
.iter()
.find(|f| &f.id == id)
.map(|f| Value::from(&f.value))
else {
// ignore the row if it does not contain all columns in the cache:
return;
};
values.push(value);
}
let mut target = &mut self.data;
let mut val_iter = values.into_iter().peekable();
let mut is_new = false;
while let (Some(value), peek) = (val_iter.next(), val_iter.peek()) {
let (last_seen, node) = target.0.entry(value).or_insert_with(|| {
is_new = true;
(row.time, peek.is_some().then(Node::default))
});
*last_seen = row.time;
if let Some(node) = node {
target = node;
} else {
break;
}
}
if is_new {
self.state.cardinality += 1;
}
}
/// Gather a record batch from a cache given the set of predicates
///
/// This assumes the predicates are well behaved, and validated before being passed in. For example,
/// there cannot be multiple predicates on a single column; the caller needs to validate/transform
/// the incoming predicates from DataFusion before calling.
///
/// Entries in the cache that have not been seen since before the `max_age` of the cache will be
/// filtered out of the result.
pub(crate) fn to_record_batch(
&self,
predicates: &IndexMap<ColumnId, Predicate>,
) -> Result<RecordBatch, ArrowError> {
// predicates may not be provided for all columns in the cache, or not be provided in the
// order of columns in the cache. This re-orders them to the correct order, and fills in any
// gaps with None.
let predicates: Vec<Option<&Predicate>> = self
.column_ids
.iter()
.map(|id| predicates.get(id))
.collect();
// Uses a [`StringViewBuilder`] for each column to compose the [`RecordBatch`]. This is convenient
// for nested caches, where a value matched on a higher branch in the cache will need to have its
// value duplicated in the output batch for each row produced by the levels beneath it.
let mut builders: Vec<StringViewBuilder> = (0..self.column_ids.len())
.map(|_| StringViewBuilder::new())
.collect();
let expired_time_ns = self.expired_time_ns();
let _ =
self.data
.evaluate_predicates(expired_time_ns, predicates.as_slice(), &mut builders);
RecordBatch::try_new(
Arc::clone(&self.schema),
builders
.into_iter()
.map(|mut builder| Arc::new(builder.finish()) as ArrayRef)
.collect(),
)
}
/// Prune nodes from within the cache
///
/// This first prunes entries that are older than the `max_age` of the cache. If the cardinality
/// of the cache is still over its `max_cardinality`, it will do another pass to bring the cache
/// size down.
pub(crate) fn prune(&mut self) {
let before_time_ns = self.expired_time_ns();
let _ = self.data.remove_before(before_time_ns);
self.state.cardinality = self.data.cardinality();
if self.state.cardinality > self.max_cardinality {
let n_to_remove = self.state.cardinality - self.max_cardinality;
self.data.remove_n_oldest(n_to_remove);
self.state.cardinality = self.data.cardinality();
}
}
/// Get the nanosecond timestamp as an `i64` before which entries are considered expired if they
/// have not been seen since.
fn expired_time_ns(&self) -> i64 {
self.time_provider
.now()
.checked_sub(self.max_age)
.expect("max age on cache should not cause an overflow")
.timestamp_nanos()
}
/// Get the arrow [`SchemaRef`] for this cache
pub(crate) fn arrow_schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
/// Compare the configuration of a given cache, producing a helpful error message if they differ
pub(crate) fn compare_config(&self, other: &Self) -> Result<(), anyhow::Error> {
if self.max_cardinality != other.max_cardinality {
bail!(
"incompatible `max_cardinality`, expected {}, got {}",
self.max_cardinality,
other.max_cardinality
);
}
if self.max_age != other.max_age {
bail!(
"incompatible `max_age`, expected {}, got {}",
self.max_age.as_secs(),
other.max_age.as_secs()
)
}
if self.column_ids != other.column_ids {
bail!(
"incompatible column id selection, expected {}, got {}",
self.column_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(","),
other
.column_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(","),
)
}
Ok(())
}
}
/// A node in the `data` tree of a [`MetaCache`]
///
/// Recursive struct holding a [`BTreeMap`] whose keys are the values nested under this node, and
/// whose map values hold each value's last-seen time as an [`i64`] along with an optional child
/// [`Node`] for the next level of the tree.
#[derive(Debug, Default)]
struct Node(BTreeMap<Value, (i64, Option<Node>)>);
impl Node {
/// Remove all elements before the given nanosecond timestamp, returning `true` if the resulting
/// node is empty.
fn remove_before(&mut self, time_ns: i64) -> bool {
self.0.retain(|_, (last_seen, node)| {
// Note that for a branch node, the `last_seen` time will be the greatest of
// all its descendants, so an entire branch can be removed if its value has not been seen since
// before `time_ns`, hence the short-circuit here:
*last_seen > time_ns
|| node
.as_mut()
.is_some_and(|node| node.remove_before(time_ns))
});
self.0.is_empty()
}
/// Remove the `n_to_remove` oldest entries from the cache
fn remove_n_oldest(&mut self, n_to_remove: usize) {
let mut times = BinaryHeap::with_capacity(n_to_remove);
self.find_n_oldest(n_to_remove, &mut times);
self.remove_before(*times.peek().unwrap());
}
/// Use a binary heap to find the time before which all nodes should be removed
fn find_n_oldest(&self, n_to_remove: usize, times_heap: &mut BinaryHeap<i64>) {
self.0
.values()
// do not need to add the last_seen time for a branch node to the
// heap since it will be equal to the greatest of that of its leaves
.for_each(|(last_seen, node)| {
if let Some(node) = node {
node.find_n_oldest(n_to_remove, times_heap)
} else if times_heap.len() < n_to_remove {
times_heap.push(*last_seen);
} else if times_heap.peek().is_some_and(|newest| last_seen < newest) {
times_heap.pop();
times_heap.push(*last_seen);
}
});
}
/// Get the total count of unique value combinations nested under this node
///
/// Note that this includes expired elements, which still contribute to the total size of the
/// cache until they are pruned.
fn cardinality(&self) -> usize {
self.0
.values()
.map(|(_, node)| node.as_ref().map_or(1, |node| node.cardinality()))
.sum()
}
/// Evaluate the set of provided predicates against this node, adding values to the provided
/// [`StringViewBuilder`]s. Predicates and builders are provided as slices, as this is called
/// recursively down the cache tree.
///
/// Returns the number of values that were added to the arrow builders.
///
/// # Panics
///
/// This will panic if `predicates` or `builders` slices of invalid size are passed in. When
/// called from the root [`Node`], their size should be that of the depth of the cache, i.e.,
/// the number of columns in the cache.
fn evaluate_predicates(
&self,
expired_time_ns: i64,
predicates: &[Option<&Predicate>],
builders: &mut [StringViewBuilder],
) -> usize {
let mut total_count = 0;
let (predicate, next_predicates) = predicates
.split_first()
.expect("predicates should not be empty");
// if there is a predicate, evaluate it, otherwise, just grab everything from the node:
let values_and_nodes = if let Some(predicate) = predicate {
self.evaluate_predicate(expired_time_ns, predicate)
} else {
self.0
.iter()
.filter(|&(_, (t, _))| (t > &expired_time_ns))
.map(|(v, (_, n))| (v.clone(), n.as_ref()))
.collect()
};
let (builder, next_builders) = builders
.split_first_mut()
.expect("builders should not be empty");
// iterate over the resulting set of values and next nodes (if this is a branch), and add
// the values to the arrow builders:
for (value, node) in values_and_nodes {
if let Some(node) = node {
let count =
node.evaluate_predicates(expired_time_ns, next_predicates, next_builders);
if count > 0 {
// we are not on a terminal node in the cache, so create a block, as this value
// is repeated `count` times, i.e., once for each row that comes out of the
// subsequent nodes:
let block = builder.append_block(value.0.as_bytes().into());
for _ in 0..count {
builder
.try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
.expect("append view for known valid block, offset and length");
}
total_count += count;
}
} else {
builder.append_value(value.0);
total_count += 1;
}
}
total_count
}
/// Evaluate a predicate against a [`Node`], producing a list of [`Value`]s and, if this is a
/// branch node in the cache tree, a reference to the next [`Node`].
fn evaluate_predicate(
&self,
expired_time_ns: i64,
predicate: &Predicate,
) -> Vec<(Value, Option<&Node>)> {
match &predicate {
Predicate::In(in_list) => in_list
.iter()
.filter_map(|v| {
self.0.get_key_value(v).and_then(|(v, (t, n))| {
(t > &expired_time_ns).then(|| (v.clone(), n.as_ref()))
})
})
.collect(),
Predicate::NotIn(not_in_set) => self
.0
.iter()
.filter(|(v, (t, _))| t > &expired_time_ns && !not_in_set.contains(v))
.map(|(v, (_, n))| (v.clone(), n.as_ref()))
.collect(),
}
}
}
/// A cache value, which for now, only holds strings
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash)]
pub(crate) struct Value(Arc<str>);
impl From<&FieldData> for Value {
fn from(field: &FieldData) -> Self {
match field {
FieldData::Key(s) => Self(Arc::from(s.as_str())),
FieldData::Tag(s) => Self(Arc::from(s.as_str())),
FieldData::String(s) => Self(Arc::from(s.as_str())),
FieldData::Timestamp(_)
| FieldData::Integer(_)
| FieldData::UInteger(_)
| FieldData::Float(_)
| FieldData::Boolean(_) => panic!("unexpected field type for metadata cache"),
}
}
}
impl From<String> for Value {
fn from(value: String) -> Self {
Self(Arc::from(value.as_str()))
}
}
/// A predicate that can be applied when gathering [`RecordBatch`]es from a [`MetaCache`]
///
/// This is intended to be derived from a set of filter expressions in DataFusion by analyzing
/// them with a `LiteralGuarantee`.
///
/// This uses a `BTreeSet` to store the values so that they are iterated over in sorted order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum Predicate {
In(BTreeSet<Value>),
NotIn(BTreeSet<Value>),
}
impl std::fmt::Display for Predicate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Predicate::In(_) => write!(f, "IN (")?,
Predicate::NotIn(_) => write!(f, "NOT IN (")?,
}
let mut values = self.values();
while let Some(v) = values.next() {
write!(f, "{}", v.0)?;
if values.size_hint().0 > 0 {
write!(f, ",")?;
}
}
write!(f, ")")
}
}
impl Predicate {
pub(crate) fn new_in(in_vals: impl IntoIterator<Item: Into<Arc<str>>>) -> Self {
Self::In(in_vals.into_iter().map(Into::into).map(Value).collect())
}
pub(crate) fn new_not_in(in_vals: impl IntoIterator<Item: Into<Arc<str>>>) -> Self {
Self::NotIn(in_vals.into_iter().map(Into::into).map(Value).collect())
}
fn values(&self) -> impl Iterator<Item = &Value> {
match self {
Predicate::In(vals) => vals.iter(),
Predicate::NotIn(vals) => vals.iter(),
}
}
}

File diff suppressed because it is too large


@ -0,0 +1,310 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::Context;
use arrow::datatypes::SchemaRef;
use influxdb3_catalog::catalog::{Catalog, TableDefinition};
use influxdb3_id::{DbId, TableId};
use influxdb3_wal::{MetaCacheDefinition, WalContents, WalOp};
use iox_time::TimeProvider;
use parking_lot::RwLock;
use crate::meta_cache::cache::{MaxAge, MaxCardinality};
use super::cache::{CreateMetaCacheArgs, MetaCache};
#[derive(Debug, thiserror::Error)]
#[error("metadata cache provider error: {0:#}")]
pub struct Error(#[from] anyhow::Error);
/// Triple-nested map for storing multiple metadata caches per table.
///
/// That is, the map nesting is `database -> table -> cache name`
type CacheMap = RwLock<HashMap<DbId, HashMap<TableId, HashMap<Arc<str>, MetaCache>>>>;
/// Provides the metadata caches for the running instance of InfluxDB
#[derive(Debug)]
pub struct MetaCacheProvider {
pub(crate) time_provider: Arc<dyn TimeProvider>,
pub(crate) catalog: Arc<Catalog>,
pub(crate) cache_map: CacheMap,
}
impl MetaCacheProvider {
/// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
/// `cache_map` from the definitions in the catalog.
pub fn new_from_catalog(
time_provider: Arc<dyn TimeProvider>,
catalog: Arc<Catalog>,
) -> Result<Arc<Self>, Error> {
let provider = Arc::new(MetaCacheProvider {
time_provider,
catalog: Arc::clone(&catalog),
cache_map: Default::default(),
});
for db_schema in catalog.list_db_schema() {
for table_def in db_schema.tables() {
for (cache_name, cache_def) in table_def.meta_caches() {
assert!(
provider
.create_cache(
db_schema.id,
Some(cache_name),
CreateMetaCacheArgs {
table_def: Arc::clone(&table_def),
max_cardinality: MaxCardinality::try_from(
cache_def.max_cardinality
)?,
max_age: MaxAge::from(Duration::from_secs(
cache_def.max_age_seconds
)),
column_ids: cache_def.column_ids.to_vec()
}
)?
.is_some(),
"there should not be duplicated cache definitions in the catalog"
)
}
}
}
Ok(provider)
}
/// Initialize a [`MetaCacheProvider`] from a [`Catalog`], populating the provider's
/// `cache_map` from the definitions in the catalog. This starts a background process that
/// runs on the provided `eviction_interval` to perform eviction on all of the caches
/// in the created [`MetaCacheProvider`]'s `cache_map`.
pub fn new_from_catalog_with_background_eviction(
time_provider: Arc<dyn TimeProvider>,
catalog: Arc<Catalog>,
eviction_interval: Duration,
) -> Result<Arc<Self>, Error> {
let provider = Self::new_from_catalog(time_provider, catalog)?;
background_eviction_process(Arc::clone(&provider), eviction_interval);
Ok(provider)
}
/// Get a particular cache's name and arrow schema
///
/// This is used for the implementation of DataFusion's `TableFunctionImpl` and
/// `TableProvider` traits as a convenience method for the scenario where there is only a
/// single cache on a table, and therefore one does not need to specify the cache name
/// in addition to the db/table identifiers.
pub(crate) fn get_cache_name_and_schema(
&self,
db_id: DbId,
table_id: TableId,
cache_name: Option<&str>,
) -> Option<(Arc<str>, SchemaRef)> {
self.cache_map
.read()
.get(&db_id)
.and_then(|db| db.get(&table_id))
.and_then(|table| {
if let Some(cache_name) = cache_name {
table
.get_key_value(cache_name)
.map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
} else if table.len() == 1 {
table
.iter()
.map(|(name, mc)| (Arc::clone(name), mc.arrow_schema()))
.next()
} else {
None
}
})
}
/// Create a new entry in the metadata cache for a given database and parameters.
///
/// If a new cache is created, this will return the [`MetaCacheDefinition`] for the created
/// cache; otherwise, if the provided arguments are identical to an existing cache, along with
/// any defaults, then `None` will be returned. It is an error to attempt to create a cache that
/// would overwrite an existing one with different parameters.
///
/// The cache name is optional; if not provided, it will be of the form:
/// ```text
/// <table_name>_<column_names>_meta_cache
/// ```
/// Where `<column_names>` is an `_`-separated list of the column names used in the cache.
pub fn create_cache(
&self,
db_id: DbId,
cache_name: Option<Arc<str>>,
CreateMetaCacheArgs {
table_def,
max_cardinality,
max_age,
column_ids,
}: CreateMetaCacheArgs,
) -> Result<Option<MetaCacheDefinition>, Error> {
let cache_name = if let Some(cache_name) = cache_name {
cache_name
} else {
format!(
"{table_name}_{cols}_meta_cache",
table_name = table_def.table_name,
cols = column_ids
.iter()
.map(
|id| table_def.column_id_to_name(id).with_context(|| format!(
"invalid column id ({id}) encountered in cache creation arguments"
))
)
.collect::<Result<Vec<_>, anyhow::Error>>()?
.join("_")
)
.into()
};
let new_cache = MetaCache::new(
Arc::clone(&self.time_provider),
CreateMetaCacheArgs {
table_def: Arc::clone(&table_def),
max_cardinality,
max_age,
column_ids: column_ids.clone(),
},
)?;
let mut lock = self.cache_map.write();
if let Some(cache) = lock
.get(&db_id)
.and_then(|db| db.get(&table_def.table_id))
.and_then(|table| table.get(&cache_name))
{
return cache
.compare_config(&new_cache)
.map(|_| None)
.map_err(Into::into);
}
lock.entry(db_id)
.or_default()
.entry(table_def.table_id)
.or_default()
.insert(Arc::clone(&cache_name), new_cache);
Ok(Some(MetaCacheDefinition {
table_id: table_def.table_id,
table_name: Arc::clone(&table_def.table_name),
cache_name,
column_ids,
max_cardinality: max_cardinality.into(),
max_age_seconds: max_age.as_seconds(),
}))
}
/// Create a new cache given the database schema and WAL definition. This is useful during WAL
/// replay.
pub fn create_from_definition(
&self,
db_id: DbId,
table_def: Arc<TableDefinition>,
definition: &MetaCacheDefinition,
) {
let meta_cache = MetaCache::new(
Arc::clone(&self.time_provider),
CreateMetaCacheArgs {
table_def,
max_cardinality: definition
.max_cardinality
.try_into()
.expect("definition should have a valid max_cardinality"),
max_age: MaxAge::from(Duration::from_secs(definition.max_age_seconds)),
column_ids: definition.column_ids.to_vec(),
},
)
.expect("definition should be valid coming from the WAL");
self.cache_map
.write()
.entry(db_id)
.or_default()
.entry(definition.table_id)
.or_default()
.insert(Arc::clone(&definition.cache_name), meta_cache);
}
/// Delete a cache from the provider
///
/// This also cleans up the provider hierarchy, so if the delete leaves a branch for a given
/// table or its parent database empty, this will remove that branch.
pub fn delete_cache(
&self,
db_id: DbId,
table_id: TableId,
cache_name: &str,
) -> Result<(), Error> {
let mut lock = self.cache_map.write();
let db = lock.get_mut(&db_id).context("database does not exist")?;
let table = db.get_mut(&table_id).context("table does not exist")?;
table.remove(cache_name).context("cache does not exist")?;
if table.is_empty() {
db.remove(&table_id);
}
if db.is_empty() {
lock.remove(&db_id);
}
Ok(())
}
/// Write the contents of a WAL file to the cache by iterating over its database and table
/// batches to find entries that belong in the cache.
pub fn write_wal_contents_to_cache(&self, wal_contents: &WalContents) {
let mut lock = self.cache_map.write();
for op in &wal_contents.ops {
let WalOp::Write(write_batch) = op else {
continue;
};
let Some(db_caches) = lock.get_mut(&write_batch.database_id) else {
continue;
};
if db_caches.is_empty() {
continue;
}
for (table_id, table_chunks) in &write_batch.table_chunks {
let Some(table_caches) = db_caches.get_mut(table_id) else {
continue;
};
if table_caches.is_empty() {
continue;
}
for (_, cache) in table_caches.iter_mut() {
for chunk in table_chunks.chunk_time_to_chunk.values() {
for row in &chunk.rows {
cache.push(row);
}
}
}
}
}
}
/// Run eviction across all caches in the provider.
pub fn evict_cache_entries(&self) {
let mut lock = self.cache_map.write();
lock.iter_mut().for_each(|(_, db_caches)| {
db_caches.iter_mut().for_each(|(_, table_caches)| {
table_caches.iter_mut().for_each(|(_, cache)| cache.prune())
})
});
}
}
fn background_eviction_process(
provider: Arc<MetaCacheProvider>,
eviction_interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(eviction_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
provider.evict_cache_entries();
}
})
}


@ -0,0 +1,359 @@
use std::{any::Any, sync::Arc};
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::{Session, TableProvider},
common::{internal_err, plan_err, DFSchema, Result},
datasource::{function::TableFunctionImpl, TableType},
execution::context::ExecutionProps,
logical_expr::TableProviderFilterPushDown,
physical_expr::{
create_physical_expr,
utils::{Guarantee, LiteralGuarantee},
},
physical_plan::{memory::MemoryExec, DisplayAs, DisplayFormatType, ExecutionPlan},
prelude::Expr,
scalar::ScalarValue,
};
use indexmap::IndexMap;
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::{ColumnId, DbId};
use super::{cache::Predicate, MetaCacheProvider};
/// The name used to call the metadata cache in SQL queries
pub const META_CACHE_UDTF_NAME: &str = "meta_cache";
/// Implementor of the [`TableProvider`] trait that is produced by a call to the [`MetaCacheFunction`]
struct MetaCacheFunctionProvider {
/// The schema of the [`MetaCache`][super::cache::MetaCache] being queried
schema: SchemaRef,
/// Forwarded ref to the [`MetaCacheProvider`] which is used to get the
/// [`MetaCache`][super::cache::MetaCache] for the query, along with the `db_id` and
/// `table_def`. This is done instead of passing a reference to the `MetaCache` directly,
/// because the cache lives behind the provider's lock and holding a direct reference is not practical.
provider: Arc<MetaCacheProvider>,
/// The database ID that the called cache is related to
db_id: DbId,
/// The table definition that the called cache is related to
table_def: Arc<TableDefinition>,
/// The name of the cache, which is determined when calling the `meta_cache` function
cache_name: Arc<str>,
}
#[async_trait]
impl TableProvider for MetaCacheFunctionProvider {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn table_type(&self) -> TableType {
TableType::Temporary
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
async fn scan(
&self,
ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let read = self.provider.cache_map.read();
let (batches, predicates) = if let Some(cache) = read
.get(&self.db_id)
.and_then(|db| db.get(&self.table_def.table_id))
.and_then(|tbl| tbl.get(&self.cache_name))
{
let predicates = convert_filter_exprs(&self.table_def, self.schema(), filters)?;
(
cache
.to_record_batch(&predicates)
.map(|batch| vec![batch])?,
(!predicates.is_empty()).then_some(predicates),
)
} else {
(vec![], None)
};
let mut exec =
MetaCacheExec::try_new(predicates, &[batches], self.schema(), projection.cloned())?;
let show_sizes = ctx.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);
Ok(Arc::new(exec))
}
}
/// Convert the given list of filter expressions to a map of [`ColumnId`] to [`Predicate`]
///
/// The resulting map uses [`IndexMap`] to ensure a consistent iteration order, which makes testing
/// the filter conversion with EXPLAIN queries significantly easier.
fn convert_filter_exprs(
table_def: &TableDefinition,
cache_schema: SchemaRef,
filters: &[Expr],
) -> Result<IndexMap<ColumnId, Predicate>> {
let mut predicate_map: IndexMap<ColumnId, Option<Predicate>> = IndexMap::new();
// for create_physical_expr:
let schema: DFSchema = cache_schema.try_into()?;
let props = ExecutionProps::new();
// The set of `filters` that are passed in from DataFusion varies: 1) based on how they are
// defined in the query, and 2) based on some decisions that DataFusion makes when parsing the
// query into the `Expr` syntax tree. For example, the predicate:
//
// WHERE foo IN ('bar', 'baz')
//
// instead of being expressed as an `InList`, would be simplified to the following `Expr` tree:
//
// [
// BinaryExpr {
// left: BinaryExpr { left: "foo", op: Eq, right: "bar" },
// op: Or,
// right: BinaryExpr { left: "foo", op: Eq, right: "baz" }
// }
// ]
//
// while the predicate:
//
// WHERE foo = 'bar' OR foo = 'baz' OR foo = 'bop' OR foo = 'bla'
//
// instead of being expressed as a tree of `BinaryExpr`s, is expressed as an `InList` with four
// entries:
//
// [
// InList { col: "foo", values: ["bar", "baz", "bop", "bla"], negated: false }
// ]
//
// Instead of handling all the combinations of `Expr`s that may be passed by the caller of
// `TableProvider::scan`, we can use the cache's schema to convert each `Expr` to a `PhysicalExpr`
// and analyze it using DataFusion's `LiteralGuarantee`.
//
// This will distill the provided set of `Expr`s down to either an IN list, or a NOT IN list
// which we can convert to the `Predicate` type for the metadata cache.
//
// The main caveat is that if for some reason there are multiple `Expr`s that apply predicates
// on a given column, i.e., leading to multiple `LiteralGuarantee`s on a specific column, we
// discard those predicates and have DataFusion handle the filtering.
//
// This is a conservative approach; it may be that we can combine multiple literal guarantees on
// a single column, but thus far, from testing in the parent module, this does not seem necessary.
for expr in filters {
let physical_expr = create_physical_expr(expr, &schema, &props)?;
let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
for LiteralGuarantee {
column,
guarantee,
literals,
} in literal_guarantees
{
let Some(column_id) = table_def.column_name_to_id(column.name()) else {
return plan_err!(
"invalid column name in filter expression: {}",
column.name()
);
};
let value_iter = literals.into_iter().filter_map(|l| match l {
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => Some(s),
_ => None,
});
let predicate = match guarantee {
Guarantee::In => Predicate::new_in(value_iter),
Guarantee::NotIn => Predicate::new_not_in(value_iter),
};
predicate_map
.entry(column_id)
.and_modify(|e| {
// We do not currently support multiple literal guarantees per column.
//
// In this case we replace the predicate with None so that it does not filter
// any records from the cache downstream. DataFusion will still do filtering at
// a higher level, once _all_ records are produced from the cache.
e.take();
})
.or_insert_with(|| Some(predicate));
}
}
Ok(predicate_map
.into_iter()
.filter_map(|(column_id, predicate)| predicate.map(|predicate| (column_id, predicate)))
.collect())
}
/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table function
/// in the DataFusion `SessionContext`.
#[derive(Debug)]
pub struct MetaCacheFunction {
db_id: DbId,
provider: Arc<MetaCacheProvider>,
}
impl MetaCacheFunction {
pub fn new(db_id: DbId, provider: Arc<MetaCacheProvider>) -> Self {
Self { db_id, provider }
}
}
impl TableFunctionImpl for MetaCacheFunction {
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)))) = args.first() else {
return plan_err!("first argument must be the table name as a string");
};
let cache_name = match args.get(1) {
Some(Expr::Literal(ScalarValue::Utf8(Some(name)))) => Some(name),
Some(_) => {
return plan_err!("second argument, if passed, must be the cache name as a string")
}
None => None,
};
let Some(table_def) = self
.provider
.catalog
.db_schema_by_id(&self.db_id)
.and_then(|db| db.table_definition(table_name.as_str()))
else {
return plan_err!("provided table name ({}) is invalid", table_name);
};
let Some((cache_name, schema)) = self.provider.get_cache_name_and_schema(
self.db_id,
table_def.table_id,
cache_name.map(|n| n.as_str()),
) else {
return plan_err!("could not find meta cache for the given arguments");
};
Ok(Arc::new(MetaCacheFunctionProvider {
schema,
provider: Arc::clone(&self.provider),
db_id: self.db_id,
table_def,
cache_name,
}))
}
}
/// Custom implementor of the [`ExecutionPlan`] trait for use by the metadata cache
///
/// Wraps a [`MemoryExec`] from DataFusion and mostly delegates to it. The special functionality
/// provided by this type is to track the predicates that are pushed down to the underlying cache
/// during query planning/execution.
///
/// # Example
///
/// For a query that does not provide any predicates, or one whose predicates do not get pushed
/// down, the `EXPLAIN` output will contain a line for the `MetaCacheExec` with no predicates,
/// including what is emitted by the inner `MemoryExec`:
///
/// ```text
/// MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]
/// ```
///
/// For queries that do have predicates that get pushed down, the output will include them, e.g.:
///
/// ```text
/// MetaCacheExec: predicates=[[0 IN (us-east)], [1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]
/// ```
#[derive(Debug)]
struct MetaCacheExec {
inner: MemoryExec,
predicates: Option<IndexMap<ColumnId, Predicate>>,
}
impl MetaCacheExec {
fn try_new(
predicates: Option<IndexMap<ColumnId, Predicate>>,
partitions: &[Vec<RecordBatch>],
schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self> {
Ok(Self {
inner: MemoryExec::try_new(partitions, schema, projection)?,
predicates,
})
}
fn with_show_sizes(self, show_sizes: bool) -> Self {
Self {
inner: self.inner.with_show_sizes(show_sizes),
..self
}
}
}
impl DisplayAs for MetaCacheExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "MetaCacheExec:")?;
if let Some(predicates) = self.predicates.as_ref() {
write!(f, " predicates=[")?;
let mut p_iter = predicates.iter();
while let Some((col_id, predicate)) = p_iter.next() {
write!(f, "[{col_id} {predicate}]")?;
if p_iter.size_hint().0 > 0 {
write!(f, ", ")?;
}
}
write!(f, "]")?;
}
write!(f, " inner=")?;
self.inner.fmt_as(t, f)
}
}
}
}
impl ExecutionPlan for MetaCacheExec {
fn name(&self) -> &str {
"MetaCacheExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.inner.properties()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.inner.children()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
// (copied from MemoryExec):
// MemoryExec has no children
if children.is_empty() {
Ok(self)
} else {
internal_err!("Children cannot be replaced in {self:?}")
}
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> Result<datafusion::execution::SendableRecordBatchStream> {
self.inner.execute(partition, context)
}
}


@ -7,7 +7,7 @@ use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteTableDefinition, FieldAdditions, LastCacheDefinition,
LastCacheDelete,
LastCacheDelete, MetaCacheDefinition,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@ -709,6 +709,7 @@ pub struct TableDefinition {
pub column_map: BiHashMap<ColumnId, Arc<str>>,
pub series_key: Option<Vec<ColumnId>>,
pub last_caches: HashMap<Arc<str>, LastCacheDefinition>,
pub meta_caches: HashMap<Arc<str>, MetaCacheDefinition>,
pub deleted: bool,
}
@ -764,6 +765,7 @@ impl TableDefinition {
column_map,
series_key,
last_caches: HashMap::new(),
meta_caches: HashMap::new(),
deleted: false,
})
}
@ -993,6 +995,12 @@ impl TableDefinition {
.map(|(name, def)| (Arc::clone(name), def))
}
pub fn meta_caches(&self) -> impl Iterator<Item = (Arc<str>, &MetaCacheDefinition)> {
self.meta_caches
.iter()
.map(|(name, def)| (Arc::clone(name), def))
}
pub fn column_name_to_id(&self, name: impl Into<Arc<str>>) -> Option<ColumnId> {
self.column_map.get_by_right(&name.into()).copied()
}


@ -19,6 +19,10 @@ pub fn wal_contents(
}
}
pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
WalOp::Write(write_batch)
}
pub fn catalog_batch_op(
db_id: DbId,
db_name: impl Into<Arc<str>>,


@ -493,6 +493,23 @@ pub struct LastCacheDelete {
pub name: Arc<str>,
}
/// Defines a metadata cache in a given table and database
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct MetaCacheDefinition {
/// The id of the associated table
pub table_id: TableId,
/// The name of the associated table
pub table_name: Arc<str>,
/// The name of the cache, which is unique within the associated table
pub cache_name: Arc<str>,
/// The ids of columns tracked by this metadata cache, in the defined order
pub column_ids: Vec<ColumnId>,
/// The maximum number of distinct value combinations the cache will hold
pub max_cardinality: usize,
/// The maximum age in seconds, similar to a time-to-live (TTL), for entries in the cache
pub max_age_seconds: u64,
}
#[serde_as]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct WriteBatch {


@ -723,7 +723,7 @@ fn validate_and_qualify_v1_line(
/// Result of conversion from line protocol to valid chunked data
/// for the buffer.
#[derive(Debug)]
pub(crate) struct ValidatedLines {
pub struct ValidatedLines {
/// Number of lines passed in
pub(crate) line_count: usize,
/// Number of fields passed in
@ -738,6 +738,12 @@ pub(crate) struct ValidatedLines {
pub(crate) catalog_updates: Option<CatalogBatch>,
}
impl From<ValidatedLines> for WriteBatch {
fn from(value: ValidatedLines) -> Self {
value.valid_data
}
}
impl WriteValidator<LinesParsed> {
/// Convert this into the inner [`LinesParsed`]
///
@ -752,7 +758,7 @@ impl WriteValidator<LinesParsed> {
/// This involves splitting out the writes into different batches for each chunk, which will
/// map to the `Gen1Duration`. This function should be infallible, because
/// the schema for incoming writes has been fully validated.
pub(crate) fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines {
pub fn convert_lines_to_buffer(self, gen1_duration: Gen1Duration) -> ValidatedLines {
let mut table_chunks = IndexMap::new();
let line_count = self.state.lines.len();
let mut field_count = 0;