test: Port read_window_aggregate query_tests to end-to-end tests (#6755)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Carol (Nichols || Goulding) 2023-01-31 07:15:50 -05:00 committed by GitHub
parent 7cadd38a3c
commit cff422b795
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 365 additions and 290 deletions

View File

@ -1,6 +1,8 @@
use super::dump::dump_data_frames;
use super::{dump::dump_data_frames, InfluxRpcTest};
use async_trait::async_trait;
use futures::{prelude::*, FutureExt};
use generated_types::aggregate::AggregateType;
use std::sync::Arc;
use test_helpers_end_to_end::{
maybe_skip_integration, GrpcRequestBuilder, MiniCluster, Step, StepTest, StepTestState,
};
@ -66,7 +68,7 @@ pub async fn read_window_aggregate_test_with_periods() {
.await
}
/// Sends the specified line protocol to a server, runs a read_grou
/// Sends the specified line protocol to a server, runs a read_window_aggregate
/// gRPC request, and compares it against expected frames
async fn do_read_window_aggregate_test(
input_lines: Vec<&str>,
@ -121,3 +123,262 @@ async fn do_read_window_aggregate_test(
.run()
.await
}
#[tokio::test]
async fn nanoseconds() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForWindowAggregate",
aggregate_type: AggregateType::Mean,
every: 200,
offset: 0,
request: GrpcRequestBuilder::new()
.or_tag_predicates([("city", "Boston"), ("city", "LA")].into_iter())
.timestamp_range(100, 450),
expected_results: vec![
// note the name of the field is "temp" even though it is the average
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [200, 400, 600], values: \"70,71.5,73\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200, 400, 600], values: \"90,91.5,93\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn nanoseconds_measurement_predicate() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForWindowAggregate",
aggregate_type: AggregateType::Mean,
every: 200,
offset: 0,
request: GrpcRequestBuilder::new()
.not_measurement_predicate("other")
.tag_predicate("city", "LA")
.or_tag_predicates([("city", "Boston")].into_iter())
.timestamp_range(100, 450),
expected_results: vec![
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 0",
"FloatPointsFrame, timestamps: [200, 400, 600], values: \"70,71.5,73\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 0",
"FloatPointsFrame, timestamps: [200, 400, 600], values: \"90,91.5,93\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn nanoseconds_measurement_count() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForWindowAggregate",
aggregate_type: AggregateType::Count,
every: 200,
offset: 0,
request: GrpcRequestBuilder::new().timestamp_range(100, 450),
expected_results: vec![
// Expect that the type of `Count` is Integer
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Boston,state=MA, type: 1",
"IntegerPointsFrame, timestamps: [200, 400, 600], values: \"1,2,1\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=Cambridge,state=MA, type: 1",
"IntegerPointsFrame, timestamps: [200, 400, 600], values: \"1,2,1\"",
"SeriesFrame, tags: _field=temp,_measurement=h2o,city=LA,state=CA, type: 1",
"IntegerPointsFrame, timestamps: [200, 400, 600], values: \"1,2,1\"",
],
})
.run()
.await;
}
// See <https://github.com/influxdata/influxdb_iox/issues/2697>
#[tokio::test]
async fn min_defect_2697() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForDefect2697",
aggregate_type: AggregateType::Min,
every: 10,
offset: 0,
request: GrpcRequestBuilder::new()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1_609_459_201_000_000_001, 1_609_459_201_000_000_031),
expected_results: vec![
// Because the windowed aggregate is using a selector aggregate (one of MIN,
// MAX, FIRST, LAST) we need to run a plan that brings along the timestamps
// for the chosen aggregate in the window.
"SeriesFrame, tags: _field=bar,_measurement=mm,section=1a, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000011], values: \"5\"",
"SeriesFrame, tags: _field=foo,_measurement=mm,section=1a, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000001, 1609459201000000024], \
values: \"1,11.24\"",
"SeriesFrame, tags: _field=bar,_measurement=mm,section=2b, type: 0",
"FloatPointsFrame, \
timestamps: [1609459201000000009, 1609459201000000015, 1609459201000000022], \
values: \"4,6,1.2\"",
"SeriesFrame, tags: _field=foo,_measurement=mm,section=2b, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000002], values: \"2\"",
],
})
.run()
.await;
}
// See <https://github.com/influxdata/influxdb_iox/issues/2697>
#[tokio::test]
async fn sum_defect_2697() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForDefect2697",
aggregate_type: AggregateType::Sum,
every: 10,
offset: 0,
request: GrpcRequestBuilder::new()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1_609_459_201_000_000_001, 1_609_459_201_000_000_031),
expected_results: vec![
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAN).
// For each distinct series the window defines the `time` column
"SeriesFrame, tags: _field=bar,_measurement=mm,section=1a, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000020], values: \"5\"",
"SeriesFrame, tags: _field=foo,_measurement=mm,section=1a, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000010, 1609459201000000030], \
values: \"4,11.24\"",
"SeriesFrame, tags: _field=bar,_measurement=mm,section=2b, type: 0",
"FloatPointsFrame, \
timestamps: [1609459201000000010, 1609459201000000020, 1609459201000000030], \
values: \"4,6,1.2\"",
"SeriesFrame, tags: _field=foo,_measurement=mm,section=2b, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000010], values: \"2\"",
],
})
.run()
.await;
}
// See <https://github.com/influxdata/influxdb_iox/issues/2845>
//
// Adds coverage to window_aggregate plan for filtering on _field.
#[tokio::test]
async fn field_predicates() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForDefect2697",
aggregate_type: AggregateType::Sum,
every: 10,
offset: 0,
request: GrpcRequestBuilder::new()
.field_predicate("foo")
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.timestamp_range(1_609_459_201_000_000_001, 1_609_459_201_000_000_031),
expected_results: vec![
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAN).
// For each distinct series the window defines the `time` column
"SeriesFrame, tags: _field=foo,_measurement=mm,section=1a, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000010, 1609459201000000030], \
values: \"4,11.24\"",
"SeriesFrame, tags: _field=foo,_measurement=mm,section=2b, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000010], values: \"2\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn overflow() {
Arc::new(ReadWindowAggregateTest {
setup_name: "MeasurementForDefect2890",
aggregate_type: AggregateType::Max,
// Note the giant window
every: i64::MAX,
offset: 0,
request: GrpcRequestBuilder::new()
.timestamp_range(1_609_459_201_000_000_001, 1_609_459_201_000_000_024),
expected_results: vec![
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAN).
// For each distinct series the window defines the `time` column
"SeriesFrame, tags: _field=bar,_measurement=mm, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000015], values: \"6\"",
"SeriesFrame, tags: _field=foo,_measurement=mm, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000005], values: \"3\"",
],
})
.run()
.await;
}
#[tokio::test]
async fn periods() {
Arc::new(ReadWindowAggregateTest {
setup_name: "PeriodsInNames",
aggregate_type: AggregateType::Max,
every: 500_000_000_000,
offset: 0,
request: GrpcRequestBuilder::new().timestamp_range(0, 1_700_000_001_000_000_000),
expected_results: vec![
"SeriesFrame, tags: \
_field=field.one,_measurement=measurement.one,tag.one=value,tag.two=other, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000001], values: \"1\"",
"SeriesFrame, tags: \
_field=field.two,_measurement=measurement.one,tag.one=value,tag.two=other, type: 3",
"BooleanPointsFrame, timestamps: [1609459201000000001], values: true",
"SeriesFrame, tags: \
_field=field.one,_measurement=measurement.one,tag.one=value2,tag.two=other2, type: 0",
"FloatPointsFrame, timestamps: [1609459201000000002], values: \"1\"",
"SeriesFrame, tags: \
_field=field.two,_measurement=measurement.one,tag.one=value2,tag.two=other2, type: 3",
"BooleanPointsFrame, timestamps: [1609459201000000002], values: false",
],
})
.run()
.await;
}
#[derive(Debug)]
struct ReadWindowAggregateTest {
setup_name: &'static str,
aggregate_type: AggregateType,
every: i64,
offset: i64,
request: GrpcRequestBuilder,
expected_results: Vec<&'static str>,
}
#[async_trait]
impl InfluxRpcTest for ReadWindowAggregateTest {
fn setup_name(&self) -> &'static str {
self.setup_name
}
async fn request_and_assert(&self, cluster: &MiniCluster) {
let mut storage_client = cluster.querier_storage_client();
let read_window_aggregate_request = self
.request
.clone()
.source(cluster)
.aggregate_type(self.aggregate_type)
.window_every(self.every)
.offset(self.offset)
.build_read_window_aggregate();
let read_window_aggregate_response = storage_client
.read_window_aggregate(read_window_aggregate_request)
.await
.expect("successful read_window_aggregate call");
let responses: Vec<_> = read_window_aggregate_response
.into_inner()
.try_collect()
.await
.unwrap();
let frames: Vec<_> = responses
.into_iter()
.flat_map(|r| r.frames)
.flat_map(|f| f.data)
.collect();
let results = dump_data_frames(&frames);
assert_eq!(results, self.expected_results);
}
}

