test: Port read_filter query_tests to end-to-end tests

pull/24376/head
Carol (Nichols || Goulding) 2023-01-25 14:27:25 -05:00
parent 9d490ceb1a
commit 67c430da63
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
5 changed files with 820 additions and 637 deletions

View File

@ -1,7 +1,8 @@
use super::{dump::dump_data_frames, read_group_data, run_data_test};
use super::{dump::dump_data_frames, read_group_data, run_data_test, InfluxRpcTest};
use async_trait::async_trait;
use futures::{prelude::*, FutureExt};
use generated_types::{
read_response::frame::Data, storage_client::StorageClient, ReadFilterRequest,
node::Logical, read_response::frame::Data, storage_client::StorageClient, ReadFilterRequest,
};
use influxdb_iox_client::connection::GrpcConnection;
use std::sync::Arc;
@ -377,3 +378,669 @@ async fn do_read_filter_request(
dump_data_frames(&frames)
}
#[tokio::test]
async fn no_predicate() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new(),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100, 250], values: \"70.4,72.4\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200, 350], values: \"90,90\"",
"SeriesFrame, tags: _field=reading,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100, 250], values: \"50,51\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100, 250], values: \"50.4,53.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn exclusive_timestamp_range_predicate() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
// should *not* return the 350 row as the predicate is range.start <= ts < range.end
.timestamp_range(349, 350),
expected_results: vec![],
})
.run()
.await;
}
#[tokio::test]
async fn inclusive_timestamp_range_predicate() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
// *should* return the 350 row as the predicate is range.start <= ts < range.end
.timestamp_range(350, 351),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [350], values: \"90\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn exact_timestamp_range_predicate_multiple_results() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new().timestamp_range(250, 251),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"72.4\"",
"SeriesFrame, tags: _field=reading,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"51\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"53.4\"",
],
})
.run()
.await;
}
// #[tokio::test]
// async fn tag_predicate_always_true() {
// // This is how the test existed in query_tests:
//
// let predicate = Predicate::new()
// // region = region
// .with_expr(col("region").eq(col("region")));
// let predicate = InfluxRpcPredicate::new(None, predicate);
//
// // expect both series to be returned
// let expected_results = vec![
// "Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
// "Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
// ];
//
// run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
//
// // Translated, this test is:
//
// Arc::new(ReadFilterTest {
// setup_name: "TwoMeasurements",
// request: GrpcRequestBuilder::new().tag_predicate("region", "region"),
// expected_results: vec![
// "SeriesFrame, tags: _field=user,_measurement=cpu,region=west, type: 0",
// "FloatPointsFrame, timestamps: [100, 150], values: \"23.2,21\"",
// "SeriesFrame, tags: _field=bytes,_measurement=disk,region=east, type: 1",
// "IntegerPointsFrame, timestamps: [200], values: \"99\"",
// ],
// })
// .run()
// .await;
//
// // This fails because the tag predicate now means 'column region = literal "region"' which
// // evaluates to no rows, rather than all rows.
// }
// #[tokio::test]
// async fn invalid_tag_predicate() {
// // This is how the test existed in query_tests:
//
// let v = ScalarValue::Binary(Some(vec![]));
// let predicate = Predicate::new()
// // region > <binary> (region is a tag(string) column, so this predicate is invalid)
// .with_expr(col("region").gt(lit(v)));
// let predicate = InfluxRpcPredicate::new(None, predicate);
//
// let expected_error = "Dictionary(Int32, Utf8) > Binary' can't be evaluated because there isn't a common type to coerce the types to";
//
// run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
//
// // I don't know how to translate this query to a `generated_types::Predicate`.
// }
// #[tokio::test]
// async fn test_read_filter_invalid_predicate_case() {
// // This is how the test existed in query_tests:
//
// let v = ScalarValue::Binary(Some(vec![]));
// let predicate = Predicate::new()
// // https://github.com/influxdata/influxdb_iox/issues/3635
// // model what happens when a field is treated like a tag
// // CASE WHEN system" IS NULL THEN '' ELSE system END = binary;
// .with_expr(make_empty_tag_ref_expr("system").eq(lit(v)));
// let predicate = InfluxRpcPredicate::new(None, predicate);
//
// let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Binary' can't be evaluated because there isn't a common type to coerce the types to";
//
// run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
//
// // I don't know how to translate this query to a `generated_types::Predicate`.
// }
#[tokio::test]
async fn unknown_columns_in_predicate_no_results() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurements",
request: GrpcRequestBuilder::new()
// bar is not a column that appears in the data; produce no results
.tag_predicate("bar", "baz"),
expected_results: vec![],
})
.run()
.await;
}
#[tokio::test]
async fn tag_predicate_containing_field() {
// Columns in the RPC predicate must be treated as tags:
// https://github.com/influxdata/idpe/issues/16238
Arc::new(ReadFilterTest {
setup_name: "StringFieldWithNumericValue",
request: GrpcRequestBuilder::new()
// fld exists in the table, but only as a field, not a tag, so no data should be
// returned.
.tag_predicate("fld", "200"),
expected_results: vec![],
})
.run()
.await;
}
// #[tokio::test]
// async fn tag_predicate_containing_field_coerce_number() {
// // This is how the test existed in query tests:
//
// // Same as above except for the predicate compares to an integer literal.
// let predicate = Predicate::new().with_expr(make_empty_tag_ref_expr("fld").eq(lit(200)));
// let predicate = InfluxRpcPredicate::new(None, predicate);
// let expected_results = vec![];
// run_read_filter_test_case(StringFieldWithNumericValue {}, predicate, expected_results).await;
//
// // I don't know how to create a tag predicate using an integer literal.
// }
#[tokio::test]
async fn tag_predicates() {
let expected_results = vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200], values: \"90\"",
];
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
.timestamp_range(200, 300)
// filter to one row in h2o
.tag_predicate("state", "CA"),
expected_results: expected_results.clone(),
})
.run()
.await;
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
.timestamp_range(200, 300)
// Same results via a != predicate.
.not_tag_predicate("state", "MA"),
expected_results,
})
.run()
.await;
}
#[tokio::test]
async fn field_and_tag_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new()
// filter to one row in h2o
.field_predicate("other_temp")
.tag_predicate("state", "CA"),
expected_results: vec![
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=CA, type: 0",
"FloatPointsFrame, timestamps: [350], values: \"72.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn field_exact_match_predicate() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new().field_predicate("temp"),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50, 100000], values: \"70.4,70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,state=CA, type: 0",
"FloatPointsFrame, timestamps: [300], values: \"79\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50], values: \"53.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn not_field_predicate() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new().not_field_predicate("temp"),
expected_results: vec![
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=CA, type: 0",
"FloatPointsFrame, timestamps: [350], values: \"72.4\"",
"SeriesFrame, tags: _field=moisture,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100000], values: \"43\"",
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"70.4\"",
"SeriesFrame, tags: _field=reading,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50], values: \"51\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn field_regex_match_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new().regex_match_predicate("_field", "temp"),
expected_results: vec![
// Should see results for temp and other_temp (but not reading)
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=CA, type: 0",
"FloatPointsFrame, timestamps: [350], values: \"72.4\"",
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50, 100000], values: \"70.4,70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,state=CA, type: 0",
"FloatPointsFrame, timestamps: [300], values: \"79\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50], values: \"53.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn field_and_measurement_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new()
.field_predicate("temp")
.measurement_predicate("h2o"),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50, 100000], values: \"70.4,70.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn multi_field_and_measurement_predicates() {
// Predicate should pick 'temp' field from h2o and 'other_temp' from o2
//
// (_field = 'other_temp' AND _measurement = 'h2o') OR (_field = 'temp' AND _measurement = 'o2')
let p1 = GrpcRequestBuilder::new()
.field_predicate("other_temp")
.measurement_predicate("h2o");
let p2 = GrpcRequestBuilder::new()
.field_predicate("temp")
.measurement_predicate("o2");
let node2 = p2.predicate.unwrap().root.unwrap();
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: p1.combine_predicate(Logical::Or, node2),
expected_results: vec![
// SHOULD NOT contain temp from h2o
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=CA, type: 0",
"FloatPointsFrame, timestamps: [350], values: \"72.4\"",
"SeriesFrame, tags: _field=other_temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,state=CA, type: 0",
"FloatPointsFrame, timestamps: [300], values: \"79\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [50], values: \"53.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn measurement_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new()
// use an expr on table name to pick just the last row from o2
.timestamp_range(200, 400)
.measurement_predicate("o2"),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=o2,state=CA, type: 0",
"FloatPointsFrame, timestamps: [300], values: \"79\"",
],
})
.run()
.await;
}
// #[tokio::test]
// async fn predicate_no_columns() {
// // This is how the test existed in query tests:
// // predicate with no columns,
// let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo")));
// let predicate = InfluxRpcPredicate::new(None, predicate);
//
// let expected_results = vec![
// "Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
// "Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
// ];
//
// run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
//
// // I don't know how to create a tag predicate using a string literal.
// }
#[tokio::test]
async fn tag_regex_match_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
.timestamp_range(200, 300)
// will match CA state
.regex_match_predicate("state", "C.*"),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200], values: \"90\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn tag_regex_not_match_predicates() {
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiSeries",
request: GrpcRequestBuilder::new()
.timestamp_range(200, 300)
// will filter out any rows with a state that matches "CA"
.not_regex_match_predicate("state", "C.*"),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"72.4\"",
"SeriesFrame, tags: _field=reading,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"51\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"53.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn tag_regex_escaped_predicates() {
Arc::new(ReadFilterTest {
setup_name: "MeasurementStatusCode",
request: GrpcRequestBuilder::new()
// Came from InfluxQL as:
//
// ```text
// SELECT value
// FROM db0.rp0.status_code
// WHERE url =~ /https\:\/\/influxdb\.com/
// ```
.regex_match_predicate("url", r#"https\://influxdb\.com"#),
expected_results: vec![
// expect one series with influxdb.com
"SeriesFrame, tags: _field=value,_measurement=status_code,url=https://influxdb.com, \
type: 0",
"FloatPointsFrame, timestamps: [1527018816000000000], values: \"418\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn tag_not_match_regex_escaped_predicates() {
Arc::new(ReadFilterTest {
setup_name: "MeasurementStatusCode",
request: GrpcRequestBuilder::new()
// Came from InfluxQL as:
//
// ```text
// SELECT value
// FROM db0.rp0.status_code
// WHERE url !~ /https\:\/\/influxdb\.com/
// ```
.not_regex_match_predicate("url", r#"https\://influxdb\.com"#),
expected_results: vec![
// expect one series with example.com
"SeriesFrame, tags: _field=value,_measurement=status_code,url=http://www.example.com, \
type: 0",
"FloatPointsFrame, timestamps: [1527018806000000000], values: \"404\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn predicate_unsupported_in_scan() {
// These kinds of predicates can't be pushed down into chunks, but they can be evaluated by the
// general purpose DataFusion plan
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsMultiTagValue",
request: GrpcRequestBuilder::new()
.or_tag_predicates([("state", "CA"), ("city", "Boston")].into_iter()),
expected_results: vec![
// Note these results include data from both o2 and h2o
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100], values: \"70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200], values: \"90\"",
"SeriesFrame, tags: _field=reading,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100], values: \"50\"",
"SeriesFrame, tags: _field=temp,_measurement=o2,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [100], values: \"50.4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn multi_negation() {
// reproducer for https://github.com/influxdata/influxdb_iox/issues/4800
Arc::new(ReadFilterTest {
setup_name: "EndToEndTest",
request: GrpcRequestBuilder::new()
.or_tag_predicates([("host", "server01"), ("host", "")].into_iter()),
expected_results: vec![
"SeriesFrame, tags: _field=color,_measurement=attributes, type: 4",
"StringPointsFrame, timestamps: [8000], values: blue",
"SeriesFrame, tags: _field=value,_measurement=cpu_load_short,host=server01, type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"27.99\"",
"SeriesFrame, tags: \
_field=value,_measurement=cpu_load_short,host=server01,region=us-east, type: 0",
"FloatPointsFrame, timestamps: [3000], values: \"1234567.891011\"",
"SeriesFrame, tags: \
_field=value,_measurement=cpu_load_short,host=server01,region=us-west, type: 0",
"FloatPointsFrame, timestamps: [0, 4000], values: \"0.64,0.000003\"",
"SeriesFrame, tags: _field=active,_measurement=status, type: 3",
"BooleanPointsFrame, timestamps: [7000], values: true",
"SeriesFrame, tags: _field=in,_measurement=swap,host=server01,name=disk0, type: 0",
"FloatPointsFrame, timestamps: [6000], values: \"3\"",
"SeriesFrame, tags: _field=out,_measurement=swap,host=server01,name=disk0, type: 0",
"FloatPointsFrame, timestamps: [6000], values: \"4\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn data_plan_order() {
Arc::new(ReadFilterTest {
setup_name: "MeasurementsSortableTags",
request: GrpcRequestBuilder::new(),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=CA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"70.3\"",
"SeriesFrame, tags: _field=other,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"5\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [250], values: \"70.5\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA,zz_tag=A, \
type: 0",
"FloatPointsFrame, timestamps: [1000], values: \"70.4\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Kingston,state=MA,zz_tag=A, \
type: 0",
"FloatPointsFrame, timestamps: [800], values: \"70.1\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Kingston,state=MA,zz_tag=B, \
type: 0",
"FloatPointsFrame, timestamps: [100], values: \"70.2\"",
],
})
.run()
.await;
}
// #[tokio::test]
// async fn filter_on_value() {
// // This is how the test existed in query tests:
//
// test_helpers::maybe_start_logging();
//
// let predicate = Predicate::default()
// .with_expr(col("_value").eq(lit(1.77)))
// .with_expr(col("_field").eq(lit("load4")));
// let predicate = InfluxRpcPredicate::new(None, predicate);
//
// let expected_results = vec![
// "Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000, 1527018826000000000], values: [1.77, 1.77]",
// ];
//
// run_read_filter_test_case(MeasurementsForDefect2845 {}, predicate, expected_results).await;
//
// // I can't figure out how to create a `_value=` expression.
// }
#[tokio::test]
#[should_panic(expected = "Unsupported _field predicate")]
async fn unsupported_field_predicate() {
// Tell the test to panic with the expected message if `TEST_INTEGRATION` isn't set so that
// this still passes
maybe_skip_integration!("Unsupported _field predicate");
Arc::new(ReadFilterTest {
setup_name: "TwoMeasurementsManyFields",
request: GrpcRequestBuilder::new()
.not_field_predicate("temp")
.or_tag_predicates([("city", "Boston")].into_iter()),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=o2,state=CA, type: 0",
"FloatPointsFrame, timestamps: [300], values: \"79\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn periods_in_names() {
Arc::new(ReadFilterTest {
setup_name: "PeriodsInNames",
request: GrpcRequestBuilder::new().timestamp_range(0, 1_700_000_001_000_000_000),
expected_results: vec![
// Should return both series
"SeriesFrame, tags: \
_field=field.one,_measurement=measurement.one,tag.one=value,tag.two=other, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000001], values: \"1\"",
"SeriesFrame, tags: \
_field=field.two,_measurement=measurement.one,tag.one=value,tag.two=other, type: 3",
"BooleanPointsFrame, timestamps: [1609459201000000001], values: true",
"SeriesFrame, tags: \
_field=field.one,_measurement=measurement.one,tag.one=value2,tag.two=other2, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000002], values: \"1\"",
"SeriesFrame, tags: \
_field=field.two,_measurement=measurement.one,tag.one=value2,tag.two=other2, type: 3",
"BooleanPointsFrame, timestamps: [1609459201000000002], values: false",
],
})
.run()
.await;
}
#[tokio::test]
async fn periods_in_predicates() {
Arc::new(ReadFilterTest {
setup_name: "PeriodsInNames",
request: GrpcRequestBuilder::new()
.timestamp_range(0, 1_700_000_001_000_000_000)
.tag_predicate("tag.one", "value"),
expected_results: vec![
// Should return both series
"SeriesFrame, tags: \
_field=field.one,_measurement=measurement.one,tag.one=value,tag.two=other, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000001], values: \"1\"",
"SeriesFrame, tags: \
_field=field.two,_measurement=measurement.one,tag.one=value,tag.two=other, type: 3",
"BooleanPointsFrame, timestamps: [1609459201000000001], values: true",
],
})
.run()
.await;
}
#[derive(Debug)]
struct ReadFilterTest {
setup_name: &'static str,
request: GrpcRequestBuilder,
expected_results: Vec<&'static str>,
}
#[async_trait]
impl InfluxRpcTest for ReadFilterTest {
fn setup_name(&self) -> &'static str {
self.setup_name
}
async fn request_and_assert(&self, cluster: &MiniCluster) {
let mut storage_client = cluster.querier_storage_client();
let read_filter_request = self.request.clone().source(cluster).build_read_filter();
let read_filter_response = storage_client
.read_filter(read_filter_request)
.await
.expect("successful read_filter call");
let responses: Vec<_> = read_filter_response
.into_inner()
.try_collect()
.await
.unwrap();
let frames: Vec<_> = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
let results = dump_data_frames(&frames);
assert_eq!(results, self.expected_results);
}
}

View File

@ -506,5 +506,144 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
},
],
),
(
"TwoMeasurementsMultiSeries",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
// Data is deliberately not in series order.
"h2o,state=CA,city=LA temp=90.0 200",
"h2o,state=MA,city=Boston temp=72.4 250",
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=CA,city=LA temp=90.0 350",
"o2,state=MA,city=Boston temp=53.4,reading=51 250",
"o2,state=MA,city=Boston temp=50.4,reading=50 100",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
],
),
(
// This recreates the test case for <https://github.com/influxdata/idpe/issues/16238>.
"StringFieldWithNumericValue",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
["m,tag0=foo fld=\"200\" 1000", "m,tag0=foo fld=\"404\" 1050"].join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"MeasurementStatusCode",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"status_code,url=http://www.example.com value=404 1527018806000000000",
"status_code,url=https://influxdb.com value=418 1527018816000000000",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwoMeasurementsMultiTagValue",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Lowell temp=75.4 100",
"h2o,state=CA,city=LA temp=90.0 200",
"o2,state=MA,city=Boston temp=50.4,reading=50 100",
"o2,state=KS,city=Topeka temp=60.4,reading=60 100",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
],
),
(
"MeasurementsSortableTags",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,zz_tag=A,state=MA,city=Kingston temp=70.1 800",
"h2o,state=MA,city=Kingston,zz_tag=B temp=70.2 100",
"h2o,state=CA,city=Boston temp=70.3 250",
"h2o,state=MA,city=Boston,zz_tag=A temp=70.4 1000",
"h2o,state=MA,city=Boston temp=70.5,other=5.0 250",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
// See issue: https://github.com/influxdata/influxdb_iox/issues/2845
"MeasurementsForDefect2845",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"system,host=host.local load1=1.83 1527018806000000000",
"system,host=host.local load1=1.63 1527018816000000000",
"system,host=host.local load3=1.72 1527018806000000000",
"system,host=host.local load4=1.77 1527018806000000000",
"system,host=host.local load4=1.78 1527018816000000000",
"system,host=host.local load4=1.77 1527018826000000000",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"EndToEndTest",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"cpu_load_short,host=server01,region=us-west value=0.64 0000",
"cpu_load_short,host=server01 value=27.99 1000",
"cpu_load_short,host=server02,region=us-west value=3.89 2000",
"cpu_load_short,host=server01,region=us-east value=1234567.891011 3000",
"cpu_load_short,host=server01,region=us-west value=0.000003 4000",
"system,host=server03 uptime=1303385 5000",
"swap,host=server01,name=disk0 in=3,out=4 6000",
"status active=t 7000",
"attributes color=\"blue\" 8000",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 5,
},
],
),
])
});

