feat: LastCacheExec to track predicate pushdown in last cache queries (#25621)

Trevor Hilton 2024-12-06 10:53:19 -08:00 committed by GitHub
parent 9b87cd7a65
commit 154ff7da23
6 changed files with 807 additions and 259 deletions

View File

@ -1,5 +1,5 @@
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{BTreeSet, HashMap, HashSet, VecDeque},
ops::Deref,
sync::Arc,
time::{Duration, Instant},
@ -17,11 +17,6 @@ use arrow::{
},
error::ArrowError,
};
use datafusion::{
logical_expr::{expr::InList, BinaryExpr, Operator},
prelude::Expr,
scalar::ScalarValue,
};
use indexmap::{IndexMap, IndexSet};
use influxdb3_catalog::catalog::{ColumnDefinition, TableDefinition, TIME_COLUMN_NAME};
use influxdb3_id::{ColumnId, TableId};
@ -56,8 +51,6 @@ pub(crate) struct LastCache {
/// map preserves the order of the elements, thereby maintaining the order of the keys in
/// the cache.
pub(crate) key_column_ids: Arc<IndexSet<ColumnId>>,
/// The key columns for this cache, by their names
pub(crate) key_column_name_to_ids: Arc<HashMap<Arc<str>, ColumnId>>,
/// The value columns for this cache
pub(crate) value_columns: ValueColumnType,
/// The Arrow Schema for the table that this cache is associated with
@ -86,6 +79,7 @@ pub struct CreateLastCacheArgs {
/// The default cache time-to-live (TTL) is 4 hours
pub(crate) const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 4);
/// The time to live (TTL) for entries in the cache
#[derive(Debug, Clone, Copy, Deserialize)]
pub struct LastCacheTtl(Duration);
@ -115,6 +109,7 @@ impl Default for LastCacheTtl {
}
}
/// Specifies the key column configuration for a new [`LastCache`]
#[derive(Debug, Default, Clone)]
pub enum LastCacheKeyColumnsArg {
/// Use the series key columns in their order as the last cache key columns
@ -133,6 +128,7 @@ impl From<Option<Vec<ColumnId>>> for LastCacheKeyColumnsArg {
}
}
/// Specifies the value column configuration for a new [`LastCache`]
#[derive(Debug, Default, Clone)]
pub enum LastCacheValueColumnsArg {
/// Use all non-key columns as value columns when initialized, and add new field columns that
@ -156,6 +152,10 @@ impl From<Option<Vec<ColumnId>>> for LastCacheValueColumnsArg {
impl LastCache {
/// Create a new [`LastCache`]
///
/// This uses the provided `TableDefinition` to build an Arrow schema for the cache. It will
/// validate the given arguments and can error if invalid columns are specified, or if an
/// incompatible column is used as a key for the cache.
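///
/// A minimal sketch of constructing a cache, mirroring the arguments used in the tests below
/// (assumes the default size and TTL):
///
/// ```ignore
/// let cache = LastCache::new(CreateLastCacheArgs {
///     table_def: Arc::clone(&table_def),
///     count: LastCacheSize::default(),
///     ttl: LastCacheTtl::default(),
///     key_columns: LastCacheKeyColumnsArg::SeriesKey,
///     value_columns: LastCacheValueColumnsArg::AcceptNew,
/// })?;
/// ```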
pub(crate) fn new(
CreateLastCacheArgs {
table_def,
@ -260,7 +260,6 @@ impl LastCache {
count,
ttl: ttl.into(),
key_column_ids: Arc::new(key_column_ids),
key_column_name_to_ids: Arc::new(key_column_name_to_ids),
value_columns: match value_columns {
LastCacheValueColumnsArg::AcceptNew => ValueColumnType::AcceptNew { seen },
LastCacheValueColumnsArg::Explicit(_) => ValueColumnType::Explicit {
@ -340,32 +339,36 @@ impl LastCache {
/// This will panic if the internal cache state's keys are out-of-order with respect to the
/// order of the `key_columns` on this [`LastCache`]
pub(crate) fn push(&mut self, row: &Row, table_def: Arc<TableDefinition>) {
let mut values = Vec::with_capacity(self.key_column_ids.len());
for id in self.key_column_ids.iter() {
let Some(value) = row
.fields
.iter()
.find(|f| &f.id == id)
.map(|f| KeyValue::from(&f.value))
else {
// ignore the row if it does not contain all key columns
return;
};
values.push(value);
}
let accept_new_fields = self.accept_new_fields();
let mut target = &mut self.state;
let mut key_iter = self.key_column_ids.iter().peekable();
while let (Some(col_id), peek) = (key_iter.next(), key_iter.peek()) {
let mut iter = self.key_column_ids.iter().zip(values).peekable();
while let (Some((col_id, value)), peek) = (iter.next(), iter.peek()) {
if target.is_init() {
*target = LastCacheState::Key(LastCacheKey {
column_id: *col_id,
value_map: Default::default(),
});
}
let Some(value) = row
.fields
.iter()
.find(|f| f.id == *col_id)
.map(|f| KeyValue::from(&f.value))
else {
// ignore the row if it does not contain all key columns
return;
};
let cache_key = target.as_key_mut().unwrap();
assert_eq!(
&cache_key.column_id, col_id,
"key columns must match cache key order"
);
target = cache_key.value_map.entry(value).or_insert_with(|| {
if let Some(next_col_id) = peek {
if let Some((next_col_id, _)) = peek {
LastCacheState::Key(LastCacheKey {
column_id: **next_col_id,
value_map: Default::default(),
@ -411,14 +414,14 @@ impl LastCache {
pub(crate) fn to_record_batches(
&self,
table_def: Arc<TableDefinition>,
predicates: &[Predicate],
predicates: &IndexMap<ColumnId, Predicate>,
) -> Result<Vec<RecordBatch>, ArrowError> {
// map the provided predicates onto the key columns
// there may not be predicates provided for each key column, hence the Option
let predicates: Vec<Option<&Predicate>> = self
.key_column_ids
.iter()
.map(|col_id| predicates.iter().find(|p| p.column_id == *col_id))
.map(|id| predicates.get(id))
.collect();
let mut caches = vec![ExtendedLastCacheState {
@ -465,78 +468,6 @@ impl LastCache {
.collect()
}
/// Convert a set of DataFusion filter [`Expr`]s into [`Predicate`]s
///
/// This only handles binary expressions, e.g., `foo = 'bar'`, and will use the `key_columns`
/// to filter out expressions that do not match key columns in the cache.
pub(crate) fn convert_filter_exprs(&self, exprs: &[Expr]) -> Vec<Predicate> {
exprs
.iter()
.filter_map(|expr| {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let col_id = if let Expr::Column(c) = left.as_ref() {
self.key_column_name_to_ids.get(c.name()).copied()?
} else {
return None;
};
let value = match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(v))) => {
KeyValue::String(v.to_owned())
}
Expr::Literal(ScalarValue::Boolean(Some(v))) => KeyValue::Bool(*v),
// TODO: handle integer types that can be casted up to i64/u64:
Expr::Literal(ScalarValue::Int64(Some(v))) => KeyValue::Int(*v),
Expr::Literal(ScalarValue::UInt64(Some(v))) => KeyValue::UInt(*v),
_ => return None,
};
match op {
Operator::Eq => Some(Predicate::new_eq(col_id, value)),
Operator::NotEq => Some(Predicate::new_not_eq(col_id, value)),
_ => None,
}
}
Expr::InList(InList {
expr,
list,
negated,
}) => {
let col_id = if let Expr::Column(c) = expr.as_ref() {
self.key_column_name_to_ids.get(c.name()).copied()?
} else {
return None;
};
let values: Vec<KeyValue> = list
.iter()
.filter_map(|e| match e {
Expr::Literal(ScalarValue::Utf8(Some(v))) => {
Some(KeyValue::String(v.to_owned()))
}
Expr::Literal(ScalarValue::Boolean(Some(v))) => {
Some(KeyValue::Bool(*v))
}
// TODO: handle integer types that can be casted up to i64/u64:
Expr::Literal(ScalarValue::Int64(Some(v))) => {
Some(KeyValue::Int(*v))
}
Expr::Literal(ScalarValue::UInt64(Some(v))) => {
Some(KeyValue::UInt(*v))
}
_ => None,
})
.collect();
if *negated {
Some(Predicate::new_not_in(col_id, values))
} else {
Some(Predicate::new_in(col_id, values))
}
}
_ => None,
}
})
.collect()
}
/// Remove expired values from the internal cache state
pub(crate) fn remove_expired(&mut self) {
self.state.remove_expired();
@ -663,50 +594,52 @@ impl ExtendedLastCacheState<'_> {
}
/// A predicate used for evaluating key column values in the cache on query
///
/// Can be either an inclusive or an exclusive set. A `BTreeSet` is used so that the predicate
/// values are ordered and displayed in sorted order in query `EXPLAIN` plans.
#[derive(Debug, Clone)]
pub struct Predicate {
/// The left-hand-side of the predicate as a valid `ColumnId`
column_id: ColumnId,
/// The right-hand-side of the predicate
kind: PredicateKind,
pub(crate) enum Predicate {
In(BTreeSet<KeyValue>),
NotIn(BTreeSet<KeyValue>),
}
impl std::fmt::Display for Predicate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Predicate::In(_) => write!(f, "IN (")?,
Predicate::NotIn(_) => write!(f, "NOT IN (")?,
}
let mut values = self.values();
while let Some(v) = values.next() {
write!(f, "{v}")?;
if values.size_hint().0 > 0 {
write!(f, ",")?;
}
}
write!(f, ")")
}
}
impl Predicate {
pub(crate) fn new_eq(column_id: ColumnId, value: KeyValue) -> Self {
Self {
column_id,
kind: PredicateKind::Eq(value),
}
}
pub(crate) fn new_not_eq(column_id: ColumnId, value: KeyValue) -> Self {
Self {
column_id,
kind: PredicateKind::NotEq(value),
}
}
pub(crate) fn new_in(column_id: ColumnId, values: Vec<KeyValue>) -> Self {
Self {
column_id,
kind: PredicateKind::In(values),
}
}
pub(crate) fn new_not_in(column_id: ColumnId, values: Vec<KeyValue>) -> Self {
Self {
column_id,
kind: PredicateKind::NotIn(values),
fn values(&self) -> impl Iterator<Item = &KeyValue> {
match self {
Predicate::In(btree_set) => btree_set.iter(),
Predicate::NotIn(btree_set) => btree_set.iter(),
}
}
}
#[derive(Debug, Clone)]
pub(crate) enum PredicateKind {
Eq(KeyValue),
NotEq(KeyValue),
In(Vec<KeyValue>),
NotIn(Vec<KeyValue>),
#[cfg(test)]
impl Predicate {
pub(crate) fn new_in(values: impl IntoIterator<Item = KeyValue>) -> Self {
Self::In(values.into_iter().collect())
}
pub(crate) fn new_not_in(values: impl IntoIterator<Item = KeyValue>) -> Self {
Self::NotIn(values.into_iter().collect())
}
}
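// For illustration, a sketch (using the test-only constructors in this module) of how a
// predicate and its key values render via the `Display` impls in this file, e.g. within
// `EXPLAIN` output:
//
//     let p = Predicate::new_in([KeyValue::string("a"), KeyValue::Int(2)]);
//     assert_eq!(p.to_string(), "IN ('a',2i)");
//
// The `BTreeSet` keeps the values sorted, so the rendered order is deterministic.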
/// Represents the hierarchical last cache structure
@ -777,37 +710,16 @@ struct LastCacheKey {
impl LastCacheKey {
/// Evaluate the provided [`Predicate`] by using its value to lookup in this [`LastCacheKey`]'s
/// value map.
///
/// # Panics
///
/// This assumes that a predicate for this [`LastCacheKey`]'s column was provided, and will panic
/// otherwise.
fn evaluate_predicate<'a: 'b, 'b>(
&'a self,
predicate: &'b Predicate,
) -> Vec<(&'a LastCacheState, &'b KeyValue)> {
if predicate.column_id != self.column_id {
panic!(
"attempted to evaluate unexpected predicate with key {} for column with id {}",
predicate.column_id, self.column_id
);
}
match &predicate.kind {
PredicateKind::Eq(val) => self
.value_map
.get(val)
.map(|s| vec![(s, val)])
.unwrap_or_default(),
PredicateKind::NotEq(val) => self
.value_map
.iter()
.filter_map(|(v, s)| (v != val).then_some((s, v)))
.collect(),
PredicateKind::In(vals) => vals
match predicate {
Predicate::In(vals) => vals
.iter()
.filter_map(|v| self.value_map.get(v).map(|s| (s, v)))
.collect(),
PredicateKind::NotIn(vals) => self
Predicate::NotIn(vals) => self
.value_map
.iter()
.filter_map(|(v, s)| (!vals.contains(v)).then_some((s, v)))
@ -828,7 +740,7 @@ impl LastCacheKey {
}
/// A value for a key column in a [`LastCache`]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub(crate) enum KeyValue {
String(String),
Int(i64),
@ -836,6 +748,17 @@ pub(crate) enum KeyValue {
Bool(bool),
}
impl std::fmt::Display for KeyValue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
KeyValue::String(s) => write!(f, "'{s}'"),
KeyValue::Int(i) => write!(f, "{i}i"),
KeyValue::UInt(u) => write!(f, "{u}u"),
KeyValue::Bool(b) => write!(f, "{b}"),
}
}
}
#[cfg(test)]
impl KeyValue {
pub(crate) fn string(s: impl Into<String>) -> Self {

View File

@ -8,7 +8,7 @@ mod provider;
pub use provider::LastCacheProvider;
mod table_function;
use schema::InfluxColumnType;
pub use table_function::LastCacheFunction;
pub use table_function::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
#[derive(Debug, thiserror::Error)]
pub enum Error {
@ -44,8 +44,11 @@ impl Error {
mod tests {
use std::{cmp::Ordering, sync::Arc, thread, time::Duration};
use arrow::array::AsArray;
use arrow_util::{assert_batches_eq, assert_batches_sorted_eq};
use bimap::BiHashMap;
use datafusion::prelude::SessionContext;
use indexmap::IndexMap;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{LastCacheDefinition, LastCacheSize};
@ -56,13 +59,19 @@ mod tests {
KeyValue, LastCache, LastCacheKeyColumnsArg, LastCacheValueColumnsArg, Predicate,
DEFAULT_CACHE_TTL,
},
CreateLastCacheArgs, LastCacheProvider,
CreateLastCacheArgs, LastCacheFunction, LastCacheProvider, LAST_CACHE_UDTF_NAME,
},
test_helpers::{column_ids_for_names, TestWriter},
};
use super::LastCacheTtl;
fn predicates(
preds: impl IntoIterator<Item = (ColumnId, Predicate)>,
) -> IndexMap<ColumnId, Predicate> {
preds.into_iter().collect()
}
#[test]
fn pick_up_latest_write() {
let writer = TestWriter::new();
@ -88,11 +97,11 @@ mod tests {
cache.push(row, Arc::clone(&table_def));
}
let predicates = &[Predicate::new_eq(col_id, KeyValue::string("a"))];
let predicates = predicates([(col_id, Predicate::new_in([KeyValue::string("a")]))]);
// Check what is in the last cache:
let batch = cache
.to_record_batches(Arc::clone(&table_def), predicates)
.to_record_batches(Arc::clone(&table_def), &predicates)
.unwrap();
assert_batches_eq!(
@ -112,7 +121,7 @@ mod tests {
cache.push(row, Arc::clone(&table_def));
}
let batch = cache.to_record_batches(table_def, predicates).unwrap();
let batch = cache.to_record_batches(table_def, &predicates).unwrap();
assert_batches_eq!(
[
@ -177,17 +186,17 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
// Predicate including both key columns only produces value columns from the cache
TestCase {
predicates: &[
Predicate::new_eq(region_col_id, KeyValue::string("us")),
Predicate::new_eq(host_col_id, KeyValue::string("c")),
],
predicates: predicates([
(region_col_id, Predicate::new_in([KeyValue::string("us")])),
(host_col_id, Predicate::new_in([KeyValue::string("c")])),
]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -199,7 +208,10 @@ mod tests {
// Predicate on only region key column will have host column outputted in addition to
// the value columns:
TestCase {
predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("us"))],
predicates: predicates([(
region_col_id,
Predicate::new_in([KeyValue::string("us")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -212,7 +224,10 @@ mod tests {
},
// Similar to previous, with a different region predicate:
TestCase {
predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("ca"))],
predicates: predicates([(
region_col_id,
Predicate::new_in([KeyValue::string("ca")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -226,7 +241,7 @@ mod tests {
// Predicate on only host key column will have region column outputted in addition to
// the value columns:
TestCase {
predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("a"))],
predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -238,7 +253,7 @@ mod tests {
// Omitting all key columns from the predicate will have all key columns included in
// the query result:
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -255,10 +270,10 @@ mod tests {
// Using a non-existent key column as a predicate has no effect:
// TODO: should this be an error?
TestCase {
predicates: &[Predicate::new_eq(
predicates: predicates([(
ColumnId::new(),
KeyValue::string("12345"),
)],
Predicate::new_in([KeyValue::string("12345")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -274,31 +289,37 @@ mod tests {
},
// Using a non-existent key column value yields an empty result set:
TestCase {
predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("eu"))],
predicates: predicates([(
region_col_id,
Predicate::new_in([KeyValue::string("eu")]),
)]),
expected: &["++", "++"],
},
// Using an invalid combination of key column values yields an empty result set:
TestCase {
predicates: &[
Predicate::new_eq(region_col_id, KeyValue::string("ca")),
Predicate::new_eq(host_col_id, KeyValue::string("a")),
],
predicates: predicates([
(region_col_id, Predicate::new_in([KeyValue::string("ca")])),
(host_col_id, Predicate::new_in([KeyValue::string("a")])),
]),
expected: &["++", "++"],
},
// Using a non-existent key column value (for the host column) also yields an empty result set:
TestCase {
predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("g"))],
predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("g")]))]),
expected: &["++", "++"],
},
// Using an incorrect type for a key column value in a predicate also yields an empty result
// set. TODO: should this be an error?
TestCase {
predicates: &[Predicate::new_eq(host_col_id, KeyValue::Bool(true))],
predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::Bool(true)]))]),
expected: &["++", "++"],
},
// Using a != predicate
// Using a NOT IN predicate
TestCase {
predicates: &[Predicate::new_not_eq(region_col_id, KeyValue::string("us"))],
predicates: predicates([(
region_col_id,
Predicate::new_not_in([KeyValue::string("us")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -311,10 +332,10 @@ mod tests {
},
// Using an IN predicate:
TestCase {
predicates: &[Predicate::new_in(
predicates: predicates([(
host_col_id,
vec![KeyValue::string("a"), KeyValue::string("b")],
)],
Predicate::new_in([KeyValue::string("a"), KeyValue::string("b")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -326,10 +347,10 @@ mod tests {
},
// Using a NOT IN predicate:
TestCase {
predicates: &[Predicate::new_not_in(
predicates: predicates([(
host_col_id,
vec![KeyValue::string("a"), KeyValue::string("b")],
)],
Predicate::new_not_in([KeyValue::string("a"), KeyValue::string("b")]),
)]),
expected: &[
"+--------+------+-----------------------------+-------+",
"| region | host | time | usage |",
@ -345,7 +366,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
@ -408,16 +429,16 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
TestCase {
predicates: &[
Predicate::new_eq(region_col_id, KeyValue::string("us")),
Predicate::new_eq(host_col_id, KeyValue::string("a")),
],
predicates: predicates([
(region_col_id, Predicate::new_in([KeyValue::string("us")])),
(host_col_id, Predicate::new_in([KeyValue::string("a")])),
]),
expected: &[
"+--------+------+--------------------------------+-------+",
"| region | host | time | usage |",
@ -430,7 +451,10 @@ mod tests {
],
},
TestCase {
predicates: &[Predicate::new_eq(region_col_id, KeyValue::string("us"))],
predicates: predicates([(
region_col_id,
Predicate::new_in([KeyValue::string("us")]),
)]),
expected: &[
"+--------+------+--------------------------------+-------+",
"| region | host | time | usage |",
@ -447,7 +471,7 @@ mod tests {
],
},
TestCase {
predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("a"))],
predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]),
expected: &[
"+--------+------+--------------------------------+-------+",
"| region | host | time | usage |",
@ -460,7 +484,7 @@ mod tests {
],
},
TestCase {
predicates: &[Predicate::new_eq(host_col_id, KeyValue::string("b"))],
predicates: predicates([(host_col_id, Predicate::new_in([KeyValue::string("b")]))]),
expected: &[
"+--------+------+--------------------------------+-------+",
"| region | host | time | usage |",
@ -473,7 +497,7 @@ mod tests {
],
},
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+--------+------+--------------------------------+-------+",
"| region | host | time | usage |",
@ -493,7 +517,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
}
@ -536,15 +560,13 @@ mod tests {
}
// Check the cache for values:
let predicates = &[
Predicate::new_eq(region_col_id, KeyValue::string("us")),
Predicate::new_eq(host_col_id, KeyValue::string("a")),
];
let p = predicates([
(region_col_id, Predicate::new_in([KeyValue::string("us")])),
(host_col_id, Predicate::new_in([KeyValue::string("a")])),
]);
// Check what is in the last cache:
let batches = cache
.to_record_batches(Arc::clone(&table_def), predicates)
.unwrap();
let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap();
assert_batches_sorted_eq!(
[
@ -561,9 +583,7 @@ mod tests {
thread::sleep(Duration::from_millis(1000));
// Check what is in the last cache:
let batches = cache
.to_record_batches(Arc::clone(&table_def), predicates)
.unwrap();
let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap();
// The cache is completely empty after the TTL evicted data, so it will give back nothing:
assert_batches_sorted_eq!(
@ -583,12 +603,10 @@ mod tests {
}
// Check the cache for values:
let predicates = &[Predicate::new_eq(host_col_id, KeyValue::string("a"))];
let p = predicates([(host_col_id, Predicate::new_in([KeyValue::string("a")]))]);
// Check what is in the last cache:
let batches = cache
.to_record_batches(Arc::clone(&table_def), predicates)
.unwrap();
let batches = cache.to_record_batches(Arc::clone(&table_def), &p).unwrap();
assert_batches_sorted_eq!(
[
@ -645,14 +663,14 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
// No predicates gives everything:
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+--------------+--------+-------------+-----------+---------+-----------------------------+",
"| component_id | active | type | loc | reading | time |",
@ -668,7 +686,9 @@ mod tests {
},
// Predicates on tag key column work as expected:
TestCase {
predicates: &[Predicate::new_eq(component_id_col_id, KeyValue::string("333"))],
predicates: predicates([
(component_id_col_id, Predicate::new_in([KeyValue::string("333")]))
]),
expected: &[
"+--------------+--------+--------+------+---------+-----------------------------+",
"| component_id | active | type | loc | reading | time |",
@ -679,7 +699,9 @@ mod tests {
},
// Predicate on a non-string field key:
TestCase {
predicates: &[Predicate::new_eq(active_col_id, KeyValue::Bool(false))],
predicates: predicates([
(active_col_id, Predicate::new_in([KeyValue::Bool(false)]))
]),
expected: &[
"+--------------+--------+-------------+---------+---------+-----------------------------+",
"| component_id | active | type | loc | reading | time |",
@ -691,7 +713,9 @@ mod tests {
},
// Predicate on a string field key:
TestCase {
predicates: &[Predicate::new_eq(type_col_id, KeyValue::string("camera"))],
predicates: predicates([
(type_col_id, Predicate::new_in([KeyValue::string("camera")]))
]),
expected: &[
"+--------------+--------+--------+-----------+---------+-----------------------------+",
"| component_id | active | type | loc | reading | time |",
@ -706,7 +730,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
}
@ -748,14 +772,14 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
// No predicates yields everything in the cache
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
@ -771,7 +795,10 @@ mod tests {
},
// Predicate on state column, which is part of the series key:
TestCase {
predicates: &[Predicate::new_eq(state_col_id, KeyValue::string("ca"))],
predicates: predicates([(
state_col_id,
Predicate::new_in([KeyValue::string("ca")]),
)]),
expected: &[
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
@ -787,7 +814,10 @@ mod tests {
},
// Predicate on county column, which is part of the series key:
TestCase {
predicates: &[Predicate::new_eq(county_col_id, KeyValue::string("napa"))],
predicates: predicates([(
county_col_id,
Predicate::new_in([KeyValue::string("napa")]),
)]),
expected: &[
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
@ -799,7 +829,10 @@ mod tests {
},
// Predicate on farm column, which is part of the series key:
TestCase {
predicates: &[Predicate::new_eq(farm_col_id, KeyValue::string("30-01"))],
predicates: predicates([(
farm_col_id,
Predicate::new_in([KeyValue::string("30-01")]),
)]),
expected: &[
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
@ -810,11 +843,14 @@ mod tests {
},
// Predicate on all series key columns:
TestCase {
predicates: &[
Predicate::new_eq(state_col_id, KeyValue::string("ca")),
Predicate::new_eq(county_col_id, KeyValue::string("nevada")),
Predicate::new_eq(farm_col_id, KeyValue::string("40-01")),
],
predicates: predicates([
(state_col_id, Predicate::new_in([KeyValue::string("ca")])),
(
county_col_id,
Predicate::new_in([KeyValue::string("nevada")]),
),
(farm_col_id, Predicate::new_in([KeyValue::string("40-01")])),
]),
expected: &[
"+-------+--------+-------+-------+-----------------------------+",
"| state | county | farm | speed | time |",
@ -827,7 +863,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
@ -870,7 +906,7 @@ mod tests {
cache.push(row, Arc::clone(&table_def));
}
let batches = cache.to_record_batches(table_def, &[]).unwrap();
let batches = cache.to_record_batches(table_def, &predicates([])).unwrap();
assert_batches_sorted_eq!(
[
@ -925,14 +961,17 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
// Cache that has values in the zone columns should produce them:
TestCase {
predicates: &[Predicate::new_eq(game_id_col_id, KeyValue::string("4"))],
predicates: predicates([(
game_id_col_id,
Predicate::new_in([KeyValue::string("4")]),
)]),
expected: &[
"+---------+-----------+-----------------------------+------+------+",
"| game_id | player | time | type | zone |",
@ -943,7 +982,10 @@ mod tests {
},
// Cache that does not have a zone column will produce it with nulls:
TestCase {
predicates: &[Predicate::new_eq(game_id_col_id, KeyValue::string("1"))],
predicates: predicates([(
game_id_col_id,
Predicate::new_in([KeyValue::string("1")]),
)]),
expected: &[
"+---------+-----------+-----------------------------+------+------+",
"| game_id | player | time | type | zone |",
@ -954,7 +996,7 @@ mod tests {
},
// Pulling from multiple caches will fill in with nulls:
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+---------+-----------+-----------------------------+------+------+",
"| game_id | player | time | type | zone |",
@ -970,7 +1012,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
@ -1028,14 +1070,14 @@ mod tests {
}
struct TestCase<'a> {
predicates: &'a [Predicate],
predicates: IndexMap<ColumnId, Predicate>,
expected: &'a [&'a str],
}
let test_cases = [
// Can query on specific key column values:
TestCase {
predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("a"))],
predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("a")]))]),
expected: &[
"+----+-----+-----+-----+-----+--------------------------------+",
"| t1 | f1 | f2 | f3 | f4 | time |",
@ -1045,7 +1087,7 @@ mod tests {
],
},
TestCase {
predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("b"))],
predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("b")]))]),
expected: &[
"+----+------+----+------+------+--------------------------------+",
"| t1 | f1 | f2 | f3 | f4 | time |",
@ -1055,7 +1097,7 @@ mod tests {
],
},
TestCase {
predicates: &[Predicate::new_eq(t1_col_id, KeyValue::string("c"))],
predicates: predicates([(t1_col_id, Predicate::new_in([KeyValue::string("c")]))]),
expected: &[
"+----+-------+-------+-------+----+--------------------------------+",
"| t1 | f1 | f2 | f3 | f4 | time |",
@ -1066,7 +1108,7 @@ mod tests {
},
// Can query across key column values:
TestCase {
predicates: &[],
predicates: predicates([]),
expected: &[
"+----+-------+-------+-------+------+--------------------------------+",
"| t1 | f1 | f2 | f3 | f4 | time |",
@ -1081,7 +1123,7 @@ mod tests {
for t in test_cases {
let batches = cache
.to_record_batches(Arc::clone(&table_def), t.predicates)
.to_record_batches(Arc::clone(&table_def), &t.predicates)
.unwrap();
assert_batches_sorted_eq!(t.expected, &batches);
@ -1325,4 +1367,267 @@ mod tests {
});
insta::assert_json_snapshot!(caches);
}
/// This test sets up a [`LastCacheProvider`], creates a [`LastCache`] using the `region` and
/// `host` columns as keys, and then writes row data containing several unique combinations of
/// the key columns to the cache. It then sets up a DataFusion [`SessionContext`], registers
/// the [`LastCacheFunction`] as a UDTF, and runs a series of test cases to verify queries made
/// using the function.
///
/// The purpose of this is to verify that the predicate pushdown by the UDTF [`TableProvider`]
/// is working.
///
/// Each test case verifies both the `RecordBatch` output and the output of `EXPLAIN` for a given
/// query. The `EXPLAIN` output contains a line for the `LastCacheExec`, which lists any
/// predicates that were pushed down from the provided SQL query to the cache.
#[tokio::test]
async fn datafusion_udtf_predicate_conversion() {
let writer = TestWriter::new();
let _ = writer.write_lp_to_write_batch("cpu,region=us-east,host=a usage=99,temp=88", 0);
// create a last cache provider so we can use it to create our UDTF provider:
let db_schema = writer.db_schema();
let table_def = db_schema.table_definition("cpu").unwrap();
let provider = LastCacheProvider::new_from_catalog(writer.catalog()).unwrap();
provider
.create_cache(
db_schema.id,
None,
CreateLastCacheArgs {
table_def,
count: LastCacheSize::default(),
ttl: LastCacheTtl::default(),
key_columns: LastCacheKeyColumnsArg::SeriesKey,
value_columns: LastCacheValueColumnsArg::AcceptNew,
},
)
.unwrap();
// make some writes into the cache:
let write_batch = writer.write_lp_to_write_batch(
"\
cpu,region=us-east,host=a usage=77,temp=66\n\
cpu,region=us-east,host=b usage=77,temp=66\n\
cpu,region=us-west,host=c usage=77,temp=66\n\
cpu,region=us-west,host=d usage=77,temp=66\n\
cpu,region=ca-east,host=e usage=77,temp=66\n\
cpu,region=ca-cent,host=f usage=77,temp=66\n\
cpu,region=ca-west,host=g usage=77,temp=66\n\
cpu,region=ca-west,host=h usage=77,temp=66\n\
cpu,region=eu-cent,host=i usage=77,temp=66\n\
cpu,region=eu-cent,host=j usage=77,temp=66\n\
cpu,region=eu-west,host=k usage=77,temp=66\n\
cpu,region=eu-west,host=l usage=77,temp=66\n\
",
1_000,
);
let wal_contents = influxdb3_wal::create::wal_contents(
(0, 1, 0),
[influxdb3_wal::create::write_batch_op(write_batch)],
);
provider.write_wal_contents_to_cache(&wal_contents);
let ctx = SessionContext::new();
let last_cache_fn = LastCacheFunction::new(db_schema.id, Arc::clone(&provider));
ctx.register_udtf(LAST_CACHE_UDTF_NAME, Arc::new(last_cache_fn));
struct TestCase<'a> {
/// A short description of the test
_desc: &'a str,
/// A SQL expression to evaluate using the datafusion session context, should be of
/// the form:
/// ```sql
/// SELECT * FROM last_cache('cpu') ...
/// ```
sql: &'a str,
/// Expected record batch output
expected: &'a [&'a str],
/// Expected EXPLAIN output contains this.
///
/// For checking the `LastCacheExec` portion of the EXPLAIN output for the given `sql`
/// query. A "contains" is used instead of matching the whole EXPLAIN output to prevent
/// flakyness from upstream changes to other parts of the query plan.
explain_contains: &'a str,
}
let test_cases = [
TestCase {
_desc: "no predicates",
sql: "SELECT * FROM last_cache('cpu')",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | j | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains:
"LastCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[12]",
},
TestCase {
_desc: "eq predicate on region",
sql: "SELECT * FROM last_cache('cpu') WHERE region = 'us-east'",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east')]] inner=MemoryExec: partitions=1, partition_sizes=[2]",
},
TestCase {
_desc: "not eq predicate on region",
sql: "SELECT * FROM last_cache('cpu') WHERE region != 'us-east'",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | j | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 NOT IN ('us-east')]] inner=MemoryExec: partitions=1, partition_sizes=[10]",
},
TestCase {
_desc: "double eq predicate on region",
sql: "SELECT * FROM last_cache('cpu') \
WHERE region = 'us-east' \
OR region = 'us-west'",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')]] inner=MemoryExec: partitions=1, partition_sizes=[4]",
},
TestCase {
_desc: "triple eq predicate on region",
sql: "SELECT * FROM last_cache('cpu') \
WHERE region = 'us-east' \
OR region = 'us-west' \
OR region = 'ca-west'",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 IN ('ca-west','us-east','us-west')]] inner=MemoryExec: partitions=1, partition_sizes=[6]",
},
TestCase {
_desc: "eq predicate on region AND eq predicate on host",
sql: "SELECT * FROM last_cache('cpu') \
WHERE (region = 'us-east' OR region = 'us-west') \
AND (host = 'a' OR host = 'c')",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')], [host@1 IN ('a','c')]] inner=MemoryExec: partitions=1, partition_sizes=[2]",
},
TestCase {
_desc: "in predicate on region",
sql: "SELECT * FROM last_cache('cpu') \
WHERE region IN ('ca-east', 'ca-west')",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| ca-east | e | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | g | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| ca-west | h | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 IN ('ca-east','ca-west')]] inner=MemoryExec: partitions=1, partition_sizes=[3]",
},
TestCase {
_desc: "not in predicate on region",
sql: "SELECT * FROM last_cache('cpu') \
WHERE region NOT IN ('ca-east', 'ca-west')",
expected: &[
"+---------+------+------+-----------------------------+-------+",
"| region | host | temp | time | usage |",
"+---------+------+------+-----------------------------+-------+",
"| ca-cent | f | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | i | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-cent | j | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | k | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| eu-west | l | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | a | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-east | b | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | c | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"| us-west | d | 66.0 | 1970-01-01T00:00:00.000001Z | 77.0 |",
"+---------+------+------+-----------------------------+-------+",
],
explain_contains: "LastCacheExec: predicates=[[region@0 NOT IN ('ca-east','ca-west')]] inner=MemoryExec: partitions=1, partition_sizes=[9]",
},
];
for tc in test_cases {
// do the query:
let results = ctx.sql(tc.sql).await.unwrap().collect().await.unwrap();
println!("test case: {}", tc._desc);
// check the result:
assert_batches_sorted_eq!(tc.expected, &results);
let explain = ctx
.sql(format!("EXPLAIN {sql}", sql = tc.sql).as_str())
.await
.unwrap()
.collect()
.await
.unwrap()
.pop()
.unwrap();
assert!(
explain
.column_by_name("plan")
.unwrap()
.as_string::<i32>()
.iter()
.any(|plan| plan.is_some_and(|plan| plan.contains(tc.explain_contains))),
"explain plan did not contain the expression:\n\n\
{expected}\n\n\
instead, the output was:\n\n\
{actual:#?}",
expected = tc.explain_contains,
actual = explain.column_by_name("plan").unwrap().as_string::<i32>(),
);
}
}
}

View File

@ -9,7 +9,7 @@ use observability_deps::tracing::debug;
use parking_lot::RwLock;
use super::{
cache::{LastCache, LastCacheValueColumnsArg, Predicate},
cache::{LastCache, LastCacheValueColumnsArg},
CreateLastCacheArgs, Error,
};
@ -341,7 +341,6 @@ impl LastCacheProvider {
db_id: DbId,
table_id: TableId,
cache_name: Option<&str>,
predicates: &[Predicate],
) -> Option<Result<Vec<RecordBatch>, ArrowError>> {
let table_def = self
.catalog
@ -362,7 +361,7 @@ impl LastCacheProvider {
None
}
})
.map(|lc| lc.to_record_batches(table_def, predicates))
.map(|lc| lc.to_record_batches(table_def, &Default::default()))
}
/// Returns the total number of caches contained in the provider

View File

@ -1,28 +1,49 @@
use std::{any::Any, sync::Arc};
use arrow::datatypes::SchemaRef;
use arrow::{array::RecordBatch, datatypes::SchemaRef};
use async_trait::async_trait;
use datafusion::{
catalog::{Session, TableProvider},
common::plan_err,
common::{internal_err, plan_err, DFSchema},
datasource::{function::TableFunctionImpl, TableType},
error::DataFusionError,
execution::context::ExecutionProps,
logical_expr::TableProviderFilterPushDown,
physical_plan::{memory::MemoryExec, ExecutionPlan},
physical_expr::{
create_physical_expr,
utils::{Guarantee, LiteralGuarantee},
},
physical_plan::{memory::MemoryExec, DisplayAs, DisplayFormatType, ExecutionPlan},
prelude::Expr,
scalar::ScalarValue,
};
use indexmap::{IndexMap, IndexSet};
use influxdb3_catalog::catalog::TableDefinition;
use influxdb3_id::DbId;
use influxdb3_id::{ColumnId, DbId};
use schema::{InfluxColumnType, InfluxFieldType};
use super::LastCacheProvider;
use super::{
cache::{KeyValue, Predicate},
LastCacheProvider,
};
/// The name of the function that is called to query the last cache
pub const LAST_CACHE_UDTF_NAME: &str = "last_cache";
/// Implementor of the [`TableProvider`] trait that is produced with a call to the
/// [`LastCacheFunction`]
#[derive(Debug)]
struct LastCacheFunctionProvider {
/// The database ID that the query calling to the cache is associated with
db_id: DbId,
/// The table definition that the cache being called is associated with
table_def: Arc<TableDefinition>,
/// The name of the cache
cache_name: Arc<str>,
/// Reference to the cache's schema
schema: SchemaRef,
/// Forwarded reference of the [`LastCacheProvider`], which is used to get the `LastCache`
/// for the query using the `db_id` and `table_def`.
provider: Arc<LastCacheProvider>,
}
@ -55,19 +76,32 @@ impl TableProvider for LastCacheFunctionProvider {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let read = self.provider.cache_map.read();
let batches = if let Some(cache) = read
let (predicates, batches) = if let Some(cache) = read
.get(&self.db_id)
.and_then(|db| db.get(&self.table_def.table_id))
.and_then(|tbl| tbl.get(&self.cache_name))
{
let predicates = cache.convert_filter_exprs(filters);
cache.to_record_batches(Arc::clone(&self.table_def), &predicates)?
let predicates = convert_filter_exprs(
self.table_def.as_ref(),
cache.key_column_ids.as_ref(),
Arc::clone(&self.schema),
filters,
)?;
let batches = cache.to_record_batches(Arc::clone(&self.table_def), &predicates)?;
((!predicates.is_empty()).then_some(predicates), batches)
} else {
// If there is no cache, it means that it was removed, in which case, we just return
// an empty set of record batches.
vec![]
(None, vec![])
};
let mut exec = MemoryExec::try_new(&[batches], self.schema(), projection.cloned())?;
drop(read);
let mut exec = LastCacheExec::try_new(
predicates,
Arc::clone(&self.table_def),
&[batches],
self.schema(),
projection.cloned(),
)?;
let show_sizes = ctx.config_options().explain.show_sizes;
exec = exec.with_show_sizes(show_sizes);
@ -76,6 +110,181 @@ impl TableProvider for LastCacheFunctionProvider {
}
}
/// Convert the given list of filter expressions `filters` to a map of [`ColumnId`] to [`Predicate`]
///
/// The resulting map is an [`IndexMap`] to ensure consistent ordering of entries in the map, which
/// makes testing the filter conversions easier via `EXPLAIN` query plans.
fn convert_filter_exprs(
table_def: &TableDefinition,
cache_key_column_ids: &IndexSet<ColumnId>,
cache_schema: SchemaRef,
filters: &[Expr],
) -> Result<IndexMap<ColumnId, Predicate>, DataFusionError> {
let mut predicate_map: IndexMap<ColumnId, Option<Predicate>> = IndexMap::new();
// used by `create_physical_expr` in the loop below:
let schema: DFSchema = cache_schema.try_into()?;
let props = ExecutionProps::new();
// The set of `filters` that is passed in from DataFusion varies: 1) based on how they are
// defined in the query, and 2) based on some decisions that DataFusion makes when parsing the
// query into the `Expr` syntax tree. For example, the predicate:
//
// WHERE foo IN ('bar', 'baz')
//
// instead of being expressed as an `InList`, would be simplified to the following `Expr` tree:
//
// [
// BinaryExpr {
// left: BinaryExpr { left: "foo", op: Eq, right: "bar" },
// op: Or,
// right: BinaryExpr { left: "foo", op: Eq, right: "baz" }
// }
// ]
//
// while the predicate:
//
// WHERE foo = 'bar' OR foo = 'baz' OR foo = 'bop' OR foo = 'bla'
//
// instead of being expressed as a tree of `BinaryExpr`s, is expressed as an `InList` with four
// entries:
//
// [
// InList { col: "foo", values: ["bar", "baz", "bop", "bla"], negated: false }
// ]
//
// Instead of handling all the combinations of `Expr`s that may be passed by the caller of
// `TableProvider::scan`, we can use the cache's schema to convert each `Expr` to a `PhysicalExpr`
// and analyze it using DataFusion's `LiteralGuarantee`.
//
// This will distill the provided set of `Expr`s down to either an IN list or a NOT IN list,
// which we can convert to the `Predicate` type for the last cache.
//
// Special handling is applied for the case where multiple literal guarantees are encountered for
// a given column. This happens for clauses joined with an AND conjunction. From the tests
// run thus far, this occurs when a query contains a WHERE clause, e.g.,
//
// WHERE a != 'foo' AND a != 'bar'
//
// or,
//
// WHERE a NOT IN ('foo', 'bar')
//
// which DataFusion simplifies to the previous clause that uses an AND binary expression.
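// As an illustrative sketch of the analysis (values taken from the UDTF test in this commit):
// analyzing the physical form of
//
//    region = 'us-east' OR region = 'us-west'
//
// yields a single `LiteralGuarantee` with `guarantee: Guarantee::In` on the `region` column and
// `literals: {'us-east', 'us-west'}`, which maps directly onto `Predicate::In` below.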
for expr in filters {
let physical_expr = create_physical_expr(expr, &schema, &props)?;
let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
for LiteralGuarantee {
column,
guarantee,
literals,
} in literal_guarantees
{
let Some(column_def) = table_def.column_definition(column.name()) else {
return plan_err!(
"invalid column name in filter expression: {}",
column.name()
);
};
// do not handle predicates on non-key columns, let datafusion do that:
if !cache_key_column_ids.contains(&column_def.id) {
continue;
}
// convert the literal values from the query into `KeyValue`s for the last cache
// predicate, and also validate that the literal type is compatible with the column
// being predicated.
let value_set = literals
.into_iter()
.map(|literal| match (literal, column_def.data_type) {
(
ScalarValue::Boolean(Some(b)),
InfluxColumnType::Field(InfluxFieldType::Boolean),
) => Ok(KeyValue::Bool(b)),
(
ScalarValue::Int64(Some(i)),
InfluxColumnType::Field(InfluxFieldType::Integer),
) => Ok(KeyValue::Int(i)),
(
ScalarValue::UInt64(Some(u)),
InfluxColumnType::Field(InfluxFieldType::UInteger),
) => Ok(KeyValue::UInt(u)),
(
ScalarValue::Utf8(Some(s))
| ScalarValue::Utf8View(Some(s))
| ScalarValue::LargeUtf8(Some(s)),
InfluxColumnType::Tag | InfluxColumnType::Field(InfluxFieldType::String),
) => Ok(KeyValue::String(s)),
// TODO: handle Dictionary here?
(other_literal, column_data_type) => {
plan_err!(
"incompatible literal applied in predicate to column, \
column: {}, \
literal: {other_literal}, \
column type: {column_data_type}",
column.name()
)
}
})
.collect::<Result<_, DataFusionError>>()?;
let mut predicate = match guarantee {
Guarantee::In => Predicate::In(value_set),
Guarantee::NotIn => Predicate::NotIn(value_set),
};
// place the predicate into the map, handling the case for a column already encountered
predicate_map
.entry(column_def.id)
.and_modify(|e| {
if let Some(existing) = e {
match (existing, &mut predicate) {
// if we encounter a IN predicate on a column for which we already have
// a IN guarantee, we take their intersection, i.e.,
//
// a IN (1, 2) AND a IN (2, 3)
//
// becomes
//
// a IN (2)
(Predicate::In(ref mut existing_set), Predicate::In(new_set)) => {
*existing_set =
existing_set.intersection(new_set).cloned().collect();
// if the result is empty, just remove the predicate
if existing_set.is_empty() {
e.take();
}
}
// if we encounter a NOT IN predicate on a column for which we already
// have a NOT IN guarantee, we extend the two, i.e.,
//
// a NOT IN (1, 2) AND a NOT IN (3, 4)
//
// becomes
//
// a NOT IN (1, 2, 3, 4)
(Predicate::NotIn(existing_set), Predicate::NotIn(new_set)) => {
existing_set.append(new_set)
}
// for non-matching predicate types, we just remove the predicate by taking the
// Option. We will let DataFusion handle the predicate at a higher
// filter level in this case...
_ => {
e.take();
}
}
}
})
.or_insert_with(|| Some(predicate));
}
}
Ok(predicate_map
.into_iter()
.filter_map(|(column_id, predicate)| predicate.map(|predicate| (column_id, predicate)))
.collect())
}
/// Implementor of the [`TableFunctionImpl`] trait, to be registered as a user-defined table
/// function in the DataFusion `SessionContext`.
#[derive(Debug)]
pub struct LastCacheFunction {
db_id: DbId,
@ -127,3 +336,117 @@ impl TableFunctionImpl for LastCacheFunction {
}))
}
}
/// Custom implementor of the [`ExecutionPlan`] trait for use by the last cache
///
/// Wraps a [`MemoryExec`] from DataFusion which it relies on for the actual implementation of the
/// [`ExecutionPlan`] trait. The additional functionality provided by this type is that it tracks
/// the predicates that are pushed down to the underlying cache during query planning/execution.
///
/// # Example
///
/// For a query that provides no predicates, or whose predicates do not get pushed down, the
/// `EXPLAIN` output will contain a line for the `LastCacheExec` with no predicates, as well as
/// the info emitted for the inner `MemoryExec`, e.g.,
///
/// ```text
/// LastCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[12]
/// ```
///
/// For queries that do have predicates that get pushed down, the output will include them, e.g.,
///
/// ```text
/// LastCacheExec: predicates=[[region@0 IN ('us-east','us-west')]] inner=[...]
/// ```
#[derive(Debug)]
struct LastCacheExec {
inner: MemoryExec,
table_def: Arc<TableDefinition>,
predicates: Option<IndexMap<ColumnId, Predicate>>,
}
impl LastCacheExec {
fn try_new(
predicates: Option<IndexMap<ColumnId, Predicate>>,
table_def: Arc<TableDefinition>,
partitions: &[Vec<RecordBatch>],
cache_schema: SchemaRef,
projection: Option<Vec<usize>>,
) -> Result<Self, DataFusionError> {
Ok(Self {
inner: MemoryExec::try_new(partitions, cache_schema, projection)?,
table_def,
predicates,
})
}
fn with_show_sizes(self, show_sizes: bool) -> Self {
Self {
inner: self.inner.with_show_sizes(show_sizes),
..self
}
}
}
impl DisplayAs for LastCacheExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "LastCacheExec:")?;
if let Some(predicates) = self.predicates.as_ref() {
write!(f, " predicates=[")?;
let mut p_iter = predicates.iter();
while let Some((col_id, predicate)) = p_iter.next() {
let col_name = self.table_def.column_id_to_name(col_id).unwrap_or_default();
write!(f, "[{col_name}@{col_id} {predicate}]")?;
if p_iter.size_hint().0 > 0 {
write!(f, ", ")?;
}
}
write!(f, "]")?;
}
write!(f, " inner=")?;
self.inner.fmt_as(t, f)
}
}
}
}
impl ExecutionPlan for LastCacheExec {
fn name(&self) -> &str {
"LastCacheExec"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.inner.properties()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.inner.children()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
// (copied from MemoryExec):
// MemoryExec has no children
if children.is_empty() {
Ok(self)
} else {
internal_err!("Children cannot be replaced in {self:?}")
}
}
fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
self.inner.execute(partition, context)
}
}

View File

@ -19,7 +19,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_cache::last_cache::LastCacheFunction;
use influxdb3_cache::last_cache::{LastCacheFunction, LAST_CACHE_UDTF_NAME};
use influxdb3_cache::meta_cache::{MetaCacheFunction, META_CACHE_UDTF_NAME};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_sys_events::SysEventStore;
@ -491,8 +491,6 @@ impl QueryNamespace for Database {
}
}
const LAST_CACHE_UDTF_NAME: &str = "last_cache";
impl CatalogProvider for Database {
fn as_any(&self) -> &dyn Any {
self as &dyn Any

View File

@ -974,7 +974,7 @@ mod tests {
];
let actual = wbuf
.last_cache_provider()
.get_cache_record_batches(db_id, tbl_id, None, &[])
.get_cache_record_batches(db_id, tbl_id, None)
.unwrap()
.unwrap();
assert_batches_eq!(&expected, &actual);