View File

@ -931,5 +931,97 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
},
],
),
(
"MeasurementForWindowAggregate",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Boston temp=70.0 100",
"h2o,state=MA,city=Boston temp=71.0 200",
"h2o,state=MA,city=Boston temp=72.0 300",
"h2o,state=MA,city=Boston temp=73.0 400",
"h2o,state=MA,city=Boston temp=74.0 500",
"h2o,state=MA,city=Cambridge temp=80.0 100",
"h2o,state=MA,city=Cambridge temp=81.0 200",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,state=MA,city=Cambridge temp=82.0 300",
"h2o,state=MA,city=Cambridge temp=83.0 400",
"h2o,state=MA,city=Cambridge temp=84.0 500",
"h2o,state=CA,city=LA temp=90.0 100",
"h2o,state=CA,city=LA temp=91.0 200",
"h2o,state=CA,city=LA temp=92.0 300",
"h2o,state=CA,city=LA temp=93.0 400",
"h2o,state=CA,city=LA temp=94.0 500",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
// Test data to validate fix for
// <https://github.com/influxdata/influxdb_iox/issues/2697>
"MeasurementForDefect2697",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"mm,section=1a bar=5.0 1609459201000000011",
"mm,section=1a bar=0.28 1609459201000000031",
"mm,section=2b bar=4.0 1609459201000000009",
"mm,section=2b bar=6.0 1609459201000000015",
"mm,section=2b bar=1.2 1609459201000000022",
"mm,section=1a foo=1.0 1609459201000000001",
"mm,section=1a foo=3.0 1609459201000000005",
"mm,section=1a foo=11.24 1609459201000000024",
"mm,section=2b foo=2.0 1609459201000000002",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
// Test data to validate fix for
// <https://github.com/influxdata/influxdb_iox/issues/2890>
"MeasurementForDefect2890",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"mm foo=2.0 1609459201000000001",
"mm foo=2.0 1609459201000000002",
"mm foo=3.0 1609459201000000005",
"mm foo=11.24 1609459201000000024",
"mm bar=4.0 1609459201000000009",
"mm bar=5.0 1609459201000000011",
"mm bar=6.0 1609459201000000015",
"mm bar=1.2 1609459201000000022",
"mm bar=2.8 1609459201000000031",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
])
});

