From 7d8d00781cb39a0d3df077c8003409325af98cc2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Mar 2021 11:50:29 -0500 Subject: [PATCH] feat: Make read_filter work for mutable buffer and read buffer (#882) * feat: port read_filter to InfluxRPCPlanner * fix: remove commented out vestigal test * fix: Apply suggestions from code review Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * fix: fmt * fix: Update arrow_deps/src/util.rs Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> --- arrow_deps/src/util.rs | 34 ++ mutable_buffer/src/database.rs | 305 +---------- mutable_buffer/src/table.rs | 59 +-- query/src/frontend/influxrpc.rs | 133 +++++ query/src/lib.rs | 14 +- query/src/plan/seriesset.rs | 4 + query/src/test.rs | 39 -- server/src/db.rs | 12 - server/src/db/chunk.rs | 75 ++- server/src/query_tests/influxrpc.rs | 1 + .../src/query_tests/influxrpc/read_filter.rs | 483 ++++++++++++++++++ server/src/query_tests/scenarios.rs | 2 +- src/influxdb_ioxd/rpc/storage/service.rs | 82 +-- 13 files changed, 764 insertions(+), 479 deletions(-) create mode 100644 server/src/query_tests/influxrpc/read_filter.rs diff --git a/arrow_deps/src/util.rs b/arrow_deps/src/util.rs index 28983f33c9..980c94352c 100644 --- a/arrow_deps/src/util.rs +++ b/arrow_deps/src/util.rs @@ -9,6 +9,7 @@ use arrow::{ error::ArrowError, record_batch::RecordBatch, }; +use datafusion::logical_plan::{col, Expr}; /// Returns a single column record batch of type Utf8 from the /// contents of something that can be turned into an iterator over @@ -28,3 +29,36 @@ where RecordBatch::try_new(schema, columns) } + +/// Traits to help creating DataFusion expressions from strings +pub trait IntoExpr { + /// Creates a DataFusion expr + fn into_expr(&self) -> Expr; + + /// creates a DataFusion SortExpr + fn into_sort_expr(&self) -> Expr { + Expr::Sort { + expr: Box::new(self.into_expr()), + asc: true, // Sort ASCENDING + nulls_first: true, + } + } +} + +impl IntoExpr for Arc { + fn into_expr(&self) -> Expr { + col(self.as_ref()) + } +} + +impl IntoExpr for str { + fn into_expr(&self) -> Expr { + col(self) + } +} + +impl IntoExpr for Expr { + fn into_expr(&self) -> Expr { + self.clone() + } +} diff --git a/mutable_buffer/src/database.rs b/mutable_buffer/src/database.rs index 31994698f6..c0b23c1aec 100644 --- a/mutable_buffer/src/database.rs +++ b/mutable_buffer/src/database.rs @@ -229,13 +229,6 @@ impl Database for MutableBufferDb { Ok(()) } - async fn query_series(&self, predicate: Predicate) -> Result { - let mut filter = ChunkTableFilter::new(predicate); - let mut visitor = SeriesVisitor::new(); - self.accept(&mut filter, &mut visitor)?; - Ok(visitor.plans.into()) - } - async fn query_groups( &self, predicate: Predicate, @@ -523,32 +516,6 @@ impl ChunkTableFilter { } } -/// Return DataFusion plans to calculate which series pass the -/// specified predicate. 
-struct SeriesVisitor { - plans: Vec, -} - -impl SeriesVisitor { - fn new() -> Self { - Self { plans: Vec::new() } - } -} - -impl Visitor for SeriesVisitor { - fn pre_visit_table( - &mut self, - table: &Table, - chunk: &Chunk, - filter: &mut ChunkTableFilter, - ) -> Result<()> { - self.plans - .push(table.series_set_plan(filter.chunk_predicate(), chunk)?); - - Ok(()) - } -} - /// Return DataFusion plans to calculate series that pass the /// specified predicate, grouped according to grouped_columns struct GroupsVisitor { @@ -629,24 +596,10 @@ impl Visitor for WindowGroupsVisitor { mod tests { use super::*; use data_types::selection::Selection; - use query::{ - exec::{ - field::FieldIndexes, - seriesset::{Error as SeriesSetError, SeriesSet, SeriesSetItem}, - Executor, - }, - predicate::PredicateBuilder, - Database, - }; - use arrow_deps::{ - arrow::array::{Array, StringArray}, - datafusion::prelude::*, - }; + use arrow_deps::arrow::array::{Array, StringArray}; use data_types::database_rules::Order; use influxdb_line_protocol::{parse_lines, ParsedLine}; - use test_helpers::{assert_contains, str_pair_vec_to_vec}; - use tokio::sync::mpsc; type TestError = Box; type Result = std::result::Result; @@ -710,216 +663,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_query_series() -> Result { - // This test checks that everything is wired together - // correctly. There are more detailed tests in table.rs that - // test the generated queries. - let db = MutableBufferDb::new("column_namedb"); - - let mut lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", // to row 2 - "h2o,state=MA,city=Boston temp=72.4 250", // to row 1 - "h2o,state=CA,city=LA temp=90.0 200", // to row 0 - "h2o,state=CA,city=LA temp=90.0 350", // to row 3 - "o2,state=MA,city=Boston temp=50.4,reading=50 100", // to row 5 - "o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4 - ]; - - // Swap around data is not inserted in series order - lp_lines.swap(0, 2); - lp_lines.swap(4, 5); - - let lp_data = lp_lines.join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - let predicate = Predicate::default(); - - let plans = db - .query_series(predicate) - .await - .expect("Created query_series plan successfully"); - - let results = run_and_gather_results(plans).await; - - assert_eq!(results.len(), 3); - - let series_set0 = results[0].as_ref().expect("Correctly converted"); - assert_eq!(*series_set0.table_name, "h2o"); - assert_eq!( - series_set0.tags, - str_pair_vec_to_vec(&[("city", "Boston"), ("state", "MA")]) - ); - assert_eq!( - series_set0.field_indexes, - FieldIndexes::from_timestamp_and_value_indexes(3, &[2]) - ); - assert_eq!(series_set0.start_row, 0); - assert_eq!(series_set0.num_rows, 2); - - let series_set1 = results[1].as_ref().expect("Correctly converted"); - assert_eq!(*series_set1.table_name, "h2o"); - assert_eq!( - series_set1.tags, - str_pair_vec_to_vec(&[("city", "LA"), ("state", "CA")]) - ); - assert_eq!( - series_set1.field_indexes, - FieldIndexes::from_timestamp_and_value_indexes(3, &[2]) - ); - assert_eq!(series_set1.start_row, 2); - assert_eq!(series_set1.num_rows, 2); - - let series_set2 = results[2].as_ref().expect("Correctly converted"); - assert_eq!(*series_set2.table_name, "o2"); - assert_eq!( - series_set2.tags, - str_pair_vec_to_vec(&[("city", "Boston"), ("state", "MA")]) - ); - assert_eq!( - series_set2.field_indexes, - FieldIndexes::from_timestamp_and_value_indexes(4, &[2, 3]) - ); - 
assert_eq!(series_set2.start_row, 0); - assert_eq!(series_set2.num_rows, 2); - - Ok(()) - } - - #[tokio::test] - async fn test_query_series_filter() -> Result { - // check the appropriate filters are applied in the datafusion plans - let db = MutableBufferDb::new("column_namedb"); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", - "h2o,state=MA,city=Boston temp=72.4 250", - "h2o,state=CA,city=LA temp=90.0 200", - "h2o,state=CA,city=LA temp=90.0 350", - "o2,state=MA,city=Boston temp=50.4,reading=50 100", - "o2,state=MA,city=Boston temp=53.4,reading=51 250", - ]; - - let lp_data = lp_lines.join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - // filter out one row in h20 - let predicate = PredicateBuilder::default() - .timestamp_range(200, 300) - .add_expr(col("state").eq(lit("CA"))) // state=CA - .build(); - - let plans = db - .query_series(predicate) - .await - .expect("Created query_series plan successfully"); - - let results = run_and_gather_results(plans).await; - - assert_eq!(results.len(), 1); - - let series_set0 = results[0].as_ref().expect("Correctly converted"); - assert_eq!(*series_set0.table_name, "h2o"); - assert_eq!( - series_set0.tags, - str_pair_vec_to_vec(&[("city", "LA"), ("state", "CA")]) - ); - assert_eq!( - series_set0.field_indexes, - FieldIndexes::from_timestamp_and_value_indexes(3, &[2]) - ); - assert_eq!(series_set0.start_row, 0); - assert_eq!(series_set0.num_rows, 1); // only has one row! - - Ok(()) - } - - #[tokio::test] - async fn test_query_series_pred_refers_to_column_not_in_table() -> Result { - let db = MutableBufferDb::new("column_namedb"); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", - "h2o,state=MA,city=Boston temp=72.4 250", - ]; - - let lp_data = lp_lines.join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - let predicate = PredicateBuilder::default() - .add_expr(col("tag_not_in_h20").eq(lit("foo"))) - .build(); - - let plans = db - .query_series(predicate) - .await - .expect("Created query_series plan successfully"); - - let results = run_and_gather_results(plans).await; - assert!(results.is_empty()); - - // predicate with no columns, - let predicate = PredicateBuilder::default() - .add_expr(lit("foo").eq(lit("foo"))) - .build(); - - let plans = db - .query_series(predicate) - .await - .expect("Created query_series plan successfully"); - - let results = run_and_gather_results(plans).await; - assert_eq!(results.len(), 1); - - // predicate with both a column that does and does not appear - let predicate = PredicateBuilder::default() - .add_expr(col("state").eq(lit("MA"))) - .add_expr(col("tag_not_in_h20").eq(lit("foo"))) - .build(); - - let plans = db - .query_series(predicate) - .await - .expect("Created query_series plan successfully"); - - let results = run_and_gather_results(plans).await; - assert!(results.is_empty()); - - Ok(()) - } - - #[tokio::test] - async fn test_query_series_pred_neq() { - let db = MutableBufferDb::new("column_namedb"); - - let lp_lines = vec![ - "h2o,state=MA,city=Boston temp=70.4 100", - "h2o,state=MA,city=Boston temp=72.4 250", - ]; - - let lp_data = lp_lines.join("\n"); - - let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect(); - write_lines(&db, &lines).await; - - let predicate = PredicateBuilder::default() - .add_expr(col("state").not_eq(lit("MA"))) - .build(); - - // Should err as the neq path isn't implemented yet - 
let err = db.query_series(predicate).await.unwrap_err(); - assert_contains!( - err.to_string(), - "Operator NotEq not yet supported in IOx MutableBuffer" - ); - } - #[tokio::test] async fn db_size() { let db = MutableBufferDb::new("column_namedb"); @@ -963,52 +706,6 @@ mod tests { assert_eq!(partitions[1].read().unwrap().key(), "p2"); } - /// Run the plan and gather the results in a order that can be compared - async fn run_and_gather_results( - plans: SeriesSetPlans, - ) -> Vec> { - // Use a channel sufficiently large to buffer the series - let (tx, mut rx) = mpsc::channel(100); - - // setup to run the execution plan ( - let executor = Executor::default(); - executor - .to_series_set(plans, tx) - .await - .expect("Running series set plan"); - - // gather up the sets and compare them - let mut results = Vec::new(); - while let Some(r) = rx.recv().await { - results.push(r.map(|item| { - if let SeriesSetItem::Data(series_set) = item { - series_set - } - else { - panic!("Unexpected result from converting. Expected SeriesSetItem::Data, got: {:?}", item) - } - }) - ); - } - - // sort the results so that we can reliably compare - results.sort_by(|r1, r2| { - match (r1, r2) { - (Ok(r1), Ok(r2)) => r1 - .table_name - .cmp(&r2.table_name) - .then(r1.tags.cmp(&r2.tags)), - // default sort by string representation - (r1, r2) => format!("{:?}", r1).cmp(&format!("{:?}", r2)), - } - }); - - // Print to stdout / test log to facilitate debugging if fails on CI - println!("The results are: {:#?}", results); - - results - } - /// write lines into this database async fn write_lines(database: &MutableBufferDb, lines: &[ParsedLine<'_>]) { let mut writer = query::test::TestLPWriter::default(); diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index 968ca7e48a..579f0cd38e 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -39,6 +39,7 @@ use arrow_deps::{ logical_plan::{Expr, LogicalPlanBuilder}, prelude::*, }, + util::IntoExpr, }; #[derive(Debug, Snafu)] @@ -272,25 +273,6 @@ impl Table { } } - /// Creates a SeriesSet plan that produces an output table with rows that - /// match the predicate - /// - /// The output looks like: - /// (tag_col1, tag_col2, ... field1, field2, ... timestamp) - /// - /// The order of the tag_columns is ordered by name. - /// - /// The data is sorted on tag_col1, tag_col2, ...) so that all - /// rows for a particular series (groups where all tags are the - /// same) occur together in the plan - pub fn series_set_plan( - &self, - chunk_predicate: &ChunkPredicate, - chunk: &Chunk, - ) -> Result { - self.series_set_plan_impl(chunk_predicate, None, chunk) - } - /// Creates the plans for computing series set, ensuring that /// prefix_columns, if any, are the prefix of the ordering. 
/// @@ -1016,39 +998,6 @@ fn reorder_prefix( Ok(new_tag_columns) } -/// Traits to help creating DataFuson expressions from strings -trait IntoExpr { - /// Creates a DataFuson expr - fn into_expr(&self) -> Expr; - - /// creates a DataFusion SortExpr - fn into_sort_expr(&self) -> Expr { - Expr::Sort { - expr: Box::new(self.into_expr()), - asc: true, // Sort ASCENDING - nulls_first: true, - } - } -} - -impl IntoExpr for Arc { - fn into_expr(&self) -> Expr { - col(self.as_ref()) - } -} - -impl IntoExpr for str { - fn into_expr(&self) -> Expr { - col(self) - } -} - -impl IntoExpr for Expr { - fn into_expr(&self) -> Expr { - self.clone() - } -} - struct AggExprs { agg_exprs: Vec, field_columns: FieldColumns, @@ -1413,7 +1362,7 @@ mod tests { let predicate = PredicateBuilder::default().build(); let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&chunk_predicate, &chunk) + .series_set_plan_impl(&chunk_predicate, None, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); @@ -1461,7 +1410,7 @@ mod tests { let predicate = PredicateBuilder::default().build(); let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&chunk_predicate, &chunk) + .series_set_plan_impl(&chunk_predicate, None, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); @@ -1514,7 +1463,7 @@ mod tests { let chunk_predicate = chunk.compile_predicate(&predicate).unwrap(); let series_set_plan = table - .series_set_plan(&chunk_predicate, &chunk) + .series_set_plan_impl(&chunk_predicate, None, &chunk) .expect("creating the series set plan"); assert_eq!(series_set_plan.table_name.as_ref(), "table_name"); diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 54b82178c7..4ad3607005 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -12,6 +12,7 @@ use arrow_deps::{ }, prelude::col, }, + util::IntoExpr, }; use data_types::{ schema::{InfluxColumnType, Schema}, @@ -24,6 +25,7 @@ use crate::{ exec::{make_schema_pivot, stringset::StringSet}, plan::{ fieldlist::FieldListPlan, + seriesset::{SeriesSetPlan, SeriesSetPlans}, stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, }, predicate::{Predicate, PredicateBuilder}, @@ -492,6 +494,60 @@ impl InfluxRPCPlanner { Ok(field_list_plan) } + /// Returns a plan that finds all rows which pass the + /// conditions specified by `predicate` in the form of logical + /// time series. + /// + /// A time series is defined by the unique values in a set of + /// "tag_columns" for each field in the "field_columns", ordered by + /// the time column. + /// + /// The output looks like: + /// ```text + /// (tag_col1, tag_col2, ... field1, field2, ... timestamp) + /// ``` + /// + /// The tag_columns are ordered by name. + /// + /// The data is sorted on (tag_col1, tag_col2, ...) so that all + /// rows for a particular series (groups where all tags are the + /// same) occur together in the plan + + pub async fn read_filter(&self, database: &D, predicate: Predicate) -> Result + where + D: Database + 'static, + { + debug!(predicate=?predicate, "planning read_filter"); + + // group tables by chunk, pruning if possible + // key is table name, values are chunks + let mut table_chunks = BTreeMap::new(); + + for chunk in self.filtered_chunks(database, &predicate).await? 
{ + let table_names = self.chunk_table_names(chunk.as_ref(), &predicate).await?; + for table_name in table_names.into_iter() { + table_chunks + .entry(table_name) + .or_insert_with(Vec::new) + .push(Arc::clone(&chunk)); + } + } + + // now, build up plans for each table + let mut ss_plans = Vec::with_capacity(table_chunks.len()); + for (table_name, chunks) in table_chunks { + let ss_plan = self + .read_filter_plan(table_name, &predicate, chunks) + .await?; + // If we have to do real work, add it to the list of plans + if let Some(ss_plan) = ss_plan { + ss_plans.push(ss_plan); + } + } + + Ok(ss_plans.into()) + } + /// Find all the table names in the specified chunk that pass the predicate async fn chunk_table_names( &self, @@ -647,6 +703,83 @@ impl InfluxRPCPlanner { Ok(Some(plan)) } + /// Creates a plan for computing series sets for a given table, + /// returning None if the predicate rules out matching any rows in + /// the table + /// + /// The created plan looks like: + /// + /// Projection (select the columns needed) + /// Order by (tag_columns, timestamp_column) + /// Filter(predicate) + /// InMemoryScan + async fn read_filter_plan( + &self, + table_name: impl Into, + predicate: &Predicate, + chunks: Vec>, + ) -> Result> + where + C: PartitionChunk + 'static, + { + let table_name = table_name.into(); + let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks).await?; + + let TableScanAndFilter { + plan_builder, + schema, + } = match scan_and_filter { + None => return Ok(None), + Some(t) => t, + }; + + let tags_and_timestamp: Vec = schema + .tags_iter() + .chain(schema.time_iter()) + .map(|field| field.name().into_sort_expr()) + .collect(); + + // Order by + let plan_builder = plan_builder + .sort(&tags_and_timestamp) + .context(BuildingPlan)?; + + // Select away anything that isn't in the influx data model + let tags_fields_and_timestamps: Vec = schema + .tags_iter() + .chain(schema.fields_iter()) + .chain(schema.time_iter()) + .map(|field| field.name().into_expr()) + .collect(); + + let plan_builder = plan_builder + .project(&tags_fields_and_timestamps) + .context(BuildingPlan)?; + + let plan = plan_builder.build().context(BuildingPlan)?; + + let tag_columns = schema + .tags_iter() + .map(|field| Arc::new(field.name().to_string())) + .collect(); + + let field_columns = schema + .fields_iter() + .map(|field| Arc::new(field.name().to_string())) + .collect(); + + // TODO: remove the use of tag_columns and field_column names + // and instead use the schema directly) + let ss_plan = SeriesSetPlan::new_from_shared_timestamp( + Arc::new(table_name), + plan, + tag_columns, + field_columns, + ); + + Ok(Some(ss_plan)) + } + /// Create a plan that scans the specified table, and applies any /// filtering specified on the predicate, if any. /// diff --git a/query/src/lib.rs b/query/src/lib.rs index bbd879e246..b3c6f08d16 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -55,15 +55,6 @@ pub trait Database: Debug + Send + Sync { // The functions below are slated for removal (migration into a gRPC query // frontend) --------- - /// Returns a plan that finds all rows rows which pass the - /// conditions specified by `predicate` in the form of logical - /// time series. - /// - /// A time series is defined by the unique values in a set of - /// "tag_columns" for each field in the "field_columns", orderd by - /// the time column. 
- async fn query_series(&self, predicate: Predicate) -> Result; - /// Returns a plan that finds rows which pass the conditions /// specified by `predicate` and have been logically grouped and /// aggregate according to `gby_agg`. @@ -153,7 +144,10 @@ pub trait PartitionChunk: Debug + Send + Sync { ) -> Result; /// Provides access to raw `PartitionChunk` data as an - /// asynchronous stream of `RecordBatch`es. + /// asynchronous stream of `RecordBatch`es filtered by a *required* + /// predicate. Note that not all chunks can evaluate all types of + /// predicates and this function will return an error + /// if requested to evaluate with a predicate that is not supported /// /// This is the analog of the `TableProvider` in DataFusion /// diff --git a/query/src/plan/seriesset.rs b/query/src/plan/seriesset.rs index 325a87fd36..0449d0165d 100644 --- a/query/src/plan/seriesset.rs +++ b/query/src/plan/seriesset.rs @@ -7,6 +7,10 @@ use crate::exec::field::FieldColumns; /// A plan that can be run to produce a logical stream of time series, /// as represented as sequence of SeriesSets from a single DataFusion /// plan, optionally grouped in some way. +/// +/// TODO: remove the tag/field designations below and attach a +/// `Schema` to the plan (which has the tag and field column +/// information natively) #[derive(Debug)] pub struct SeriesSetPlan { /// The table name this came from diff --git a/query/src/test.rs b/query/src/test.rs index 5cd5666ca9..4307005bcc 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -56,12 +56,6 @@ pub struct TestDatabase { /// `column_names` to return upon next request column_names: Arc>>, - /// Responses to return on the next request to `query_series` - query_series_values: Arc>>, - - /// The last request for `query_series` - query_series_request: Arc>>, - /// Responses to return on the next request to `query_groups` query_groups_values: Arc>>, @@ -69,13 +63,6 @@ pub struct TestDatabase { query_groups_request: Arc>>, } -/// Records the parameters passed to a `query_series` request -#[derive(Debug, PartialEq, Clone)] -pub struct QuerySeriesRequest { - /// Stringified '{:?}' version of the predicate - pub predicate: String, -} - /// Records the parameters passed to a `query_groups` request #[derive(Debug, PartialEq, Clone)] pub struct QueryGroupsRequest { @@ -159,16 +146,6 @@ impl TestDatabase { *Arc::clone(&self.column_names).lock() = Some(column_names) } - /// Set the series that will be returned on a call to query_series - pub fn set_query_series_values(&self, plan: SeriesSetPlans) { - *Arc::clone(&self.query_series_values).lock() = Some(plan); - } - - /// Get the parameters from the last column name request - pub fn get_query_series_request(&self) -> Option { - Arc::clone(&self.query_series_request).lock().take() - } - /// Set the series that will be returned on a call to query_groups pub fn set_query_groups_values(&self, plan: SeriesSetPlans) { *Arc::clone(&self.query_groups_values).lock() = Some(plan); @@ -234,22 +211,6 @@ impl Database for TestDatabase { Ok(()) } - async fn query_series(&self, predicate: Predicate) -> Result { - let predicate = predicate_to_test_string(&predicate); - - let new_queries_series_request = Some(QuerySeriesRequest { predicate }); - - *Arc::clone(&self.query_series_request).lock() = new_queries_series_request; - - Arc::clone(&self.query_series_values) - .lock() - .take() - // Turn None into an error - .context(General { - message: "No saved query_series in TestDatabase", - }) - } - async fn query_groups( &self, predicate: Predicate, 
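With `Database::query_series` removed above, read_filter planning now lives in the query frontend. Below is a minimal sketch of the resulting call pattern, mirroring the `InfluxRPCPlanner::read_filter` and `Executor::to_series_set` code added elsewhere in this patch; the helper name `run_read_filter`, the example time range, and the `Box<dyn Error>` error handling are illustrative assumptions, not part of the patch.

```rust
// Sketch only: `run_read_filter` is a hypothetical helper and error handling is
// simplified to `Box<dyn Error>`. The planner/executor calls mirror the new
// read_filter code in query/src/frontend/influxrpc.rs and the storage service.
use query::{
    exec::{seriesset::SeriesSetItem, Executor},
    frontend::influxrpc::InfluxRPCPlanner,
    predicate::PredicateBuilder,
    Database,
};
use tokio::sync::mpsc;

async fn run_read_filter<D>(db: &D) -> Result<(), Box<dyn std::error::Error>>
where
    D: Database + 'static,
{
    // Build the predicate that used to be handed to Database::query_series
    let predicate = PredicateBuilder::default().timestamp_range(100, 450).build();

    // The query frontend now owns planning: one SeriesSetPlan per matching table,
    // covering both mutable buffer and read buffer chunks
    let plans = InfluxRPCPlanner::new().read_filter(db, predicate).await?;

    // Execute the plans and drain the resulting series sets from the channel
    let (tx, mut rx) = mpsc::channel(100);
    Executor::new().to_series_set(plans, tx).await?;
    while let Some(item) = rx.recv().await {
        if let SeriesSetItem::Data(series_set) = item? {
            println!("series set for table {}", series_set.table_name);
        }
    }
    Ok(())
}
```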
diff --git a/server/src/db.rs b/server/src/db.rs index a85d571c71..c70fcac83c 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -306,18 +306,6 @@ impl Database for Db { .context(MutableBufferWrite) } - async fn query_series( - &self, - predicate: query::predicate::Predicate, - ) -> Result { - self.mutable_buffer - .as_ref() - .context(DatabaseNotReadable)? - .query_series(predicate) - .await - .context(MutableBufferRead) - } - async fn query_groups( &self, predicate: query::predicate::Predicate, diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs index 146aa33be1..ef61be3740 100644 --- a/server/src/db/chunk.rs +++ b/server/src/db/chunk.rs @@ -4,6 +4,7 @@ use mutable_buffer::chunk::Chunk as MBChunk; use query::{exec::stringset::StringSet, predicate::Predicate, PartitionChunk}; use read_buffer::Database as ReadBufferDb; use snafu::{ResultExt, Snafu}; +use tracing::debug; use std::sync::Arc; @@ -30,8 +31,14 @@ pub enum Error { #[snafu(display("Internal error restricting schema: {}", source))] InternalSelectingSchema { source: data_types::schema::Error }, - #[snafu(display("Internal Predicate Conversion Error: {}", source))] - InternalPredicateConversion { source: super::pred::Error }, + #[snafu(display("Predicate conversion error: {}", source))] + PredicateConversion { source: super::pred::Error }, + + #[snafu(display( + "Internal error: mutable buffer does not support predicate pushdown, but got: {:?}", + predicate + ))] + InternalPredicateNotSupported { predicate: Predicate }, #[snafu(display("internal error creating plan: {}", source))] InternalPlanCreation { @@ -113,9 +120,13 @@ impl PartitionChunk for DBChunk { if chunk.is_empty() { Some(StringSet::new()) } else { - let chunk_predicate = chunk - .compile_predicate(predicate) - .context(MutableBufferChunk)?; + let chunk_predicate = match chunk.compile_predicate(predicate) { + Ok(chunk_predicate) => chunk_predicate, + Err(e) => { + debug!(?predicate, %e, "mutable buffer predicate not supported for table_names, falling back"); + return Ok(None); + } + }; // we don't support arbitrary expressions in chunk predicate yet if !chunk_predicate.chunk_exprs.is_empty() { @@ -139,11 +150,15 @@ impl PartitionChunk for DBChunk { } => { let chunk_id = *chunk_id; - // TODO: figure out if this predicate was "not - // supported" or "actual error". 
If not supported, - // should return Ok(None) - let rb_predicate = - to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?; + // If not supported, ReadBuffer can't answer with + // metadata only + let rb_predicate = match to_read_buffer_predicate(&predicate) { + Ok(rb_predicate) => rb_predicate, + Err(e) => { + debug!(?predicate, %e, "read buffer predicate not supported for table_names, falling back"); + return Ok(None); + } + }; let names = db .table_names(partition_key, &[chunk_id], rb_predicate) @@ -243,9 +258,15 @@ impl PartitionChunk for DBChunk { ) -> Result { match self { Self::MutableBuffer { chunk } => { - // Note Mutable buffer doesn't support predicate + // Note MutableBuffer doesn't support predicate // pushdown (other than pruning out the entire chunk // via `might_pass_predicate) + if predicate != &Predicate::default() { + return InternalPredicateNotSupported { + predicate: predicate.clone(), + } + .fail(); + } let schema: Schema = self.table_schema(table_name, selection).await?; Ok(Box::pin(MutableBufferChunkStream::new( @@ -260,8 +281,9 @@ impl PartitionChunk for DBChunk { chunk_id, } => { let chunk_id = *chunk_id; + // Error converting to a rb_predicate needs to fail let rb_predicate = - to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?; + to_read_buffer_predicate(&predicate).context(PredicateConversion)?; let chunk_ids = &[chunk_id]; @@ -320,9 +342,13 @@ impl PartitionChunk for DBChunk { ) -> Result, Self::Error> { match self { Self::MutableBuffer { chunk } => { - let chunk_predicate = chunk - .compile_predicate(predicate) - .context(MutableBufferChunk)?; + let chunk_predicate = match chunk.compile_predicate(predicate) { + Ok(chunk_predicate) => chunk_predicate, + Err(e) => { + debug!(?predicate, %e, "mutable buffer predicate not supported for column_names, falling back"); + return Ok(None); + } + }; chunk .column_names(table_name, &chunk_predicate, columns) @@ -334,8 +360,13 @@ impl PartitionChunk for DBChunk { chunk_id, } => { let chunk_id = *chunk_id; - let rb_predicate = - to_read_buffer_predicate(&predicate).context(InternalPredicateConversion)?; + let rb_predicate = match to_read_buffer_predicate(&predicate) { + Ok(rb_predicate) => rb_predicate, + Err(e) => { + debug!(?predicate, %e, "read buffer predicate not supported for column_names, falling back"); + return Ok(None); + } + }; let chunk_ids = &[chunk_id]; @@ -361,9 +392,13 @@ impl PartitionChunk for DBChunk { Self::MutableBuffer { chunk } => { use mutable_buffer::chunk::Error::UnsupportedColumnTypeForListingValues; - let chunk_predicate = chunk - .compile_predicate(predicate) - .context(MutableBufferChunk)?; + let chunk_predicate = match chunk.compile_predicate(predicate) { + Ok(chunk_predicate) => chunk_predicate, + Err(e) => { + debug!(?predicate, %e, "mutable buffer predicate not supported for column_values, falling back"); + return Ok(None); + } + }; let values = chunk.tag_column_values(table_name, column_name, &chunk_predicate); diff --git a/server/src/query_tests/influxrpc.rs b/server/src/query_tests/influxrpc.rs index 665cf098aa..8306d9930a 100644 --- a/server/src/query_tests/influxrpc.rs +++ b/server/src/query_tests/influxrpc.rs @@ -1,4 +1,5 @@ pub mod field_columns; +pub mod read_filter; pub mod table_names; pub mod tag_keys; pub mod tag_values; diff --git a/server/src/query_tests/influxrpc/read_filter.rs b/server/src/query_tests/influxrpc/read_filter.rs new file mode 100644 index 0000000000..2a449c4048 --- /dev/null +++ 
b/server/src/query_tests/influxrpc/read_filter.rs @@ -0,0 +1,483 @@ +//! Tests for the Influx gRPC queries +use std::sync::Arc; + +use crate::query_tests::scenarios::*; +use arrow_deps::{ + arrow::util::pretty::pretty_format_batches, + datafusion::logical_plan::{col, lit}, +}; +use async_trait::async_trait; +use query::{ + exec::{ + field::FieldIndexes, + seriesset::{SeriesSet, SeriesSetItem}, + Executor, + }, + frontend::influxrpc::InfluxRPCPlanner, + predicate::{Predicate, PredicateBuilder}, +}; +use tokio::sync::mpsc; + +pub struct TwoMeasurementsMultiSeries {} +#[async_trait] +impl DBSetup for TwoMeasurementsMultiSeries { + async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; + + let mut data = vec![ + "h2o,state=MA,city=Boston temp=70.4 100", // to row 2 + "h2o,state=MA,city=Boston temp=72.4 250", // to row 1 + "h2o,state=CA,city=LA temp=90.0 200", // to row 0 + "h2o,state=CA,city=LA temp=90.0 350", // to row 3 + "o2,state=MA,city=Boston temp=50.4,reading=50 100", // to row 5 + "o2,state=MA,city=Boston temp=53.4,reading=51 250", // to row 4 + ]; + + // Swap around data is not inserted in series order + data.swap(0, 2); + data.swap(4, 5); + + make_one_chunk_scenarios(partition_key, &data.join("\n")).await + } +} + +/// runs read_filter(predicate) and compares it to the expected +/// output +macro_rules! run_read_filter_test_case { + ($DB_SETUP:expr, $PREDICATE:expr, $EXPECTED_RESULTS:expr) => { + test_helpers::maybe_start_logging(); + let predicate = $PREDICATE; + let expected_results = $EXPECTED_RESULTS; + for scenario in $DB_SETUP.make().await { + let DBScenario { + scenario_name, db, .. + } = scenario; + println!("Running scenario '{}'", scenario_name); + println!("Predicate: '{:#?}'", predicate); + let planner = InfluxRPCPlanner::new(); + let executor = Executor::new(); + + let plans = planner + .read_filter(&db, predicate.clone()) + .await + .expect("built plan successfully"); + + // Use a channel sufficiently large to buffer the series + let (tx, mut rx) = mpsc::channel(100); + executor + .to_series_set(plans, tx) + .await + .expect("Running series set plan"); + + // gather up the sets and compare them + let mut results = vec![]; + while let Some(r) = rx.recv().await { + let item = r.expect("unexpected error in execution"); + let item = if let SeriesSetItem::Data(series_set) = item { + series_set + } + else { + panic!("Unexpected result from converting. 
Expected SeriesSetItem::Data, got: {:?}", item) + }; + + results.push(item); + } + + // sort the results so that we can reliably compare + results.sort_by(|r1, r2| { + r1 + .table_name + .cmp(&r2.table_name) + .then(r1.tags.cmp(&r2.tags)) + }); + + let string_results = results + .into_iter() + .map(|s| dump_series_set(s).into_iter()) + .flatten() + .collect::>(); + + assert_eq!( + expected_results, + string_results, + "Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}", + scenario_name, + expected_results, + string_results + ); + } + }; +} + +/// Format the field indexes into strings +fn dump_field_indexes(f: FieldIndexes) -> Vec { + f.as_slice() + .iter() + .map(|field_index| { + format!( + " (value_index: {}, timestamp_index: {})", + field_index.value_index, field_index.timestamp_index + ) + }) + .collect() +} + +/// Format a the vec of Arc strings paris into strings +fn dump_arc_vec(v: Vec<(Arc, Arc)>) -> Vec { + v.into_iter() + .map(|(k, v)| format!(" ({}, {})", k, v)) + .collect() +} + +/// Format a series set into a format that is easy to compare in tests +fn dump_series_set(s: SeriesSet) -> Vec { + let mut f = vec![]; + f.push("SeriesSet".into()); + f.push(format!("table_name: {}", s.table_name)); + f.push("tags".to_string()); + f.extend(dump_arc_vec(s.tags).into_iter()); + f.push("field_indexes:".to_string()); + f.extend(dump_field_indexes(s.field_indexes).into_iter()); + f.push(format!("start_row: {}", s.start_row)); + f.push(format!("num_rows: {}", s.num_rows)); + f.push("Batches:".into()); + let formatted_batch = pretty_format_batches(&[s.batch]).unwrap(); + f.extend(formatted_batch.trim().split('\n').map(|s| s.to_string())); + + f +} + +#[tokio::test] +async fn test_read_filter_no_data_no_pred() { + let predicate = Predicate::default(); + let expected_results = vec![] as Vec<&str>; + + run_read_filter_test_case!(NoData {}, predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_no_pred() { + let predicate = Predicate::default(); + let expected_results = vec![ + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Boston)", + " (state, MA)", + "field_indexes:", + " (value_index: 2, timestamp_index: 3)", + "start_row: 0", + "num_rows: 2", + "Batches:", + "+--------+-------+------+------+", + "| city | state | temp | time |", + "+--------+-------+------+------+", + "| Boston | MA | 70.4 | 100 |", + "| Boston | MA | 72.4 | 250 |", + "| LA | CA | 90 | 200 |", + "| LA | CA | 90 | 350 |", + "+--------+-------+------+------+", + "SeriesSet", + "table_name: h2o", + "tags", + " (city, LA)", + " (state, CA)", + "field_indexes:", + " (value_index: 2, timestamp_index: 3)", + "start_row: 2", + "num_rows: 2", + "Batches:", + "+--------+-------+------+------+", + "| city | state | temp | time |", + "+--------+-------+------+------+", + "| Boston | MA | 70.4 | 100 |", + "| Boston | MA | 72.4 | 250 |", + "| LA | CA | 90 | 200 |", + "| LA | CA | 90 | 350 |", + "+--------+-------+------+------+", + "SeriesSet", + "table_name: o2", + "tags", + " (city, Boston)", + " (state, MA)", + "field_indexes:", + " (value_index: 2, timestamp_index: 4)", + " (value_index: 3, timestamp_index: 4)", + "start_row: 0", + "num_rows: 2", + "Batches:", + "+--------+-------+---------+------+------+", + "| city | state | reading | temp | time |", + "+--------+-------+---------+------+------+", + "| Boston | MA | 50 | 50.4 | 100 |", + "| Boston | MA | 51 | 53.4 | 250 |", + "+--------+-------+---------+------+------+", + ]; + + run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, 
predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_filter() { + // filter out one row in h20 + let predicate = PredicateBuilder::default() + .timestamp_range(200, 300) + .add_expr(col("state").eq(lit("CA"))) // state=CA + .build(); + + let expected_results = vec![ + "SeriesSet", + "table_name: h2o", + "tags", + " (city, LA)", + " (state, CA)", + "field_indexes:", + " (value_index: 2, timestamp_index: 3)", + "start_row: 0", + "num_rows: 1", + "Batches:", + "+------+-------+------+------+", + "| city | state | temp | time |", + "+------+-------+------+------+", + "| LA | CA | 90 | 200 |", + "+------+-------+------+------+", + ]; + + run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_pred_refers_to_non_existent_column() { + let predicate = PredicateBuilder::default() + .add_expr(col("tag_not_in_h20").eq(lit("foo"))) + .build(); + + let expected_results = vec![] as Vec<&str>; + + run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_pred_no_columns() { + // predicate with no columns, + let predicate = PredicateBuilder::default() + .add_expr(lit("foo").eq(lit("foo"))) + .build(); + + let expected_results = vec![ + "SeriesSet", + "table_name: cpu", + "tags", + " (region, west)", + "field_indexes:", + " (value_index: 1, timestamp_index: 2)", + "start_row: 0", + "num_rows: 2", + "Batches:", + "+--------+------+------+", + "| region | user | time |", + "+--------+------+------+", + "| west | 23.2 | 100 |", + "| west | 21 | 150 |", + "+--------+------+------+", + "SeriesSet", + "table_name: disk", + "tags", + " (region, east)", + "field_indexes:", + " (value_index: 1, timestamp_index: 2)", + "start_row: 0", + "num_rows: 1", + "Batches:", + "+--------+-------+------+", + "| region | bytes | time |", + "+--------+-------+------+", + "| east | 99 | 200 |", + "+--------+-------+------+", + ]; + + run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_pred_refers_to_good_and_non_existent_columns() { + // predicate with both a column that does and does not appear + let predicate = PredicateBuilder::default() + .add_expr(col("state").eq(lit("MA"))) + .add_expr(col("tag_not_in_h20").eq(lit("foo"))) + .build(); + + let expected_results = vec![] as Vec<&str>; + + run_read_filter_test_case!(TwoMeasurements {}, predicate, expected_results); +} + +#[tokio::test] +async fn test_read_filter_data_pred_unsupported_in_scan() { + test_helpers::maybe_start_logging(); + + // These predicates can't be pushed down into chunks, but they can + // be evaluated by the general purpose DataFusion plan + // https://github.com/influxdata/influxdb_iox/issues/883 + // (STATE = 'CA') OR (READING > 0) + let predicate = PredicateBuilder::default() + .add_expr(col("state").eq(lit("CA")).or(col("reading").gt(lit(0)))) + .build(); + + // Note these results are incorrect (they do not include data from h2o where + // state = CA) + let expected_results = vec![ + "SeriesSet", + "table_name: o2", + "tags", + " (city, Boston)", + " (state, MA)", + "field_indexes:", + " (value_index: 2, timestamp_index: 4)", + " (value_index: 3, timestamp_index: 4)", + "start_row: 0", + "num_rows: 2", + "Batches:", + "+--------+-------+---------+------+------+", + "| city | state | reading | temp | time |", + "+--------+-------+---------+------+------+", + "| Boston | MA | 50 | 50.4 | 100 |", 
+ "| Boston | MA | 51 | 53.4 | 250 |", + "+--------+-------+---------+------+------+", + ]; + + run_read_filter_test_case!(TwoMeasurementsMultiSeries {}, predicate, expected_results); +} + +pub struct MeasurementsSortableTags {} +#[async_trait] +impl DBSetup for MeasurementsSortableTags { + async fn make(&self) -> Vec { + let partition_key = "1970-01-01T00"; + + let data = vec![ + "h2o,zz_tag=A,state=MA,city=Kingston temp=70.1 800", + "h2o,state=MA,city=Kingston,zz_tag=B temp=70.2 100", + "h2o,state=CA,city=Boston temp=70.3 250", + "h2o,state=MA,city=Boston,zz_tag=A temp=70.4 1000", + "h2o,state=MA,city=Boston temp=70.5,other=5.0 250", + ]; + + make_one_chunk_scenarios(partition_key, &data.join("\n")).await + } +} + +#[tokio::test] +async fn test_read_filter_data_plan_order() { + test_helpers::maybe_start_logging(); + let predicate = Predicate::default(); + let expected_results = vec![ + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Boston)", + " (state, CA)", + " (zz_tag, )", + "field_indexes:", + " (value_index: 3, timestamp_index: 5)", + " (value_index: 4, timestamp_index: 5)", + "start_row: 0", + "num_rows: 1", + "Batches:", + "+----------+-------+--------+-------+------+------+", + "| city | state | zz_tag | other | temp | time |", + "+----------+-------+--------+-------+------+------+", + "| Boston | CA | | | 70.3 | 250 |", + "| Boston | MA | | 5 | 70.5 | 250 |", + "| Boston | MA | A | | 70.4 | 1000 |", + "| Kingston | MA | A | | 70.1 | 800 |", + "| Kingston | MA | B | | 70.2 | 100 |", + "+----------+-------+--------+-------+------+------+", + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Boston)", + " (state, MA)", + " (zz_tag, )", + "field_indexes:", + " (value_index: 3, timestamp_index: 5)", + " (value_index: 4, timestamp_index: 5)", + "start_row: 1", + "num_rows: 1", + "Batches:", + "+----------+-------+--------+-------+------+------+", + "| city | state | zz_tag | other | temp | time |", + "+----------+-------+--------+-------+------+------+", + "| Boston | CA | | | 70.3 | 250 |", + "| Boston | MA | | 5 | 70.5 | 250 |", + "| Boston | MA | A | | 70.4 | 1000 |", + "| Kingston | MA | A | | 70.1 | 800 |", + "| Kingston | MA | B | | 70.2 | 100 |", + "+----------+-------+--------+-------+------+------+", + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Boston)", + " (state, MA)", + " (zz_tag, A)", + "field_indexes:", + " (value_index: 3, timestamp_index: 5)", + " (value_index: 4, timestamp_index: 5)", + "start_row: 2", + "num_rows: 1", + "Batches:", + "+----------+-------+--------+-------+------+------+", + "| city | state | zz_tag | other | temp | time |", + "+----------+-------+--------+-------+------+------+", + "| Boston | CA | | | 70.3 | 250 |", + "| Boston | MA | | 5 | 70.5 | 250 |", + "| Boston | MA | A | | 70.4 | 1000 |", + "| Kingston | MA | A | | 70.1 | 800 |", + "| Kingston | MA | B | | 70.2 | 100 |", + "+----------+-------+--------+-------+------+------+", + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Kingston)", + " (state, MA)", + " (zz_tag, A)", + "field_indexes:", + " (value_index: 3, timestamp_index: 5)", + " (value_index: 4, timestamp_index: 5)", + "start_row: 3", + "num_rows: 1", + "Batches:", + "+----------+-------+--------+-------+------+------+", + "| city | state | zz_tag | other | temp | time |", + "+----------+-------+--------+-------+------+------+", + "| Boston | CA | | | 70.3 | 250 |", + "| Boston | MA | | 5 | 70.5 | 250 |", + "| Boston | MA | A | | 70.4 | 1000 |", + "| Kingston | MA | A | | 70.1 | 800 |", + "| Kingston 
| MA | B | | 70.2 | 100 |", + "+----------+-------+--------+-------+------+------+", + "SeriesSet", + "table_name: h2o", + "tags", + " (city, Kingston)", + " (state, MA)", + " (zz_tag, B)", + "field_indexes:", + " (value_index: 3, timestamp_index: 5)", + " (value_index: 4, timestamp_index: 5)", + "start_row: 4", + "num_rows: 1", + "Batches:", + "+----------+-------+--------+-------+------+------+", + "| city | state | zz_tag | other | temp | time |", + "+----------+-------+--------+-------+------+------+", + "| Boston | CA | | | 70.3 | 250 |", + "| Boston | MA | | 5 | 70.5 | 250 |", + "| Boston | MA | A | | 70.4 | 1000 |", + "| Kingston | MA | A | | 70.1 | 800 |", + "| Kingston | MA | B | | 70.2 | 100 |", + "+----------+-------+--------+-------+------+------+", + ]; + + run_read_filter_test_case!(MeasurementsSortableTags {}, predicate, expected_results); +} diff --git a/server/src/query_tests/scenarios.rs b/server/src/query_tests/scenarios.rs index 40bc8d4e09..4ce4974ee7 100644 --- a/server/src/query_tests/scenarios.rs +++ b/server/src/query_tests/scenarios.rs @@ -183,7 +183,7 @@ impl DBSetup for EndToEndTest { /// Data in single closed mutable buffer chunk, one closed mutable chunk /// Data in both read buffer and mutable buffer chunk /// Data in one only read buffer chunk -async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec { +pub(crate) async fn make_one_chunk_scenarios(partition_key: &str, data: &str) -> Vec { let db = make_db(); let mut writer = TestLPWriter::default(); writer.write_lp_string(&db, data).await.unwrap(); diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index 1cc51fec19..570964f0c2 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -911,7 +911,7 @@ async fn read_filter_impl<'a, T>( rpc_predicate: Option, ) -> Result<()> where - T: DatabaseStore, + T: DatabaseStore + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -935,8 +935,10 @@ where let executor = db_store.executor(); - let series_plan = db - .query_series(predicate) + let planner = InfluxRPCPlanner::new(); + + let series_plan = planner + .read_filter(db.as_ref(), predicate) .await .map_err(|e| Box::new(e) as _) .context(PlanningFilteringSeries { db_name })?; @@ -1112,8 +1114,8 @@ mod tests { group_by::{Aggregate as QueryAggregate, WindowDuration as QueryWindowDuration}, plan::seriesset::SeriesSetPlans, test::QueryGroupsRequest, + test::TestChunk, test::TestDatabaseStore, - test::{QuerySeriesRequest, TestChunk}, }; use std::{ convert::TryFrom, @@ -1840,18 +1842,25 @@ mod tests { } #[tokio::test] - async fn test_read_filter() -> Result<(), tonic::Status> { + async fn test_read_filter() { // Start a test gRPC server on a randomally allocated port let mut fixture = Fixture::new().await.expect("Connecting to test server"); let db_info = OrgAndBucket::new(123, 456); let partition_id = 1; - let test_db = fixture + // Add a chunk with a field + let chunk = TestChunk::new(0) + .with_time_column("TheMeasurement") + .with_tag_column("TheMeasurement", "state") + .with_one_row_of_null_data("TheMeasurement"); + + fixture .test_storage .db_or_create(&db_info.db_name) .await - .expect("creating test database"); + .unwrap() + .add_chunk("my_partition_key", Arc::new(chunk)); let source = Some(StorageClientWrapper::read_source( db_info.org_id, @@ -1861,31 +1870,43 @@ mod tests { let request = ReadFilterRequest { read_source: source.clone(), - range: make_timestamp_range(150, 200), + range: 
make_timestamp_range(0, 10000), predicate: make_state_ma_predicate(), }; - let expected_request = QuerySeriesRequest { - predicate: "Predicate { exprs: [#state Eq Utf8(\"MA\")] range: TimestampRange { start: 150, end: 200 }}".into() - }; + let actual_frames = fixture.storage_client.read_filter(request).await.unwrap(); - let dummy_series_set_plan = SeriesSetPlans::from(vec![]); - test_db.set_query_series_values(dummy_series_set_plan); - - let actual_frames = fixture.storage_client.read_filter(request).await?; - - // TODO: encode this in the test case or something + // TODO: encode the actual output in the test case or something let expected_frames: Vec = vec!["0 frames".into()]; assert_eq!( actual_frames, expected_frames, "unexpected frames returned by query_series", ); - assert_eq!( - test_db.get_query_series_request(), - Some(expected_request), - "unexpected request to query_series", - ); + } + + #[tokio::test] + async fn test_read_filter_error() { + // Start a test gRPC server on a randomally allocated port + let mut fixture = Fixture::new().await.expect("Connecting to test server"); + + let db_info = OrgAndBucket::new(123, 456); + let partition_id = 1; + + let chunk = TestChunk::new(0).with_error("Sugar we are going down"); + + fixture + .test_storage + .db_or_create(&db_info.db_name) + .await + .unwrap() + .add_chunk("my_partition_key", Arc::new(chunk)); + + let source = Some(StorageClientWrapper::read_source( + db_info.org_id, + db_info.bucket_id, + partition_id, + )); // --- // test error @@ -1898,22 +1919,7 @@ mod tests { // Note we don't set the response on the test database, so we expect an error let response = fixture.storage_client.read_filter(request).await; - assert!(response.is_err()); - let response_string = format!("{:?}", response); - let expected_error = "No saved query_series in TestDatabase"; - assert!( - response_string.contains(expected_error), - "'{}' did not contain expected content '{}'", - response_string, - expected_error - ); - - let expected_request = Some(QuerySeriesRequest { - predicate: "Predicate {}".into(), - }); - assert_eq!(test_db.get_query_series_request(), expected_request); - - Ok(()) + assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down"); } #[tokio::test]
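A side note on the `IntoExpr` helpers that this patch promotes from `mutable_buffer/src/table.rs` into the shared `arrow_deps::util` module: the sketch below shows how column names become the DataFusion sort expressions used by `read_filter_plan`. It assumes the `Arc` impl shown in the patch is `impl IntoExpr for Arc<String>`; the `sort_keys` helper and `fn main` are illustrative only.

```rust
// Illustrative only: usage of the IntoExpr trait now exported from arrow_deps::util.
// Assumes the `Arc` impl in the patch is `impl IntoExpr for Arc<String>`.
use std::sync::Arc;

use arrow_deps::{datafusion::logical_plan::Expr, util::IntoExpr};

fn sort_keys(tag_names: &[Arc<String>]) -> Vec<Expr> {
    // Each tag column name becomes Expr::Sort { asc: true, nulls_first: true, .. },
    // which is how read_filter_plan orders rows by (tag columns..., time)
    tag_names.iter().map(|name| name.into_sort_expr()).collect()
}

fn main() {
    // &str also implements IntoExpr, producing a plain column reference
    let state_col: Expr = "state".into_expr();

    let keys = sort_keys(&[Arc::new("state".to_string()), Arc::new("city".to_string())]);
    println!("{:?} {:?}", state_col, keys);
}
```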