feat: Simplify predicates in the `InfluxRpcFrontend` before using them (#3588)

* feat: normalize + simplify RPC predicates before using them

* docs: Update predicate/src/rpc_predicate.rs

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-02-02 14:46:57 -05:00 committed by GitHub
parent 48daede8d7
commit 429d59f1b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 298 additions and 66 deletions

View File

@ -14,7 +14,7 @@ use job_registry::JobRegistry;
use metric::{Attributes, DurationCounter, Metric, U64Counter};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::predicate::Predicate;
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
use query::{
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
@ -278,17 +278,6 @@ impl QueryDatabase for QueryCatalogAccess {
self.catalog.partition_addrs()
}
fn table_names(&self) -> Vec<String> {
self.catalog.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog
.table(table_name)
.ok()
.map(|table| Arc::clone(&table.schema().read()))
}
/// Return a covering set of chunks for a particular table and predicate
fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
self.chunk_access.candidate_chunks(table_name, predicate)
@ -310,6 +299,19 @@ impl QueryDatabase for QueryCatalogAccess {
}
}
impl QueryDatabaseMeta for QueryCatalogAccess {
fn table_names(&self) -> Vec<String> {
self.catalog.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog
.table(table_name)
.ok()
.map(|table| Arc::clone(&table.schema().read()))
}
}
// Datafusion catalog provider interface
impl CatalogProvider for QueryCatalogAccess {
fn as_any(&self) -> &dyn Any {

View File

@ -42,7 +42,7 @@ use parquet_catalog::{
prune::prune_history as prune_catalog_transaction_history,
};
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use predicate::predicate::Predicate;
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
QueryCompletedToken, QueryDatabase,
@ -1207,14 +1207,6 @@ impl QueryDatabase for Db {
self.catalog_access.partition_addrs()
}
fn table_names(&self) -> Vec<String> {
self.catalog_access.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog_access.table_schema(table_name)
}
fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
self.catalog_access.chunks(table_name, predicate)
}
@ -1232,6 +1224,16 @@ impl QueryDatabase for Db {
}
}
impl QueryDatabaseMeta for Db {
fn table_names(&self) -> Vec<String> {
self.catalog_access.table_names()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog_access.table_schema(table_name)
}
}
impl ExecutionContextProvider for Db {
fn new_query_context(self: &Arc<Self>, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
self.exec

View File

@ -833,13 +833,34 @@ fn format_comparison(v: i32, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(test)]
mod tests {
use generated_types::node::Type as RPCNodeType;
use predicate::predicate::Predicate;
use std::collections::BTreeSet;
use predicate::{predicate::Predicate, rpc_predicate::QueryDatabaseMeta};
use std::{collections::BTreeSet, sync::Arc};
use super::*;
struct Tables {
table_names: Vec<String>,
}
impl Tables {
fn new(table_names: &[&str]) -> Self {
let table_names = table_names.iter().map(|s| s.to_string()).collect();
Self { table_names }
}
}
impl QueryDatabaseMeta for Tables {
fn table_names(&self) -> Vec<String> {
self.table_names.clone()
}
fn table_schema(&self, _table_name: &str) -> Option<Arc<schema::Schema>> {
None
}
}
fn table_predicate(predicate: InfluxRpcPredicate) -> Predicate {
let predicates = predicate.table_predicates(|| std::iter::once("foo".to_string()));
let predicates = predicate.table_predicates(&Tables::new(&["foo"]));
assert_eq!(predicates.len(), 1);
predicates.into_iter().next().unwrap().1
}
@ -929,14 +950,14 @@ mod tests {
.expect("successfully converting predicate")
.build();
let tables = ["foo", "bar"];
let tables = Tables::new(&["foo", "bar"]);
let table_predicates =
predicate.table_predicates(|| tables.iter().map(ToString::to_string));
let table_predicates = predicate.table_predicates(&tables);
assert_eq!(table_predicates.len(), 2);
for (expected_table, (table, predicate)) in tables.iter().zip(table_predicates) {
assert_eq!(*expected_table, &table);
for (expected_table, (table, predicate)) in tables.table_names.iter().zip(table_predicates)
{
assert_eq!(expected_table, &table);
let expected_exprs = vec![lit(table).not_eq(lit("foo"))];

View File

@ -3,10 +3,15 @@
use crate::predicate::{BinaryExpr, Predicate};
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_plan::{lit, Column, Expr, ExprRewriter, Operator};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::{
lit, Column, DFField, DFSchema, Expr, ExprRewriter, Operator, SimplifyInfo,
};
use datafusion::scalar::ScalarValue;
use datafusion_util::AsExpr;
use schema::Schema;
use std::collections::BTreeSet;
use std::sync::Arc;
/// Any column references to this name are rewritten to be
/// the actual table name by the Influx gRPC planner.
@ -74,23 +79,21 @@ impl InfluxRpcPredicate {
/// Convert to a list of [`Predicate`] to apply to specific tables
///
/// `all_table_names` yields a list of all table names in the databases and is used when
/// the storage predicate has no table restriction
///
/// Returns a list of [`Predicate`] and their associated table name
pub fn table_predicates<F, I>(&self, all_table_names: F) -> Vec<(String, Predicate)>
where
F: FnOnce() -> I,
I: IntoIterator<Item = String>,
{
pub fn table_predicates<D: QueryDatabaseMeta>(
&self,
table_info: &D,
) -> Vec<(String, Predicate)> {
let table_names = match &self.table_names {
Some(table_names) => itertools::Either::Left(table_names.iter().cloned()),
None => itertools::Either::Right(all_table_names().into_iter()),
None => itertools::Either::Right(table_info.table_names().into_iter()),
};
table_names
.map(|table| {
let predicate = normalize_predicate(&table, &self.inner);
let schema = table_info.table_schema(&table);
let predicate = normalize_predicate(&table, schema, &self.inner);
(table, predicate)
})
.collect()
@ -107,6 +110,15 @@ impl InfluxRpcPredicate {
}
}
/// Information required to normalize predicates
pub trait QueryDatabaseMeta {
/// Returns a list of table names in this DB
fn table_names(&self) -> Vec<String>;
/// Schema for a specific table if the table exists.
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
}
/// Predicate that has been "specialized" / normalized for a
/// particular table. Specifically:
///
@ -137,7 +149,11 @@ impl InfluxRpcPredicate {
/// ```text
/// ("field1" > 34.2 OR "field2" > 34.2 OR "fieldn" > 34.2)
/// ```
fn normalize_predicate(table_name: &str, predicate: &Predicate) -> Predicate {
fn normalize_predicate(
table_name: &str,
schema: Option<Arc<Schema>>,
predicate: &Predicate,
) -> Predicate {
let mut predicate = predicate.clone();
let mut field_projections = BTreeSet::new();
let mut field_value_exprs = vec![];
@ -156,6 +172,14 @@ fn normalize_predicate(table_name: &str, predicate: &Predicate) -> Predicate {
// column projection set.
rewrite_field_column_references(&mut field_projections, e)
})
.map(|e| {
if let Some(schema) = &schema {
e.simplify(&SimplifyAdapter::new(schema.as_ref()))
.expect("Expression simplificiation failed")
} else {
e
}
})
.collect::<Vec<_>>();
// Store any field value (`_value`) expressions on the `Predicate`.
predicate.value_expr = field_value_exprs;
@ -166,10 +190,53 @@ fn normalize_predicate(table_name: &str, predicate: &Predicate) -> Predicate {
None => predicate.field_columns = Some(field_projections),
};
}
predicate
}
struct SimplifyAdapter {
// TODO avoid re-creating this each time....
// https://github.com/apache/arrow-datafusion/issues/1725
df_schema: DFSchema,
execution_props: ExecutionProps,
}
impl SimplifyAdapter {
fn new(schema: &Schema) -> Self {
let df_schema = DFSchema::new(
schema
.as_arrow()
.fields()
.iter()
.map(|f| DFField::from(f.clone()))
.collect(),
)
.unwrap();
Self {
df_schema,
execution_props: ExecutionProps::new(),
}
}
}
impl SimplifyInfo for SimplifyAdapter {
fn is_boolean_type(&self, expr: &Expr) -> DataFusionResult<bool> {
Ok(expr
.get_type(&self.df_schema)
.ok()
.map(|t| matches!(t, arrow::datatypes::DataType::Boolean))
.unwrap_or(false))
}
fn nullable(&self, expr: &Expr) -> DataFusionResult<bool> {
Ok(expr.nullable(&self.df_schema).ok().unwrap_or(false))
}
fn execution_props(&self) -> &ExecutionProps {
&self.execution_props
}
}
/// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
/// with the actual table name
fn rewrite_measurement_references(table_name: &str, expr: Expr) -> Expr {

View File

@ -225,7 +225,7 @@ impl InfluxRpcPlanner {
// Mapping between table and chunks that need full plan
let mut full_plan_table_chunks = BTreeMap::new();
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
for (table_name, predicate) in &table_predicates {
// Identify which chunks can answer from its metadata and then record its table,
// and which chunks needs full plan and group them into their table
@ -332,7 +332,7 @@ impl InfluxRpcPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_columns = BTreeSet::new();
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
for (table_name, predicate) in &table_predicates {
for chunk in database.chunks(table_name, predicate) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@ -462,7 +462,7 @@ impl InfluxRpcPlanner {
let mut need_full_plans = BTreeMap::new();
let mut known_values = BTreeSet::new();
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
for (table_name, predicate) in &table_predicates {
for chunk in database.chunks(table_name, predicate) {
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
@ -624,7 +624,7 @@ impl InfluxRpcPlanner {
// The executor then figures out which columns have non-null
// values and stops the plan executing once it has them
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
let mut field_list_plan = FieldListPlan::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
@ -675,7 +675,7 @@ impl InfluxRpcPlanner {
{
debug!(?rpc_predicate, "planning read_filter");
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database.chunks(table_name, predicate);
@ -731,7 +731,7 @@ impl InfluxRpcPlanner {
{
debug!(?rpc_predicate, ?agg, "planning read_group");
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
@ -793,7 +793,7 @@ impl InfluxRpcPlanner {
);
// group tables by chunk, pruning if possible
let table_predicates = rpc_predicate.table_predicates(|| database.table_names());
let table_predicates = rpc_predicate.table_predicates(database);
let mut ss_plans = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in &table_predicates {
let chunks = database.chunks(table_name, predicate);
@ -1835,8 +1835,14 @@ impl<'a> ExprRewriter for MissingColumnsToNull<'a> {
#[cfg(test)]
mod tests {
use datafusion::logical_plan::lit;
use predicate::predicate::PredicateBuilder;
use schema::builder::SchemaBuilder;
use crate::{
exec::Executor,
test::{TestChunk, TestDatabase},
};
use super::*;
#[test]
@ -1945,4 +1951,120 @@ mod tests {
expr, rewritten_expr, expected
);
}
#[tokio::test]
async fn test_predicate_rewrite_table_names() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
InfluxRpcPlanner::new()
.table_names(test_db, rpc_predicate)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_rewrite_tag_keys() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
InfluxRpcPlanner::new()
.tag_keys(test_db, rpc_predicate)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_rewrite_tag_values() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
InfluxRpcPlanner::new()
.tag_values(test_db, "foo", rpc_predicate)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_rewrite_field_columns() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
InfluxRpcPlanner::new()
.field_columns(test_db, rpc_predicate)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_rewrite_read_filter() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
InfluxRpcPlanner::new()
.read_filter(test_db, rpc_predicate)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_read_group() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
let agg = Aggregate::None;
let group_columns = &["foo"];
InfluxRpcPlanner::new()
.read_group(test_db, rpc_predicate, agg, group_columns)
.expect("creating plan");
})
.await
}
#[tokio::test]
async fn test_predicate_read_window_aggregate() {
run_test::<_, TestDatabase>(|test_db, rpc_predicate| {
let agg = Aggregate::First;
let every = WindowDuration::from_months(1, false);
let offset = WindowDuration::from_months(1, false);
InfluxRpcPlanner::new()
.read_window_aggregate(test_db, rpc_predicate, agg, every, offset)
.expect("creating plan");
})
.await
}
/// Runs func() and checks that predicates are simplified prior to sending them off
async fn run_test<F, D>(f: F)
where
F: FnOnce(&TestDatabase, InfluxRpcPredicate) + Send,
{
let chunk0 = Arc::new(
TestChunk::new("h2o")
.with_id(0)
.with_tag_column("foo")
.with_time_column(),
);
let silly_predicate = PredicateBuilder::new()
// (foo = 'bar') OR false
.add_expr(col("foo").eq(lit("bar")).or(lit(false)))
.build();
let executor = Arc::new(Executor::new(1));
let test_db = TestDatabase::new(Arc::clone(&executor));
test_db.add_chunk("my_partition_key", Arc::clone(&chunk0));
let rpc_predicate = InfluxRpcPredicate::new(None, silly_predicate);
// run the function
f(&test_db, rpc_predicate);
let actual_predicate = test_db.get_chunks_predicate();
// verify that the predicate was rewritten to foo = 'bar'
let expected_predicate = PredicateBuilder::new()
// (foo = 'bar') OR false
.add_expr(col("foo").eq(lit("bar")))
.build();
assert_eq!(
actual_predicate, expected_predicate,
"\nActual: {:?}\nExpected: {:?}",
actual_predicate, expected_predicate
);
}
}

View File

@ -16,7 +16,10 @@ use data_types::{
use datafusion::physical_plan::SendableRecordBatchStream;
use exec::stringset::StringSet;
use observability_deps::tracing::{debug, trace};
use predicate::predicate::{Predicate, PredicateMatch};
use predicate::{
predicate::{Predicate, PredicateMatch},
rpc_predicate::QueryDatabaseMeta,
};
use schema::selection::Selection;
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
@ -111,18 +114,12 @@ impl<'a> Drop for QueryCompletedToken<'a> {
///
/// Databases store data organized by partitions and each partition stores
/// data in Chunks.
pub trait QueryDatabase: Debug + Send + Sync {
pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
type Chunk: QueryChunk;
/// Return the partition keys for data in this DB
fn partition_addrs(&self) -> Vec<PartitionAddr>;
/// Returns a list of table names in this DB
fn table_names(&self) -> Vec<String>;
/// Schema for a specific table if the table exists.
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate. If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.

View File

@ -28,6 +28,7 @@ use futures::StreamExt;
use hashbrown::HashSet;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::rpc_predicate::QueryDatabaseMeta;
use schema::selection::Selection;
use schema::{
builder::SchemaBuilder, merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema,
@ -47,6 +48,9 @@ pub struct TestDatabase {
/// `column_names` to return upon next request
column_names: Arc<Mutex<Option<StringSetRef>>>,
/// The predicate passed to the most recent call to `chunks()`
chunks_predicate: Mutex<Predicate>,
}
#[derive(Snafu, Debug)]
@ -68,6 +72,7 @@ impl TestDatabase {
executor,
partitions: Default::default(),
column_names: Default::default(),
chunks_predicate: Default::default(),
}
}
@ -81,6 +86,12 @@ impl TestDatabase {
self
}
/// Add a test chunk to the database
pub fn with_chunk(self, partition_key: &str, chunk: Arc<TestChunk>) -> Self {
self.add_chunk(partition_key, chunk);
self
}
/// Get the specified chunk
pub fn get_chunk(&self, partition_key: &str, id: ChunkId) -> Option<Arc<TestChunk>> {
self.partitions
@ -89,6 +100,11 @@ impl TestDatabase {
.and_then(|p| p.get(&id).cloned())
}
/// Return the most recent predicate passed to get_chunks()
pub fn get_chunks_predicate(&self) -> Predicate {
self.chunks_predicate.lock().clone()
}
/// Set the list of column names that will be returned on a call to
/// column_names
pub fn set_column_names(&self, column_names: Vec<String>) {
@ -119,7 +135,10 @@ impl QueryDatabase for TestDatabase {
.collect()
}
fn chunks(&self, table_name: &str, _predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();
let partitions = self.partitions.lock();
partitions
.values()
@ -133,6 +152,16 @@ impl QueryDatabase for TestDatabase {
unimplemented!("summaries not implemented TestDatabase")
}
fn record_query(
&self,
_query_type: impl Into<String>,
_query_text: impl Into<String>,
) -> QueryCompletedToken<'_> {
QueryCompletedToken::new(|| {})
}
}
impl QueryDatabaseMeta for TestDatabase {
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
let mut merger = SchemaMerger::new();
let mut found_one = false;
@ -150,14 +179,6 @@ impl QueryDatabase for TestDatabase {
found_one.then(|| Arc::new(merger.build()))
}
fn record_query(
&self,
_query_type: impl Into<String>,
_query_text: impl Into<String>,
) -> QueryCompletedToken<'_> {
QueryCompletedToken::new(|| {})
}
fn table_names(&self) -> Vec<String> {
let mut values = HashSet::new();
let partitions = self.partitions.lock();