View File

@ -1,4 +1,3 @@
pub mod read_filter;
pub mod read_group;
pub mod read_window_aggregate;
pub mod table_names;

View File

@ -1,632 +0,0 @@
//! Tests for the Influx gRPC queries
use std::sync::Arc;
#[cfg(test)]
use crate::scenarios::{
DbScenario, DbSetup, EndToEndTest, StringFieldWithNumericValue, TwoMeasurements,
TwoMeasurementsManyFields,
};
use crate::{
db::AbstractDb,
influxrpc::util::run_series_set_plan_maybe_error,
scenarios::{
MeasurementStatusCode, MeasurementsForDefect2845, MeasurementsSortableTags, PeriodsInNames,
TwoMeasurementsMultiSeries, TwoMeasurementsMultiTagValue,
},
};
use datafusion::{
prelude::{col, lit},
scalar::ScalarValue,
};
use datafusion_util::AsExpr;
use iox_query::frontend::influxrpc::InfluxRpcPlanner;
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::Predicate;
use test_helpers::assert_contains;
use super::util::make_empty_tag_ref_expr;
/// runs read_filter(predicate) and compares it to the expected
/// output
async fn run_read_filter_test_case<D>(
db_setup: D,
predicate: InfluxRpcPredicate,
expected_results: Vec<&str>,
) where
D: DbSetup,
{
test_helpers::maybe_start_logging();
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let string_results = run_read_filter(predicate.clone(), db)
.await
.expect("Unexpected error running read filter");
assert_eq!(
expected_results, string_results,
"Error in scenario '{}'\n\nexpected:\n{:#?}\n\nactual:\n{:#?}\n\n",
scenario_name, expected_results, string_results
);
}
}
/// runs read_filter(predicate) and compares it to the expected
/// output
async fn run_read_filter(
predicate: InfluxRpcPredicate,
db: Arc<dyn AbstractDb>,
) -> Result<Vec<String>, String> {
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_filter(db.as_query_namespace_arc(), predicate)
.await
.map_err(|e| e.to_string())?;
run_series_set_plan_maybe_error(&ctx, plan)
.await
.map_err(|e| e.to_string())
}
/// runs read_filter(predicate), expecting an error and compares to expected message
async fn run_read_filter_error_case<D>(
db_setup: D,
predicate: InfluxRpcPredicate,
expected_error: &str,
) where
D: DbSetup,
{
test_helpers::maybe_start_logging();
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let result = run_read_filter(predicate.clone(), db)
.await
.expect_err("Unexpected success running error case");
assert_contains!(result.to_string(), expected_error);
}
}
#[tokio::test]
async fn test_read_filter_data_no_pred() {
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [70.4, 72.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 350], values: [90.0, 90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.0, 51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100, 250], values: [50.4, 53.4]",
];
run_read_filter_test_case(
TwoMeasurementsMultiSeries {},
InfluxRpcPredicate::default(),
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_filter_data_exclusive_predicate() {
let predicate = Predicate::new()
// should not return the 350 row as predicate is
// range.start <= ts < range.end
.with_range(349, 350);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_inclusive_predicate() {
let predicate = Predicate::new()
// should return 350 row!
.with_range(350, 351);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [350], values: [90.0]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_exact_predicate() {
let predicate = Predicate::new()
// should return 250 rows!
.with_range(250, 251);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_tag_predicate() {
let predicate = Predicate::new()
// region = region
.with_expr(col("region").eq(col("region")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect both series to be returned
let expected_results = vec![
"Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_invalid_predicate() {
let v = ScalarValue::Binary(Some(vec![]));
let predicate = Predicate::new()
// region > <binary> (region is a tag(string) column, so this predicate is invalid)
.with_expr(col("region").gt(lit(v)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "Dictionary(Int32, Utf8) > Binary' can't be evaluated because there isn't a common type to coerce the types to";
run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
}
#[tokio::test]
async fn test_read_filter_invalid_predicate_case() {
let v = ScalarValue::Binary(Some(vec![]));
let predicate = Predicate::new()
// https://github.com/influxdata/influxdb_iox/issues/3635
// model what happens when a field is treated like a tag
// CASE WHEN system" IS NULL THEN '' ELSE system END = binary;
.with_expr(make_empty_tag_ref_expr("system").eq(lit(v)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "gRPC planner got error creating predicates: Error during planning: 'Utf8 = Binary' can't be evaluated because there isn't a common type to coerce the types to";
run_read_filter_error_case(TwoMeasurements {}, predicate, expected_error).await;
}
#[tokio::test]
async fn test_read_filter_unknown_column_in_predicate() {
let predicate = Predicate::new()
// mystery_region and bar are not real columns, so this predicate is
// invalid but IOx should be able to handle it (and produce no results)
.with_expr(
col("baz").eq(lit(4i32)).or(col("bar")
.eq(lit("baz"))
.and(col("mystery_region").gt(lit(5i32)))),
);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_field_as_tag() {
// Columns in the RPC predicate must be treated as tags:
// https://github.com/influxdata/idpe/issues/16238
let predicate = Predicate::new().with_expr(make_empty_tag_ref_expr("fld").eq(lit("200")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// fld exists in the table, but only as a field, not a tag, so no data should be returned.
let expected_results = vec![];
run_read_filter_test_case(StringFieldWithNumericValue {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_field_as_tag_coerce_number() {
// Same as above except for the predicate compares to an integer literal.
let predicate = Predicate::new().with_expr(make_empty_tag_ref_expr("fld").eq(lit(200)));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![];
run_read_filter_test_case(StringFieldWithNumericValue {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_filter() {
// filter out one row in h20
let predicate = Predicate::default()
.with_range(200, 300)
.with_expr(col("state").eq(lit("CA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_filter_test_case(
TwoMeasurementsMultiSeries {},
predicate,
expected_results.clone(),
)
.await;
// Same results via a != predicate.
let predicate = Predicate::default()
.with_range(200, 300)
.with_expr(col("state").not_eq(lit("MA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_filter_fields() {
// filter out one row in h20
let predicate = Predicate::default()
.with_field_columns(vec!["other_temp"])
.with_expr(col("state").eq(lit("CA"))); // state=CA
let predicate = InfluxRpcPredicate::new(None, predicate);
// Only expect other_temp in this location
let expected_results = vec![
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_filter_measurement_pred() {
// use an expr on table name to pick just the last row from o2
let predicate = Predicate::default()
.with_range(200, 400)
.with_expr(col("_measurement").eq(lit("o2")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Only expect other_temp in this location
let expected_results = vec![
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_refers_to_non_existent_column() {
let predicate = Predicate::default().with_expr(col("tag_not_in_h20").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![] as Vec<&str>;
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_no_columns() {
// predicate with no columns,
let predicate = Predicate::default().with_expr(lit("foo").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=user, _measurement=cpu, region=west}\n FloatPoints timestamps: [100, 150], values: [23.2, 21.0]",
"Series tags={_field=bytes, _measurement=disk, region=east}\n IntegerPoints timestamps: [200], values: [99]",
];
run_read_filter_test_case(TwoMeasurements {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_match() {
let predicate = Predicate::default()
.with_range(200, 300)
// will match CA state
.with_regex_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_match_on_field() {
let predicate = Predicate::default().with_regex_match_expr("_field", "temp");
let predicate = InfluxRpcPredicate::new(None, predicate);
// Should see results for temp and other_temp (but not reading)
let expected_results = vec![
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_using_regex_not_match() {
let predicate = Predicate::default()
.with_range(200, 300)
// will filter out any rows with a state that matches "CA"
.with_regex_not_match_expr("state", "C.*");
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [72.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [51.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiSeries {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_regex_escape() {
let predicate = Predicate::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url =~ /https\:\/\/influxdb\.com/`,
.with_regex_match_expr("url", r#"https\://influxdb\.com"#);
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect one series with influxdb.com
let expected_results = vec![
"Series tags={_field=value, _measurement=status_code, url=https://influxdb.com}\n FloatPoints timestamps: [1527018816000000000], values: [418.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_not_match_regex_escape() {
let predicate = Predicate::default()
// Came from InfluxQL like `SELECT value FROM db0.rp0.status_code WHERE url !~ /https\:\/\/influxdb\.com/`,
.with_regex_not_match_expr("url", r#"https\://influxdb\.com"#);
let predicate = InfluxRpcPredicate::new(None, predicate);
// expect one series with influxdb.com
let expected_results = vec![
"Series tags={_field=value, _measurement=status_code, url=http://www.example.com}\n FloatPoints timestamps: [1527018806000000000], values: [404.0]",
];
run_read_filter_test_case(MeasurementStatusCode {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_pred_unsupported_in_scan() {
test_helpers::maybe_start_logging();
// These predicates can't be pushed down into chunks, but they can
// be evaluated by the general purpose DataFusion plan
// (STATE = 'CA') OR (CITY = 'Boston')
let predicate = Predicate::default()
.with_expr(col("state").eq(lit("CA")).or(col("city").eq(lit("Boston"))));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Note these results include data from both o2 and h2o
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200], values: [90.0]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [50.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [100], values: [50.4]",
];
run_read_filter_test_case(TwoMeasurementsMultiTagValue {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_data_plan_order() {
test_helpers::maybe_start_logging();
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [250], values: [70.3]",
"Series tags={_field=other, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [5.0]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.5]",
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA, zz_tag=A}\n FloatPoints timestamps: [1000], values: [70.4]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=A}\n FloatPoints timestamps: [800], values: [70.1]",
"Series tags={_field=temp, _measurement=h2o, city=Kingston, state=MA, zz_tag=B}\n FloatPoints timestamps: [100], values: [70.2]",
];
run_read_filter_test_case(
MeasurementsSortableTags {},
InfluxRpcPredicate::default(),
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_filter_filter_on_value() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default()
.with_expr(col("_value").eq(lit(1.77)))
.with_expr(col("_field").eq(lit("load4")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=load4, _measurement=system, host=host.local}\n FloatPoints timestamps: [1527018806000000000, 1527018826000000000], values: [1.77, 1.77]",
];
run_read_filter_test_case(MeasurementsForDefect2845 {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_on_field() {
test_helpers::maybe_start_logging();
// Predicate should pick 'temp' field from h2o
// (_field = 'temp')
let p1 = col("_field").eq(lit("temp"));
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_on_not_field() {
test_helpers::maybe_start_logging();
// Predicate should pick up all fields other than 'temp' from h2o
// (_field != 'temp')
let p1 = col("_field").not_eq(lit("temp"));
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=moisture, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [100000], values: [43.0]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=reading, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [51.0]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_unsupported_field_predicate() {
test_helpers::maybe_start_logging();
// Can not evaluate the following predicate as it refers to both
// _field and the values in the "tag" column, so we can't figure
// out which columns ("_field" prediate) at planning time -- we
// have to do it at runtime somehow
//
// https://github.com/influxdata/influxdb_iox/issues/5310
// (_field != 'temp') OR (city = "Boston")
let p1 = col("_field")
.not_eq(lit("temp"))
.or(col("city").eq(lit("Boston")));
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_error = "Unsupported _field predicate";
run_read_filter_error_case(TwoMeasurementsManyFields {}, predicate, expected_error).await;
}
#[tokio::test]
async fn test_read_filter_on_field_single_measurement() {
test_helpers::maybe_start_logging();
// Predicate should pick 'temp' field from h2o
// (_field = 'temp' AND _measurement = 'h2o')
let p1 = col("_field")
.eq(lit("temp"))
.and(col("_measurement").eq(lit("h2o")));
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [50, 100000], values: [70.4, 70.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_multi_negation() {
// reproducer for https://github.com/influxdata/influxdb_iox/issues/4800
test_helpers::maybe_start_logging();
let host = make_empty_tag_ref_expr("host");
let p1 = host.clone().eq(lit("server01")).or(host.eq(lit("")));
let predicate = Predicate::default().with_expr(p1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_results = vec![
"Series tags={_field=color, _measurement=attributes}\n StringPoints timestamps: [8000], values: [\"blue\"]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01}\n FloatPoints timestamps: [1000], values: [27.99]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01, region=us-east}\n FloatPoints timestamps: [3000], values: [1234567.891011]",
"Series tags={_field=value, _measurement=cpu_load_short, host=server01, region=us-west}\n FloatPoints timestamps: [0, 4000], values: [0.64, 3e-6]",
"Series tags={_field=active, _measurement=status}\n BooleanPoints timestamps: [7000], values: [true]",
"Series tags={_field=in, _measurement=swap, host=server01, name=disk0}\n FloatPoints timestamps: [6000], values: [3.0]",
"Series tags={_field=out, _measurement=swap, host=server01, name=disk0}\n FloatPoints timestamps: [6000], values: [4.0]",
];
run_read_filter_test_case(EndToEndTest {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_on_field_multi_measurement() {
test_helpers::maybe_start_logging();
// Predicate should pick 'temp' field from h2o and 'other_temp' from o2
//
// (_field = 'other_temp' AND _measurement = 'h2o') OR (_field = 'temp' AND _measurement = 'o2')
let p1 = col("_field")
.eq(lit("other_temp"))
.and(col("_measurement").eq(lit("h2o")));
let p2 = col("_field")
.eq(lit("temp"))
.and(col("_measurement").eq(lit("o2")));
let predicate = Predicate::default().with_expr(p1.or(p2));
let predicate = InfluxRpcPredicate::new(None, predicate);
// SHOULD NOT contain temp from h2o
let expected_results = vec![
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=CA}\n FloatPoints timestamps: [350], values: [72.4]",
"Series tags={_field=other_temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [250], values: [70.4]",
"Series tags={_field=temp, _measurement=o2, state=CA}\n FloatPoints timestamps: [300], values: [79.0]",
"Series tags={_field=temp, _measurement=o2, city=Boston, state=MA}\n FloatPoints timestamps: [50], values: [53.4]",
];
run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_with_periods() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().with_range(0, 1700000001000000000);
let predicate = InfluxRpcPredicate::new(None, predicate);
// Should return both series
let expected_results = vec![
"Series tags={_field=field.one, _measurement=measurement.one, tag.one=value, tag.two=other}\n FloatPoints timestamps: [1609459201000000001], values: [1.0]",
"Series tags={_field=field.two, _measurement=measurement.one, tag.one=value, tag.two=other}\n BooleanPoints timestamps: [1609459201000000001], values: [true]",
"Series tags={_field=field.one, _measurement=measurement.one, tag.one=value2, tag.two=other2}\n FloatPoints timestamps: [1609459201000000002], values: [1.0]",
"Series tags={_field=field.two, _measurement=measurement.one, tag.one=value2, tag.two=other2}\n BooleanPoints timestamps: [1609459201000000002], values: [false]",
];
run_read_filter_test_case(PeriodsInNames {}, predicate, expected_results).await;
}
#[tokio::test]
async fn test_read_filter_with_periods_predicates() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default()
.with_range(0, 1700000001000000000)
// tag.one = "value"
.with_expr("tag.one".as_expr().eq(lit("value")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// Should return both series
let expected_results = vec![
"Series tags={_field=field.one, _measurement=measurement.one, tag.one=value, tag.two=other}\n FloatPoints timestamps: [1609459201000000001], values: [1.0]",
"Series tags={_field=field.two, _measurement=measurement.one, tag.one=value, tag.two=other}\n BooleanPoints timestamps: [1609459201000000001], values: [true]",
];
run_read_filter_test_case(PeriodsInNames {}, predicate, expected_results).await;
}

View File

@ -14,7 +14,7 @@ use prost::Message;
pub struct GrpcRequestBuilder {
read_source: Option<generated_types::google::protobuf::Any>,
range: Option<TimestampRange>,
predicate: Option<Predicate>,
pub predicate: Option<Predicate>,
// for read_group requests
group: Option<Group>,
@ -126,6 +126,16 @@ impl GrpcRequestBuilder {
self.combine_predicate(Logical::And, node)
}
/// Add `_f!=field_name` to the predicate in the horrible gRPC structs
pub fn not_field_predicate(self, field_name: impl Into<String>) -> Self {
let node = comparison_expression_node(
tag_ref_node([255].to_vec()),
Comparison::NotEqual,
string_value_node(field_name),
);
self.combine_predicate(Logical::And, node)
}
/// Add `_m=measurement_name` to the predicate in the horrible gRPC structs
pub fn measurement_predicate(self, measurement_name: impl Into<String>) -> Self {
let node = comparison_expression_node(
@ -195,7 +205,7 @@ impl GrpcRequestBuilder {
/// Combine any existing predicate with the specified logical operator and node. If there is no
/// existing predicate, set the predicate to only the specified node.
fn combine_predicate(mut self, operator: Logical, new_node: Node) -> Self {
pub fn combine_predicate(mut self, operator: Logical, new_node: Node) -> Self {
let old_predicate = self.predicate.take();
let combined_predicate = match old_predicate {