feat: Make read_filter work for mutable buffer and read buffer (#882)

* feat: port read_filter to InfluxRPCPlanner

* fix: remove commented out vestigial test

* fix: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

* fix: fmt

* fix: Update arrow_deps/src/util.rs

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Andrew Lamb 2021-03-01 11:50:29 -05:00 committed by GitHub
parent 5b329996a9
commit 7d8d00781c
13 changed files with 764 additions and 479 deletions


@ -9,6 +9,7 @@ use arrow::{
error::ArrowError,
record_batch::RecordBatch,
};
use datafusion::logical_plan::{col, Expr};
/// Returns a single column record batch of type Utf8 from the
/// contents of something that can be turned into an iterator over
@ -28,3 +29,36 @@ where
RecordBatch::try_new(schema, columns)
}
/// Trait to help create DataFusion expressions from strings
pub trait IntoExpr {
/// Creates a DataFusion expr
fn into_expr(&self) -> Expr;
/// creates a DataFusion SortExpr
fn into_sort_expr(&self) -> Expr {
Expr::Sort {
expr: Box::new(self.into_expr()),
asc: true, // Sort ASCENDING
nulls_first: true,
}
}
}
impl IntoExpr for Arc<String> {
fn into_expr(&self) -> Expr {
col(self.as_ref())
}
}
impl IntoExpr for str {
fn into_expr(&self) -> Expr {
col(self)
}
}
impl IntoExpr for Expr {
fn into_expr(&self) -> Expr {
self.clone()
}
}
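
For orientation, a minimal sketch (not part of the diff) of how this trait is used: the `read_filter_plan` code later in this commit maps schema column names through `into_sort_expr()` to build its ORDER BY expressions. The function and column names below are illustrative, and the import paths assume the re-exports shown elsewhere in this diff.

```rust
use arrow_deps::{datafusion::logical_plan::Expr, util::IntoExpr};

// Build ascending, nulls-first sort expressions for a set of column names,
// much as read_filter_plan does for a schema's tag and time columns.
fn example_sort_exprs() -> Vec<Expr> {
    ["city", "state", "time"]
        .iter()
        .map(|name| name.into_sort_expr()) // Expr::Sort over col(name)
        .collect()
}
```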


@ -229,13 +229,6 @@ impl Database for MutableBufferDb {
Ok(())
}
async fn query_series(&self, predicate: Predicate) -> Result<SeriesSetPlans, Self::Error> {
let mut filter = ChunkTableFilter::new(predicate);
let mut visitor = SeriesVisitor::new();
self.accept(&mut filter, &mut visitor)?;
Ok(visitor.plans.into())
}
async fn query_groups(
&self,
predicate: Predicate,
@ -523,32 +516,6 @@ impl ChunkTableFilter {
}
}
/// Return DataFusion plans to calculate which series pass the
/// specified predicate.
struct SeriesVisitor {
plans: Vec<SeriesSetPlan>,
}
impl SeriesVisitor {
fn new() -> Self {
Self { plans: Vec::new() }
}
}
impl Visitor for SeriesVisitor {
fn pre_visit_table(
&mut self,
table: &Table,
chunk: &Chunk,
filter: &mut ChunkTableFilter,
) -> Result<()> {
self.plans
.push(table.series_set_plan(filter.chunk_predicate(), chunk)?);
Ok(())
}
}
/// Return DataFusion plans to calculate series that pass the
/// specified predicate, grouped according to grouped_columns
struct GroupsVisitor {
@ -629,24 +596,10 @@ impl Visitor for WindowGroupsVisitor {
mod tests {
use super::*;
use data_types::selection::Selection;
use query::{
exec::{
field::FieldIndexes,
seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem},
Executor,
},
predicate::PredicateBuilder,
Database,
};
use arrow_deps::{
arrow::array::{Array, StringArray},
datafusion::prelude::*,
};
use arrow_deps::arrow::array::{Array, StringArray};
use data_types::database_rules::Order;
use influxdb_line_protocol::{parse_lines, ParsedLine};
use test_helpers::{assert_contains, str_pair_vec_to_vec};
use tokio::sync::mpsc;
type TestError = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T = (), E = TestError> = std::result::Result<T, E>;
@ -710,216 +663,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_query_series() -> Result {
// This test checks that everything is wired together
// correctly. There are more detailed tests in table.rs that
// test the generated queries.
let db = MutableBufferDb::new("column_namedb");
let mut lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100", // to row 2
"h2o,state=MA,city=Boston temp=72.4 250", // to row 1
"h2o,state=CA,city=LA temp=90.0 200", // to row 0
"h2o,state=CA,city=LA temp=90.0 350", // to row 3
"o2,state=MA,city=Boston temp=50.4,reading=50 100", // to row 5
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around so data is not inserted in series order
lp_lines.swap(0, 2);
lp_lines.swap(4, 5);
let lp_data = lp_lines.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
let predicate = Predicate::default();
let plans = db
.query_series(predicate)
.await
.expect("Created query_series plan successfully");
let results = run_and_gather_results(plans).await;
assert_eq!(results.len(), 3);
let series_set0 = results[0].as_ref().expect("Correctly converted");
assert_eq!(*series_set0.table_name, "h2o");
assert_eq!(
series_set0.tags,
str_pair_vec_to_vec(&[("city", "Boston"), ("state", "MA")])
);
assert_eq!(
series_set0.field_indexes,
FieldIndexes::from_timestamp_and_value_indexes(3, &[2])
);
assert_eq!(series_set0.start_row, 0);
assert_eq!(series_set0.num_rows, 2);
let series_set1 = results[1].as_ref().expect("Correctly converted");
assert_eq!(*series_set1.table_name, "h2o");
assert_eq!(
series_set1.tags,
str_pair_vec_to_vec(&[("city", "LA"), ("state", "CA")])
);
assert_eq!(
series_set1.field_indexes,
FieldIndexes::from_timestamp_and_value_indexes(3, &[2])
);
assert_eq!(series_set1.start_row, 2);
assert_eq!(series_set1.num_rows, 2);
let series_set2 = results[2].as_ref().expect("Correctly converted");
assert_eq!(*series_set2.table_name, "o2");
assert_eq!(
series_set2.tags,
str_pair_vec_to_vec(&[("city", "Boston"), ("state", "MA")])
);
assert_eq!(
series_set2.field_indexes,
FieldIndexes::from_timestamp_and_value_indexes(4, &[2, 3])
);
assert_eq!(series_set2.start_row, 0);
assert_eq!(series_set2.num_rows, 2);
Ok(())
}
#[tokio::test]
async fn test_query_series_filter() -> Result {
// check the appropriate filters are applied in the datafusion plans
let db = MutableBufferDb::new("column_namedb");
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
"h2o,state=CA,city=LA temp=90.0 200",
"h2o,state=CA,city=LA temp=90.0 350",
"o2,state=MA,city=Boston temp=50.4,reading=50 100",
"o2,state=MA,city=Boston temp=53.4,reading=51 250",
];
let lp_data = lp_lines.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
// filter out one row in h2o
let predicate = PredicateBuilder::default()
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("CA"))) // state=CA
.build();
let plans = db
.query_series(predicate)
.await
.expect("Created query_series plan successfully");
let results = run_and_gather_results(plans).await;
assert_eq!(results.len(), 1);
let series_set0 = results[0].as_ref().expect("Correctly converted");
assert_eq!(*series_set0.table_name, "h2o");
assert_eq!(
series_set0.tags,
str_pair_vec_to_vec(&[("city", "LA"), ("state", "CA")])
);
assert_eq!(
series_set0.field_indexes,
FieldIndexes::from_timestamp_and_value_indexes(3, &[2])
);
assert_eq!(series_set0.start_row, 0);
assert_eq!(series_set0.num_rows, 1); // only has one row!
Ok(())
}
#[tokio::test]
async fn test_query_series_pred_refers_to_column_not_in_table() -> Result {
let db = MutableBufferDb::new("column_namedb");
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
];
let lp_data = lp_lines.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
let predicate = PredicateBuilder::default()
.add_expr(col("tag_not_in_h20").eq(lit("foo")))
.build();
let plans = db
.query_series(predicate)
.await
.expect("Created query_series plan successfully");
let results = run_and_gather_results(plans).await;
assert!(results.is_empty());
// predicate with no columns,
let predicate = PredicateBuilder::default()
.add_expr(lit("foo").eq(lit("foo")))
.build();
let plans = db
.query_series(predicate)
.await
.expect("Created query_series plan successfully");
let results = run_and_gather_results(plans).await;
assert_eq!(results.len(), 1);
// predicate with one column that is in the table and one that is not
let predicate = PredicateBuilder::default()
.add_expr(col("state").eq(lit("MA")))
.add_expr(col("tag_not_in_h20").eq(lit("foo")))
.build();
let plans = db
.query_series(predicate)
.await
.expect("Created query_series plan successfully");
let results = run_and_gather_results(plans).await;
assert!(results.is_empty());
Ok(())
}
#[tokio::test]
async fn test_query_series_pred_neq() {
let db = MutableBufferDb::new("column_namedb");
let lp_lines = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
];
let lp_data = lp_lines.join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
write_lines(&db, &lines).await;
let predicate = PredicateBuilder::default()
.add_expr(col("state").not_eq(lit("MA")))
.build();
// Should err as the neq path isn't implemented yet
let err = db.query_series(predicate).await.unwrap_err();
assert_contains!(
err.to_string(),
"Operator NotEq not yet supported in IOx MutableBuffer"
);
}
#[tokio::test]
async fn db_size() {
let db = MutableBufferDb::new("column_namedb");
@ -963,52 +706,6 @@ mod tests {
assert_eq!(partitions[1].read().unwrap().key(), "p2");
}
/// Run the plan and gather the results in an order that can be compared
async fn run_and_gather_results(
plans: SeriesSetPlans,
) -> Vec<Result<SeriesSet, SeriesSetError>> {
// Use a channel sufficiently large to buffer the series
let (tx, mut rx) = mpsc::channel(100);
// set up to run the execution plan
let executor = Executor::default();
executor
.to_series_set(plans, tx)
.await
.expect("Running series set plan");
// gather up the sets and compare them
let mut results = Vec::new();
while let Some(r) = rx.recv().await {
results.push(r.map(|item| {
if let SeriesSetItem::Data(series_set) = item {
series_set
}
else {
panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item)
}
})
);
}
// sort the results so that we can reliably compare
results.sort_by(|r1, r2| {
match (r1, r2) {
(Ok(r1), Ok(r2)) => r1
.table_name
.cmp(&r2.table_name)
.then(r1.tags.cmp(&r2.tags)),
// default sort by string representation
(r1, r2) => format!("{:?}", r1).cmp(&format!("{:?}", r2)),
}
});
// Print to stdout / test log to facilitate debugging if the test fails on CI
println!("The results are: {:#?}", results);
results
}
/// write lines into this database
async fn write_lines(database: &MutableBufferDb, lines: &[ParsedLine<'_>]) {
let mut writer = query::test::TestLPWriter::default();


@ -39,6 +39,7 @@ use arrow_deps::{
logical_plan::{Expr, LogicalPlanBuilder},
prelude::*,
},
util::IntoExpr,
};
#[derive(Debug, Snafu)]
@ -272,25 +273,6 @@ impl Table {
}
}
/// Creates a SeriesSet plan that produces an output table with rows that
/// match the predicate
///
/// The output looks like:
/// (tag_col1, tag_col2, ... field1, field2, ... timestamp)
///
/// The order of the tag_columns is ordered by name.
///
/// The data is sorted on (tag_col1, tag_col2, ...) so that all
/// rows for a particular series (groups where all tags are the
/// same) occur together in the plan
pub fn series_set_plan(
&self,
chunk_predicate: &ChunkPredicate,
chunk: &Chunk,
) -> Result<SeriesSetPlan> {
self.series_set_plan_impl(chunk_predicate, None, chunk)
}
/// Creates the plans for computing series set, ensuring that
/// prefix_columns, if any, are the prefix of the ordering.
///
@ -1016,39 +998,6 @@ fn reorder_prefix(
Ok(new_tag_columns)
}
/// Trait to help create DataFusion expressions from strings
trait IntoExpr {
/// Creates a DataFusion expr
fn into_expr(&self) -> Expr;
/// creates a DataFusion SortExpr
fn into_sort_expr(&self) -> Expr {
Expr::Sort {
expr: Box::new(self.into_expr()),
asc: true, // Sort ASCENDING
nulls_first: true,
}
}
}
impl IntoExpr for Arc<String> {
fn into_expr(&self) -> Expr {
col(self.as_ref())
}
}
impl IntoExpr for str {
fn into_expr(&self) -> Expr {
col(self)
}
}
impl IntoExpr for Expr {
fn into_expr(&self) -> Expr {
self.clone()
}
}
struct AggExprs {
agg_exprs: Vec<Expr>,
field_columns: FieldColumns,
@ -1413,7 +1362,7 @@ mod tests {
let predicate = PredicateBuilder::default().build();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&chunk_predicate, &chunk)
.series_set_plan_impl(&chunk_predicate, None, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");
@ -1461,7 +1410,7 @@ mod tests {
let predicate = PredicateBuilder::default().build();
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&chunk_predicate, &chunk)
.series_set_plan_impl(&chunk_predicate, None, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");
@ -1514,7 +1463,7 @@ mod tests {
let chunk_predicate = chunk.compile_predicate(&predicate).unwrap();
let series_set_plan = table
.series_set_plan(&chunk_predicate, &chunk)
.series_set_plan_impl(&chunk_predicate, None, &chunk)
.expect("creating the series set plan");
assert_eq!(series_set_plan.table_name.as_ref(), "table_name");


@ -12,6 +12,7 @@ use arrow_deps::{
},
prelude::col,
},
util::IntoExpr,
};
use data_types::{
schema::{InfluxColumnType, Schema},
@ -24,6 +25,7 @@ use crate::{
exec::{make_schema_pivot, stringset::StringSet},
plan::{
fieldlist::FieldListPlan,
seriesset::{SeriesSetPlan, SeriesSetPlans},
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
},
predicate::{Predicate, PredicateBuilder},
@ -492,6 +494,60 @@ impl InfluxRPCPlanner {
Ok(field_list_plan)
}
/// Returns a plan that finds all rows which pass the
/// conditions specified by `predicate` in the form of logical
/// time series.
///
/// A time series is defined by the unique values in a set of
/// "tag_columns" for each field in the "field_columns", ordered by
/// the time column.
///
/// The output looks like:
/// ```text
/// (tag_col1, tag_col2, ... field1, field2, ... timestamp)
/// ```
///
/// The tag_columns are ordered by name.
///
/// The data is sorted on (tag_col1, tag_col2, ...) so that all
/// rows for a particular series (groups where all tags are the
/// same) occur together in the plan
pub async fn read_filter<D>(&self, database: &D, predicate: Predicate) -> Result<SeriesSetPlans>
where
D: Database + 'static,
{
debug!(predicate=?predicate, "planning read_filter");
// group tables by chunk, pruning if possible
// key is table name, values are chunks
let mut table_chunks = BTreeMap::new();
for chunk in self.filtered_chunks(database, &predicate).await? {
let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?;
for table_name in table_names.into_iter() {
table_chunks
.entry(table_name)
.or_insert_with(Vec::new)
.push(Arc::clone(&chunk));
}
}
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let ss_plan = self
.read_filter_plan(table_name, &predicate, chunks)
.await?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
}
}
Ok(ss_plans.into())
}
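
As a usage sketch (mirroring the query tests added later in this commit): a caller builds a `Predicate`, asks the planner for the plans, and then hands them to an `Executor`. The function name below is illustrative, the import paths assume the `query` crate layout shown elsewhere in this diff, and `db` stands in for any `Database` implementation.

```rust
use arrow_deps::datafusion::logical_plan::{col, lit};
use query::{
    frontend::influxrpc::InfluxRPCPlanner,
    plan::seriesset::SeriesSetPlans,
    predicate::PredicateBuilder,
    Database,
};

// Plan a read_filter for rows where state = 'CA' within a time range.
async fn plan_state_ca<D: Database + 'static>(db: &D) -> SeriesSetPlans {
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 300)
        .add_expr(col("state").eq(lit("CA")))
        .build();

    InfluxRPCPlanner::new()
        .read_filter(db, predicate)
        .await
        .expect("building read_filter plans")
}
```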
/// Find all the table names in the specified chunk that pass the predicate
async fn chunk_table_names<C>(
&self,
@ -647,6 +703,83 @@ impl InfluxRPCPlanner {
Ok(Some(plan))
}
/// Creates a plan for computing series sets for a given table,
/// returning None if the predicate rules out matching any rows in
/// the table
///
/// The created plan looks like:
///
/// Projection (select the columns needed)
/// Order by (tag_columns, timestamp_column)
/// Filter(predicate)
/// InMemoryScan
async fn read_filter_plan<C>(
&self,
table_name: impl Into<String>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<SeriesSetPlan>>
where
C: PartitionChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?;
let TableScanAndFilter {
plan_builder,
schema,
} = match scan_and_filter {
None => return Ok(None),
Some(t) => t,
};
let tags_and_timestamp: Vec<Expr> = schema
.tags_iter()
.chain(schema.time_iter())
.map(|field| field.name().into_sort_expr())
.collect();
// Order by
let plan_builder = plan_builder
.sort(&tags_and_timestamp)
.context(BuildingPlan)?;
// Select away anything that isn't in the influx data model
let tags_fields_and_timestamps: Vec<Expr> = schema
.tags_iter()
.chain(schema.fields_iter())
.chain(schema.time_iter())
.map(|field| field.name().into_expr())
.collect();
let plan_builder = plan_builder
.project(&tags_fields_and_timestamps)
.context(BuildingPlan)?;
let plan = plan_builder.build().context(BuildingPlan)?;
let tag_columns = schema
.tags_iter()
.map(|field| Arc::new(field.name().to_string()))
.collect();
let field_columns = schema
.fields_iter()
.map(|field| Arc::new(field.name().to_string()))
.collect();
// TODO: remove the use of tag_columns and field_column names
// and instead use the schema directly
let ss_plan = SeriesSetPlan::new_from_shared_timestamp(
Arc::new(table_name),
plan,
tag_columns,
field_columns,
);
Ok(Some(ss_plan))
}
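
To make the plan shape concrete: for a table with tag columns `city` and `state`, a field `temp`, and a `time` column, the builder above yields roughly the following logical plan (an illustrative sketch, not output captured from DataFusion):

```text
Projection: #city, #state, #temp, #time
  Sort: #city ASC NULLS FIRST, #state ASC NULLS FIRST, #time ASC NULLS FIRST
    Filter: <predicate exprs, if any>
      Scan: chunk data for the table (InMemoryScan)
```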
/// Create a plan that scans the specified table, and applies any
/// filtering specified on the predicate, if any.
///


@ -55,15 +55,6 @@ pub trait Database: Debug + Send + Sync {
// The functions below are slated for removal (migration into a gRPC query
// frontend) ---------
/// Returns a plan that finds all rows which pass the
/// conditions specified by `predicate` in the form of logical
/// time series.
///
/// A time series is defined by the unique values in a set of
/// "tag_columns" for each field in the "field_columns", orderd by
/// the time column.
async fn query_series(&self, predicate: Predicate) -> Result<SeriesSetPlans, Self::Error>;
/// Returns a plan that finds rows which pass the conditions
/// specified by `predicate` and have been logically grouped and
/// aggregate according to `gby_agg`.
@ -153,7 +144,10 @@ pub trait PartitionChunk: Debug + Send + Sync {
) -> Result<Schema, Self::Error>;
/// Provides access to raw `PartitionChunk` data as an
/// asynchronous stream of `RecordBatch`es.
/// asynchronous stream of `RecordBatch`es filtered by a *required*
/// predicate. Note that not all chunks can evaluate all types of
/// predicates, and this function will return an error
/// if asked to evaluate a predicate that is not supported.
///
/// This is the analog of the `TableProvider` in DataFusion
///


@ -7,6 +7,10 @@ use crate::exec::field::FieldColumns;
/// A plan that can be run to produce a logical stream of time series,
/// represented as a sequence of SeriesSets from a single DataFusion
/// plan, optionally grouped in some way.
///
/// TODO: remove the tag/field designations below and attach a
/// `Schema` to the plan (which has the tag and field column
/// information natively)
#[derive(Debug)]
pub struct SeriesSetPlan {
/// The table name this came from


@ -56,12 +56,6 @@ pub struct TestDatabase {
/// `column_names` to return upon next request
column_names: Arc<Mutex<Option<StringSetRef>>>,
/// Responses to return on the next request to `query_series`
query_series_values: Arc<Mutex<Option<SeriesSetPlans>>>,
/// The last request for `query_series`
query_series_request: Arc<Mutex<Option<QuerySeriesRequest>>>,
/// Responses to return on the next request to `query_groups`
query_groups_values: Arc<Mutex<Option<SeriesSetPlans>>>,
@ -69,13 +63,6 @@ pub struct TestDatabase {
query_groups_request: Arc<Mutex<Option<QueryGroupsRequest>>>,
}
/// Records the parameters passed to a `query_series` request
#[derive(Debug, PartialEq, Clone)]
pub struct QuerySeriesRequest {
/// Stringified '{:?}' version of the predicate
pub predicate: String,
}
/// Records the parameters passed to a `query_groups` request
#[derive(Debug, PartialEq, Clone)]
pub struct QueryGroupsRequest {
@ -159,16 +146,6 @@ impl TestDatabase {
*Arc::clone(&self.column_names).lock() = Some(column_names)
}
/// Set the series that will be returned on a call to query_series
pub fn set_query_series_values(&self, plan: SeriesSetPlans) {
*Arc::clone(&self.query_series_values).lock() = Some(plan);
}
/// Get the parameters from the last column name request
pub fn get_query_series_request(&self) -> Option<QuerySeriesRequest> {
Arc::clone(&self.query_series_request).lock().take()
}
/// Set the series that will be returned on a call to query_groups
pub fn set_query_groups_values(&self, plan: SeriesSetPlans) {
*Arc::clone(&self.query_groups_values).lock() = Some(plan);
@ -234,22 +211,6 @@ impl Database for TestDatabase {
Ok(())
}
async fn query_series(&self, predicate: Predicate) -> Result<SeriesSetPlans, Self::Error> {
let predicate = predicate_to_test_string(&predicate);
let new_queries_series_request = Some(QuerySeriesRequest { predicate });
*Arc::clone(&self.query_series_request).lock() = new_queries_series_request;
Arc::clone(&self.query_series_values)
.lock()
.take()
// Turn None into an error
.context(General {
message: "No saved query_series in TestDatabase",
})
}
async fn query_groups(
&self,
predicate: Predicate,


@ -306,18 +306,6 @@ impl Database for Db {
.context(MutableBufferWrite)
}
async fn query_series(
&self,
predicate: query::predicate::Predicate,
) -> Result<query::plan::seriesset::SeriesSetPlans, Self::Error> {
self.mutable_buffer
.as_ref()
.context(DatabaseNotReadable)?
.query_series(predicate)
.await
.context(MutableBufferRead)
}
async fn query_groups(
&self,
predicate: query::predicate::Predicate,


@ -4,6 +4,7 @@ use mutable_buffer::chunk::Chunk as MBChunk;
use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk};
use read_buffer::Database as ReadBufferDb;
use snafu::{ResultExt, Snafu};
use tracing::debug;
use std::sync::Arc;
@ -30,8 +31,14 @@ pub enum Error {
#[snafu(display("Internal error restricting schema: {}", source))]
InternalSelectingSchema { source: data_types::schema::Error },
#[snafu(display("Internal Predicate Conversion Error: {}", source))]
InternalPredicateConversion { source: super::pred::Error },
#[snafu(display("Predicate conversion error: {}", source))]
PredicateConversion { source: super::pred::Error },
#[snafu(display(
"Internal error: mutable buffer does not support predicate pushdown, but got: {:?}",
predicate
))]
InternalPredicateNotSupported { predicate: Predicate },
#[snafu(display("internal error creating plan: {}", source))]
InternalPlanCreation {
@ -113,9 +120,13 @@ impl PartitionChunk for DBChunk {
if chunk.is_empty() {
Some(StringSet::new())
} else {
let chunk_predicate = chunk
.compile_predicate(predicate)
.context(MutableBufferChunk)?;
let chunk_predicate = match chunk.compile_predicate(predicate) {
Ok(chunk_predicate) => chunk_predicate,
Err(e) => {
debug!(?predicate, %e, "mutable buffer predicate not supported for table_names, falling back");
return Ok(None);
}
};
// we don't support arbitrary expressions in chunk predicate yet
if !chunk_predicate.chunk_exprs.is_empty() {
@ -139,11 +150,15 @@ impl PartitionChunk for DBChunk {
} => {
let chunk_id = *chunk_id;
// TODO: figure out if this predicate was "not
// supported" or "actual error". If not supported,
// should return Ok(None)
let rb_predicate =
to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
// If not supported, ReadBuffer can't answer with
// metadata only
let rb_predicate = match to_read_buffer_predicate(&predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back");
return Ok(None);
}
};
let names = db
.table_names(partition_key, &[chunk_id], rb_predicate)
@ -243,9 +258,15 @@ impl PartitionChunk for DBChunk {
) -> Result<SendableRecordBatchStream, Self::Error> {
match self {
Self::MutableBuffer { chunk } => {
// Note Mutable buffer doesn't support predicate
// Note MutableBuffer doesn't support predicate
// pushdown (other than pruning out the entire chunk
// via `might_pass_predicate`)
if predicate != &Predicate::default() {
return InternalPredicateNotSupported {
predicate: predicate.clone(),
}
.fail();
}
let schema: Schema = self.table_schema(table_name, selection).await?;
Ok(Box::pin(MutableBufferChunkStream::new(
@ -260,8 +281,9 @@ impl PartitionChunk for DBChunk {
chunk_id,
} => {
let chunk_id = *chunk_id;
// An error converting to an rb_predicate must fail here; unlike the metadata paths, there is no fallback
let rb_predicate =
to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
to_read_buffer_predicate(&predicate).context(PredicateConversion)?;
let chunk_ids = &[chunk_id];
@ -320,9 +342,13 @@ impl PartitionChunk for DBChunk {
) -> Result<Option<StringSet>, Self::Error> {
match self {
Self::MutableBuffer { chunk } => {
let chunk_predicate = chunk
.compile_predicate(predicate)
.context(MutableBufferChunk)?;
let chunk_predicate = match chunk.compile_predicate(predicate) {
Ok(chunk_predicate) => chunk_predicate,
Err(e) => {
debug!(?predicate, %e, "mutable buffer predicate not supported for column_names, falling back");
return Ok(None);
}
};
chunk
.column_names(table_name, &chunk_predicate, columns)
@ -334,8 +360,13 @@ impl PartitionChunk for DBChunk {
chunk_id,
} => {
let chunk_id = *chunk_id;
let rb_predicate =
to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?;
let rb_predicate = match to_read_buffer_predicate(&predicate) {
Ok(rb_predicate) => rb_predicate,
Err(e) => {
debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back");
return Ok(None);
}
};
let chunk_ids = &[chunk_id];
@ -361,9 +392,13 @@ impl PartitionChunk for DBChunk {
Self::MutableBuffer { chunk } => {
use mutable_buffer::chunk::Error::UnsupportedColumnTypeForListingValues;
let chunk_predicate = chunk
.compile_predicate(predicate)
.context(MutableBufferChunk)?;
let chunk_predicate = match chunk.compile_predicate(predicate) {
Ok(chunk_predicate) => chunk_predicate,
Err(e) => {
debug!(?predicate, %e, "mutable buffer predicate not supported for column_values, falling back");
return Ok(None);
}
};
let values = chunk.tag_column_values(table_name, column_name, &chunk_predicate);


@ -1,4 +1,5 @@
pub mod field_columns;
pub mod read_filter;
pub mod table_names;
pub mod tag_keys;
pub mod tag_values;


@ -0,0 +1,483 @@
//! Tests for the Influx gRPC queries
use std::sync::Arc;
use crate::query_tests::scenarios::*;
use arrow_deps::{
arrow::util::pretty::pretty_format_batches,
datafusion::logical_plan::{col, lit},
};
use async_trait::async_trait;
use query::{
exec::{
field::FieldIndexes,
seriesset::{SeriesSet, SeriesSetItem},
Executor,
},
frontend::influxrpc::InfluxRPCPlanner,
predicate::{Predicate, PredicateBuilder},
};
use tokio::sync::mpsc;
pub struct TwoMeasurementsMultiSeries {}
#[async_trait]
impl DBSetup for TwoMeasurementsMultiSeries {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
let mut data = vec![
"h2o,state=MA,city=Boston temp=70.4 100", // to row 2
"h2o,state=MA,city=Boston temp=72.4 250", // to row 1
"h2o,state=CA,city=LA temp=90.0 200", // to row 0
"h2o,state=CA,city=LA temp=90.0 350", // to row 3
"o2,state=MA,city=Boston temp=50.4,reading=50 100", // to row 5
"o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4
];
// Swap around so data is not inserted in series order
data.swap(0, 2);
data.swap(4, 5);
make_one_chunk_scenarios(partition_key, &data.join("\n")).await
}
}
/// runs read_filter(predicate) and compares it to the expected
/// output
macro_rules! run_read_filter_test_case {
($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_RESULTS:expr) => {
test_helpers::maybe_start_logging();
let predicate = $PREDICATE;
let expected_results = $EXPECTED_RESULTS;
for scenario in $DB_SETUP.make().await {
let DBScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let planner = InfluxRPCPlanner::new();
let executor = Executor::new();
let plans = planner
.read_filter(&db, predicate.clone())
.await
.expect("built plan successfully");
// Use a channel sufficiently large to buffer the series
let (tx, mut rx) = mpsc::channel(100);
executor
.to_series_set(plans, tx)
.await
.expect("Running series set plan");
// gather up the sets and compare them
let mut results = vec![];
while let Some(r) = rx.recv().await {
let item = r.expect("unexpected error in execution");
let item = if let SeriesSetItem::Data(series_set) = item {
series_set
}
else {
panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item)
};
results.push(item);
}
// sort the results so that we can reliably compare
results.sort_by(|r1, r2| {
r1
.table_name
.cmp(&r2.table_name)
.then(r1.tags.cmp(&r2.tags))
});
let string_results = results
.into_iter()
.map(|s| dump_series_set(s).into_iter())
.flatten()
.collect::<Vec<_>>();
assert_eq!(
expected_results,
string_results,
"Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}",
scenario_name,
expected_results,
string_results
);
}
};
}
/// Format the field indexes into strings
fn dump_field_indexes(f: FieldIndexes) -> Vec<String> {
f.as_slice()
.iter()
.map(|field_index| {
format!(
" (value_index: {}, timestamp_index: {})",
field_index.value_index, field_index.timestamp_index
)
})
.collect()
}
/// Format the vec of Arc string pairs into strings
fn dump_arc_vec(v: Vec<(Arc<String>, Arc<String>)>) -> Vec<String> {
v.into_iter()
.map(|(k, v)| format!(" ({}, {})", k, v))
.collect()
}
/// Format a series set into a format that is easy to compare in tests
fn dump_series_set(s: SeriesSet) -> Vec<String> {
let mut f = vec![];
f.push("SeriesSet".into());
f.push(format!("table_name: {}", s.table_name));
f.push("tags".to_string());
f.extend(dump_arc_vec(s.tags).into_iter());
f.push("field_indexes:".to_string());
f.extend(dump_field_indexes(s.field_indexes).into_iter());
f.push(format!("start_row: {}", s.start_row));
f.push(format!("num_rows: {}", s.num_rows));
f.push("Batches:".into());
let formatted_batch = pretty_format_batches(&[s.batch]).unwrap();
f.extend(formatted_batch.trim().split('\n').map(|s| s.to_string()));
f
}
#[tokio::test]
async fn test_read_filter_no_data_no_pred() {
let predicate = Predicate::default();
let expected_results = vec![] as Vec<&str>;
run_read_filter_test_case!(NoData {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_no_pred() {
let predicate = Predicate::default();
let expected_results = vec![
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Boston)",
" (state, MA)",
"field_indexes:",
" (value_index: 2, timestamp_index: 3)",
"start_row: 0",
"num_rows: 2",
"Batches:",
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 70.4 | 100 |",
"| Boston | MA | 72.4 | 250 |",
"| LA | CA | 90 | 200 |",
"| LA | CA | 90 | 350 |",
"+--------+-------+------+------+",
"SeriesSet",
"table_name: h2o",
"tags",
" (city, LA)",
" (state, CA)",
"field_indexes:",
" (value_index: 2, timestamp_index: 3)",
"start_row: 2",
"num_rows: 2",
"Batches:",
"+--------+-------+------+------+",
"| city | state | temp | time |",
"+--------+-------+------+------+",
"| Boston | MA | 70.4 | 100 |",
"| Boston | MA | 72.4 | 250 |",
"| LA | CA | 90 | 200 |",
"| LA | CA | 90 | 350 |",
"+--------+-------+------+------+",
"SeriesSet",
"table_name: o2",
"tags",
" (city, Boston)",
" (state, MA)",
"field_indexes:",
" (value_index: 2, timestamp_index: 4)",
" (value_index: 3, timestamp_index: 4)",
"start_row: 0",
"num_rows: 2",
"Batches:",
"+--------+-------+---------+------+------+",
"| city | state | reading | temp | time |",
"+--------+-------+---------+------+------+",
"| Boston | MA | 50 | 50.4 | 100 |",
"| Boston | MA | 51 | 53.4 | 250 |",
"+--------+-------+---------+------+------+",
];
run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_filter() {
// filter out one row in h2o
let predicate = PredicateBuilder::default()
.timestamp_range(200, 300)
.add_expr(col("state").eq(lit("CA"))) // state=CA
.build();
let expected_results = vec![
"SeriesSet",
"table_name: h2o",
"tags",
" (city, LA)",
" (state, CA)",
"field_indexes:",
" (value_index: 2, timestamp_index: 3)",
"start_row: 0",
"num_rows: 1",
"Batches:",
"+------+-------+------+------+",
"| city | state | temp | time |",
"+------+-------+------+------+",
"| LA | CA | 90 | 200 |",
"+------+-------+------+------+",
];
run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_pred_refers_to_non_existent_column() {
let predicate = PredicateBuilder::default()
.add_expr(col("tag_not_in_h20").eq(lit("foo")))
.build();
let expected_results = vec![] as Vec<&str>;
run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_pred_no_columns() {
// predicate with no columns,
let predicate = PredicateBuilder::default()
.add_expr(lit("foo").eq(lit("foo")))
.build();
let expected_results = vec![
"SeriesSet",
"table_name: cpu",
"tags",
" (region, west)",
"field_indexes:",
" (value_index: 1, timestamp_index: 2)",
"start_row: 0",
"num_rows: 2",
"Batches:",
"+--------+------+------+",
"| region | user | time |",
"+--------+------+------+",
"| west | 23.2 | 100 |",
"| west | 21 | 150 |",
"+--------+------+------+",
"SeriesSet",
"table_name: disk",
"tags",
" (region, east)",
"field_indexes:",
" (value_index: 1, timestamp_index: 2)",
"start_row: 0",
"num_rows: 1",
"Batches:",
"+--------+-------+------+",
"| region | bytes | time |",
"+--------+-------+------+",
"| east | 99 | 200 |",
"+--------+-------+------+",
];
run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() {
// predicate with one column that is in the table and one that is not
let predicate = PredicateBuilder::default()
.add_expr(col("state").eq(lit("MA")))
.add_expr(col("tag_not_in_h20").eq(lit("foo")))
.build();
let expected_results = vec![] as Vec<&str>;
run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results);
}
#[tokio::test]
async fn test_read_filter_data_pred_unsupported_in_scan() {
test_helpers::maybe_start_logging();
// These predicates can't be pushed down into chunks, but they can
// be evaluated by the general purpose DataFusion plan
// https://github.com/influxdata/influxdb_iox/issues/883
// (STATE = 'CA') OR (READING > 0)
let predicate = PredicateBuilder::default()
.add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0))))
.build();
// Note these results are incorrect (they do not include data from h2o where
// state = CA)
let expected_results = vec![
"SeriesSet",
"table_name: o2",
"tags",
" (city, Boston)",
" (state, MA)",
"field_indexes:",
" (value_index: 2, timestamp_index: 4)",
" (value_index: 3, timestamp_index: 4)",
"start_row: 0",
"num_rows: 2",
"Batches:",
"+--------+-------+---------+------+------+",
"| city | state | reading | temp | time |",
"+--------+-------+---------+------+------+",
"| Boston | MA | 50 | 50.4 | 100 |",
"| Boston | MA | 51 | 53.4 | 250 |",
"+--------+-------+---------+------+------+",
];
run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results);
}
pub struct MeasurementsSortableTags {}
#[async_trait]
impl DBSetup for MeasurementsSortableTags {
async fn make(&self) -> Vec<DBScenario> {
let partition_key = "1970-01-01T00";
let data = vec![
"h2o,zz_tag=A,state=MA,city=Kingston temp=70.1 800",
"h2o,state=MA,city=Kingston,zz_tag=B temp=70.2 100",
"h2o,state=CA,city=Boston temp=70.3 250",
"h2o,state=MA,city=Boston,zz_tag=A temp=70.4 1000",
"h2o,state=MA,city=Boston temp=70.5,other=5.0 250",
];
make_one_chunk_scenarios(partition_key, &data.join("\n")).await
}
}
#[tokio::test]
async fn test_read_filter_data_plan_order() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default();
let expected_results = vec![
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Boston)",
" (state, CA)",
" (zz_tag, )",
"field_indexes:",
" (value_index: 3, timestamp_index: 5)",
" (value_index: 4, timestamp_index: 5)",
"start_row: 0",
"num_rows: 1",
"Batches:",
"+----------+-------+--------+-------+------+------+",
"| city | state | zz_tag | other | temp | time |",
"+----------+-------+--------+-------+------+------+",
"| Boston | CA | | | 70.3 | 250 |",
"| Boston | MA | | 5 | 70.5 | 250 |",
"| Boston | MA | A | | 70.4 | 1000 |",
"| Kingston | MA | A | | 70.1 | 800 |",
"| Kingston | MA | B | | 70.2 | 100 |",
"+----------+-------+--------+-------+------+------+",
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Boston)",
" (state, MA)",
" (zz_tag, )",
"field_indexes:",
" (value_index: 3, timestamp_index: 5)",
" (value_index: 4, timestamp_index: 5)",
"start_row: 1",
"num_rows: 1",
"Batches:",
"+----------+-------+--------+-------+------+------+",
"| city | state | zz_tag | other | temp | time |",
"+----------+-------+--------+-------+------+------+",
"| Boston | CA | | | 70.3 | 250 |",
"| Boston | MA | | 5 | 70.5 | 250 |",
"| Boston | MA | A | | 70.4 | 1000 |",
"| Kingston | MA | A | | 70.1 | 800 |",
"| Kingston | MA | B | | 70.2 | 100 |",
"+----------+-------+--------+-------+------+------+",
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Boston)",
" (state, MA)",
" (zz_tag, A)",
"field_indexes:",
" (value_index: 3, timestamp_index: 5)",
" (value_index: 4, timestamp_index: 5)",
"start_row: 2",
"num_rows: 1",
"Batches:",
"+----------+-------+--------+-------+------+------+",
"| city | state | zz_tag | other | temp | time |",
"+----------+-------+--------+-------+------+------+",
"| Boston | CA | | | 70.3 | 250 |",
"| Boston | MA | | 5 | 70.5 | 250 |",
"| Boston | MA | A | | 70.4 | 1000 |",
"| Kingston | MA | A | | 70.1 | 800 |",
"| Kingston | MA | B | | 70.2 | 100 |",
"+----------+-------+--------+-------+------+------+",
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Kingston)",
" (state, MA)",
" (zz_tag, A)",
"field_indexes:",
" (value_index: 3, timestamp_index: 5)",
" (value_index: 4, timestamp_index: 5)",
"start_row: 3",
"num_rows: 1",
"Batches:",
"+----------+-------+--------+-------+------+------+",
"| city | state | zz_tag | other | temp | time |",
"+----------+-------+--------+-------+------+------+",
"| Boston | CA | | | 70.3 | 250 |",
"| Boston | MA | | 5 | 70.5 | 250 |",
"| Boston | MA | A | | 70.4 | 1000 |",
"| Kingston | MA | A | | 70.1 | 800 |",
"| Kingston | MA | B | | 70.2 | 100 |",
"+----------+-------+--------+-------+------+------+",
"SeriesSet",
"table_name: h2o",
"tags",
" (city, Kingston)",
" (state, MA)",
" (zz_tag, B)",
"field_indexes:",
" (value_index: 3, timestamp_index: 5)",
" (value_index: 4, timestamp_index: 5)",
"start_row: 4",
"num_rows: 1",
"Batches:",
"+----------+-------+--------+-------+------+------+",
"| city | state | zz_tag | other | temp | time |",
"+----------+-------+--------+-------+------+------+",
"| Boston | CA | | | 70.3 | 250 |",
"| Boston | MA | | 5 | 70.5 | 250 |",
"| Boston | MA | A | | 70.4 | 1000 |",
"| Kingston | MA | A | | 70.1 | 800 |",
"| Kingston | MA | B | | 70.2 | 100 |",
"+----------+-------+--------+-------+------+------+",
];
run_read_filter_test_case!(MeasurementsSortableTags {}, predicate, expected_results);
}


@ -183,7 +183,7 @@ impl DBSetup for EndToEndTest {
/// Data in single closed mutable buffer chunk, one closed mutable chunk
/// Data in both read buffer and mutable buffer chunk
/// Data in only one read buffer chunk
async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DBScenario> {
pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec<DBScenario> {
let db = make_db();
let mut writer = TestLPWriter::default();
writer.write_lp_string(&db, data).await.unwrap();


@ -911,7 +911,7 @@ async fn read_filter_impl<'a, T>(
rpc_predicate: Option<Predicate>,
) -> Result<()>
where
T: DatabaseStore,
T: DatabaseStore + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -935,8 +935,10 @@ where
let executor = db_store.executor();
let series_plan = db
.query_series(predicate)
let planner = InfluxRPCPlanner::new();
let series_plan = planner
.read_filter(db.as_ref(), predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(PlanningFilteringSeries { db_name })?;
@ -1112,8 +1114,8 @@ mod tests {
group_by::{Aggregate as QueryAggregate, WindowDuration as QueryWindowDuration},
plan::seriesset::SeriesSetPlans,
test::QueryGroupsRequest,
test::TestChunk,
test::TestDatabaseStore,
test::{QuerySeriesRequest, TestChunk},
};
use std::{
convert::TryFrom,
@ -1840,18 +1842,25 @@ mod tests {
}
#[tokio::test]
async fn test_read_filter() -> Result<(), tonic::Status> {
async fn test_read_filter() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let test_db = fixture
// Add a chunk with a field
let chunk = TestChunk::new(0)
.with_time_column("TheMeasurement")
.with_tag_column("TheMeasurement", "state")
.with_one_row_of_null_data("TheMeasurement");
fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.expect("creating test database");
.unwrap()
.add_chunk("my_partition_key", Arc::new(chunk));
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
@ -1861,31 +1870,43 @@ mod tests {
let request = ReadFilterRequest {
read_source: source.clone(),
range: make_timestamp_range(150, 200),
range: make_timestamp_range(0, 10000),
predicate: make_state_ma_predicate(),
};
let expected_request = QuerySeriesRequest {
predicate: "Predicate { exprs: [#state Eq Utf8(\"MA\")] range: TimestampRange { start: 150, end: 200 }}".into()
};
let actual_frames = fixture.storage_client.read_filter(request).await.unwrap();
let dummy_series_set_plan = SeriesSetPlans::from(vec![]);
test_db.set_query_series_values(dummy_series_set_plan);
let actual_frames = fixture.storage_client.read_filter(request).await?;
// TODO: encode this in the test case or something
// TODO: encode the actual output in the test case or something
let expected_frames: Vec<String> = vec!["0 frames".into()];
assert_eq!(
actual_frames, expected_frames,
"unexpected frames returned by query_series",
);
assert_eq!(
test_db.get_query_series_request(),
Some(expected_request),
"unexpected request to query_series",
);
}
#[tokio::test]
async fn test_read_filter_error() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
let db_info = OrgAndBucket::new(123, 456);
let partition_id = 1;
let chunk = TestChunk::new(0).with_error("Sugar we are going down");
fixture
.test_storage
.db_or_create(&db_info.db_name)
.await
.unwrap()
.add_chunk("my_partition_key", Arc::new(chunk));
let source = Some(StorageClientWrapper::read_source(
db_info.org_id,
db_info.bucket_id,
partition_id,
));
// ---
// test error
@ -1898,22 +1919,7 @@ mod tests {
// Note we don't set the response on the test database, so we expect an error
let response = fixture.storage_client.read_filter(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
let expected_error = "No saved query_series in TestDatabase";
assert!(
response_string.contains(expected_error),
"'{}' did not contain expected content '{}'",
response_string,
expected_error
);
let expected_request = Some(QuerySeriesRequest {
predicate: "Predicate {}".into(),
});
assert_eq!(test_db.get_query_series_request(), expected_request);
Ok(())
assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");
}
#[tokio::test]