View File

@ -1,4 +1,3 @@
pub mod read_window_aggregate;
pub mod table_names;
pub mod tag_keys;
pub mod tag_values;

View File

@ -1,287 +0,0 @@
//! Tests for the Influx gRPC queries
use crate::{influxrpc::util::run_series_set_plan, scenarios::*};
use datafusion::prelude::*;
use iox_query::{frontend::influxrpc::InfluxRpcPlanner, Aggregate, WindowDuration};
use predicate::rpc_predicate::InfluxRpcPredicate;
use predicate::Predicate;
/// runs read_window_aggregate(predicate) and compares it to the expected
/// output
async fn run_read_window_aggregate_test_case<D>(
db_setup: D,
predicate: InfluxRpcPredicate,
agg: Aggregate,
every: WindowDuration,
offset: WindowDuration,
expected_results: Vec<&str>,
) where
D: DbSetup,
{
test_helpers::maybe_start_logging();
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("Predicate: '{:#?}'", predicate);
let ctx = db.new_query_context(None);
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_window_aggregate(
db.as_query_namespace_arc(),
predicate.clone(),
agg,
every,
offset,
)
.await
.expect("built plan successfully");
let string_results = run_series_set_plan(&ctx, plan).await;
assert_eq!(
expected_results, string_results,
"Error in scenario '{}'\n\nexpected:\n{:#?}\n\nactual:\n{:#?}\n",
scenario_name, expected_results, string_results
);
}
}
#[tokio::test]
async fn test_read_window_aggregate_nanoseconds() {
let predicate = Predicate::default()
// city=Boston or city=LA
.with_expr(col("city").eq(lit("Boston")).or(col("city").eq(lit("LA"))))
.with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Mean;
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(0);
// note the name of the field is "temp" even though it is the average
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
];
run_read_window_aggregate_test_case(
MeasurementForWindowAggregate {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_window_aggregate_nanoseconds_measurement_pred() {
let predicate = Predicate::default()
// city=Cambridge OR (_measurement != 'other' AND city = LA)
.with_expr(
col("city").eq(lit("Boston")).or(col("_measurement")
.not_eq(lit("other"))
.and(col("city").eq(lit("LA")))),
)
.with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Mean;
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n FloatPoints timestamps: [200, 400, 600], values: [70.0, 71.5, 73.0]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n FloatPoints timestamps: [200, 400, 600], values: [90.0, 91.5, 93.0]",
];
run_read_window_aggregate_test_case(
MeasurementForWindowAggregate {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_window_aggregate_nanoseconds_measurement_count() {
// Expect that the type of `Count` is Integer
let predicate = Predicate::default().with_range(100, 450);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Count;
let every = WindowDuration::from_nanoseconds(200);
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_field=temp, _measurement=h2o, city=Boston, state=MA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_field=temp, _measurement=h2o, city=Cambridge, state=MA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
"Series tags={_field=temp, _measurement=h2o, city=LA, state=CA}\n IntegerPoints timestamps: [200, 400, 600], values: [1, 2, 1]",
];
run_read_window_aggregate_test_case(
MeasurementForWindowAggregate {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_min_defect_2697() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Min;
let every = WindowDuration::from_nanoseconds(10);
let offset = WindowDuration::from_nanoseconds(0);
// Because the windowed aggregate is using a selector aggregate (one of MIN,
// MAX, FIRST, LAST) we need to run a plan that brings along the timestamps
// for the chosen aggregate in the window.
let expected_results = vec![
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000011], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000001, 1609459201000000024], values: [1.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000009, 1609459201000000015, 1609459201000000022], values: [4.0, 6.0, 1.2]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000002], values: [2.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
// See https://github.com/influxdata/influxdb_iox/issues/2697
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_sum_defect_2697() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.with_range(1609459201000000001, 1609459201000000031);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
let every = WindowDuration::from_nanoseconds(10);
let offset = WindowDuration::from_nanoseconds(0);
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"Series tags={_field=bar, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000020], values: [5.0]",
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_field=bar, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000020, 1609459201000000030], values: [4.0, 6.0, 1.2]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
// See issue: https://github.com/influxdata/influxdb_iox/issues/2845
//
// Adds coverage to window_aggregate plan for filtering on _field.
#[tokio::test]
async fn test_grouped_series_set_plan_group_aggregate_filter_on_field() {
let predicate = Predicate::default()
// time >= '2021-01-01T00:00:01.000000001Z' AND time <= '2021-01-01T00:00:01.000000031Z'
.with_range(1609459201000000001, 1609459201000000031)
.with_expr(col("_field").eq(lit("foo")));
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Sum;
let every = WindowDuration::from_nanoseconds(10);
let offset = WindowDuration::from_nanoseconds(0);
// The windowed aggregate is using a non-selector aggregate (SUM, COUNT, MEAD).
// For each distinct series the window defines the `time` column
let expected_results = vec![
"Series tags={_field=foo, _measurement=mm, section=1a}\n FloatPoints timestamps: [1609459201000000010, 1609459201000000030], values: [4.0, 11.24]",
"Series tags={_field=foo, _measurement=mm, section=2b}\n FloatPoints timestamps: [1609459201000000010], values: [2.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2697 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_window_aggregate_overflow() {
let predicate = Predicate::default().with_range(1609459201000000001, 1609459201000000024);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Max;
// Note the giant window (every=9223372036854775807)
let every = WindowDuration::from_nanoseconds(i64::MAX);
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_field=bar, _measurement=mm}\n FloatPoints timestamps: [1609459201000000015], values: [6.0]",
"Series tags={_field=foo, _measurement=mm}\n FloatPoints timestamps: [1609459201000000005], values: [3.0]",
];
run_read_window_aggregate_test_case(
MeasurementForDefect2890 {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}
#[tokio::test]
async fn test_read_window_aggregate_with_periods() {
let predicate = Predicate::default().with_range(0, 1700000001000000000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let agg = Aggregate::Max;
let every = WindowDuration::from_nanoseconds(500000000000);
let offset = WindowDuration::from_nanoseconds(0);
let expected_results = vec![
"Series tags={_field=field.one, _measurement=measurement.one, tag.one=value, tag.two=other}\n FloatPoints timestamps: [1609459201000000001], values: [1.0]",
"Series tags={_field=field.two, _measurement=measurement.one, tag.one=value, tag.two=other}\n BooleanPoints timestamps: [1609459201000000001], values: [true]",
"Series tags={_field=field.one, _measurement=measurement.one, tag.one=value2, tag.two=other2}\n FloatPoints timestamps: [1609459201000000002], values: [1.0]",
"Series tags={_field=field.two, _measurement=measurement.one, tag.one=value2, tag.two=other2}\n BooleanPoints timestamps: [1609459201000000002], values: [false]",
];
run_read_window_aggregate_test_case(
PeriodsInNames {},
predicate,
agg,
every,
offset,
expected_results,
)
.await;
}

View File

@ -215,6 +215,16 @@ impl GrpcRequestBuilder {
self.combine_predicate(Logical::And, node)
}
/// Add `_m!=measurement_name` to the predicate in the horrible gRPC structs
pub fn not_measurement_predicate(self, measurement_name: impl Into<String>) -> Self {
let node = comparison_expression_node(
tag_ref_node([00].to_vec()),
Comparison::NotEqual,
string_value_node(measurement_name),
);
self.combine_predicate(Logical::And, node)
}
/// Add `tag_name ~= /pattern/` to the predicate
pub fn regex_match_predicate(
self,