test: Port all field columns query_tests to end-to-end tests (#6707)

* test: Port a test that's not actually supported through the full gRPC API

* test: Port remaining field column/measurement fields tests

* test: Remove unsupported measurement predicate and clarify purposes of tests

Andrew confirmed that the only way to invoke a Measurement Fields
request is with a measurement/table name specified: <0249b5018e/generated_types/protos/influxdata/platform/storage/service.proto (L43)>

so testing with a `_measurement` predicate is not valid.

I thought this test would become redundant with some other tests, but
they're actually still different enough; I took this opportunity to
better highlight the differences in the test names.

* refactor: Move all measurement fields tests to their own file

* test: Remove field columns tests that are now covered in end-to-end measurement fields tests
pull/24376/head
Carol (Nichols || Goulding) 2023-01-25 14:49:29 -05:00 committed by GitHub
parent 0249b5018e
commit 57b5b639d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 344 additions and 395 deletions

View File

@ -1,15 +1,13 @@
use super::{run_data_test, run_no_data_test, InfluxRpcTest};
use async_trait::async_trait;
use futures::{prelude::*, FutureExt};
use generated_types::{
google::protobuf::Empty,
measurement_fields_response::{FieldType, MessageField},
offsets_response::PartitionOffsetResponse,
OffsetsResponse,
google::protobuf::Empty, offsets_response::PartitionOffsetResponse, OffsetsResponse,
};
use influxdb_storage_client::tag_key_bytes_to_strings;
use std::sync::Arc;
use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, MiniCluster, StepTestState};
use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, StepTestState};
mod measurement_fields;
#[tokio::test]
/// Validate that capabilities storage endpoint is hooked up
@ -243,121 +241,3 @@ async fn measurement_tag_values() {
)
.await
}
#[tokio::test]
/// Exercises the Measurement Fields storage call against data produced by [`DataGenerator`].
///
/// The request targets `cpu_load_short`, is limited to the generator's time range and to
/// `host=server01`, and must yield exactly one float field named `value` whose timestamp is
/// `ns_since_epoch + 4`.
async fn measurement_fields() {
    let generator = Arc::new(DataGenerator::new());
    let shared_generator = Arc::clone(&generator);

    run_data_test(
        generator,
        Box::new(move |state: &mut StepTestState| {
            let generator = Arc::clone(&shared_generator);
            async move {
                let mut client = state.cluster().querier_storage_client();

                let request = GrpcRequestBuilder::new()
                    .source(state.cluster())
                    .timestamp_range(generator.min_time(), generator.max_time())
                    .tag_predicate("host", "server01")
                    .build_measurement_fields("cpu_load_short");

                let expected_timestamp = generator.ns_since_epoch() + 4;

                let responses: Vec<_> = client
                    .measurement_fields(request)
                    .await
                    .unwrap()
                    .into_inner()
                    .try_collect()
                    .await
                    .unwrap();

                // Only the first response message is inspected.
                let fields = responses[0].fields.as_slice();
                assert_eq!(fields.len(), 1);

                let only_field = &fields[0];
                assert_eq!(only_field.key, "value");
                assert_eq!(only_field.r#type(), FieldType::Float);
                assert_eq!(only_field.timestamp, expected_timestamp);
            }
            .boxed()
        }),
    )
    .await
}
#[tokio::test]
/// Asking for the fields of a table that is not part of the setup (`NoSuchTable`), with a
/// `state=MA` tag predicate, is expected to produce an empty field list.
async fn field_columns_nonexistent_table_with_predicate() {
    let request = GrpcRequestBuilder::new().tag_predicate("state", "MA");

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "NoSuchTable",
        request,
        expected_fields: Vec::new(),
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Lists the fields of the existing `h2o` table, restricted by the tag predicate `state=MA`.
async fn field_columns_existing_table_with_predicate() {
    // Every expected field here is a float; only the key and timestamp differ.
    let float_field = |key: &str, timestamp: i64| MessageField {
        key: key.into(),
        r#type: FieldType::Float.into(),
        timestamp,
    };

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request: GrpcRequestBuilder::new().tag_predicate("state", "MA"),
        expected_fields: vec![
            float_field("moisture", 100_000),
            float_field("other_temp", 250),
            float_field("temp", 100_000),
        ],
    };

    Arc::new(test).run().await;
}
#[derive(Debug)]
/// Parameters of a single Measurement Fields test case, run through the [`InfluxRpcTest`] trait.
struct MeasurementFieldsTest {
/// Value returned from `InfluxRpcTest::setup_name`, e.g. `"TwoMeasurementsManyFields"`.
setup_name: &'static str,
/// Measurement/table name handed to `build_measurement_fields` when the request is built.
table_name: &'static str,
/// Partially built request (predicates, time range); it is cloned and given the cluster as its
/// source when the test runs.
request: GrpcRequestBuilder,
/// Fields that the first response message is asserted to equal.
expected_fields: Vec<MessageField>,
}
#[async_trait]
impl InfluxRpcTest for MeasurementFieldsTest {
    /// Returns the setup name this test case was constructed with.
    fn setup_name(&self) -> &'static str {
        self.setup_name
    }

    /// Sends a Measurement Fields request for `self.table_name` to the querier's storage client
    /// and asserts that the first response message carries exactly `self.expected_fields`.
    ///
    /// # Panics
    ///
    /// Panics (failing the test) if the request fails, if the response stream yields an error,
    /// if no response message is returned at all, or if the fields differ from the expectation.
    async fn request_and_assert(&self, cluster: &MiniCluster) {
        let mut storage_client = cluster.querier_storage_client();

        // The stored builder is reused across runs, so work on a clone.
        let measurement_fields_request = self
            .request
            .clone()
            .source(cluster)
            .build_measurement_fields(self.table_name);

        let measurement_fields_response = storage_client
            .measurement_fields(measurement_fields_request)
            .await
            .expect("measurement_fields request failed");

        let responses: Vec<_> = measurement_fields_response
            .into_inner()
            .try_collect()
            .await
            .expect("error while collecting the measurement_fields response stream");

        // Fail with a descriptive message rather than an opaque index-out-of-bounds panic when
        // the server sends no response messages.
        let first_response = responses.first().unwrap_or_else(|| {
            panic!(
                "no measurement_fields response returned for table '{}' (setup '{}')",
                self.table_name, self.setup_name
            )
        });

        assert_eq!(
            first_response.fields, self.expected_fields,
            "unexpected fields for table '{}' (setup '{}')",
            self.table_name, self.setup_name
        );
    }
}

View File

@ -0,0 +1,303 @@
use super::{run_data_test, InfluxRpcTest};
use async_trait::async_trait;
use data_types::{MAX_NANO_TIME, MIN_NANO_TIME};
use futures::{prelude::*, FutureExt};
use generated_types::measurement_fields_response::{FieldType, MessageField};
use std::sync::Arc;
use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, MiniCluster, StepTestState};
#[tokio::test]
/// Exercises the Measurement Fields storage call against data produced by [`DataGenerator`].
///
/// The request targets `cpu_load_short`, is limited to the generator's time range and to
/// `host=server01`, and must yield exactly one float field named `value` whose timestamp is
/// `ns_since_epoch + 4`.
async fn measurement_fields() {
    let generator = Arc::new(DataGenerator::new());
    let shared_generator = Arc::clone(&generator);

    run_data_test(
        generator,
        Box::new(move |state: &mut StepTestState| {
            let generator = Arc::clone(&shared_generator);
            async move {
                let mut client = state.cluster().querier_storage_client();

                let request = GrpcRequestBuilder::new()
                    .source(state.cluster())
                    .timestamp_range(generator.min_time(), generator.max_time())
                    .tag_predicate("host", "server01")
                    .build_measurement_fields("cpu_load_short");

                let expected_timestamp = generator.ns_since_epoch() + 4;

                let responses: Vec<_> = client
                    .measurement_fields(request)
                    .await
                    .unwrap()
                    .into_inner()
                    .try_collect()
                    .await
                    .unwrap();

                // Only the first response message is inspected.
                let fields = responses[0].fields.as_slice();
                assert_eq!(fields.len(), 1);

                let only_field = &fields[0];
                assert_eq!(only_field.key, "value");
                assert_eq!(only_field.r#type(), FieldType::Float);
                assert_eq!(only_field.timestamp, expected_timestamp);
            }
            .boxed()
        }),
    )
    .await
}
#[tokio::test]
/// Asking for the fields of a table that is not part of the setup (`NoSuchTable`), with a
/// `state=MA` tag predicate, is expected to produce an empty field list.
async fn field_columns_nonexistent_table_with_predicate() {
    let request = GrpcRequestBuilder::new().tag_predicate("state", "MA");

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "NoSuchTable",
        request,
        expected_fields: Vec::new(),
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Lists the fields of the existing `h2o` table, restricted by the tag predicate `state=MA`.
async fn field_columns_existing_table_with_predicate() {
    // Every expected field here is a float; only the key and timestamp differ.
    let float_field = |key: &str, timestamp: i64| MessageField {
        key: key.into(),
        r#type: FieldType::Float.into(),
        timestamp,
    };

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request: GrpcRequestBuilder::new().tag_predicate("state", "MA"),
        expected_fields: vec![
            float_field("moisture", 100_000),
            float_field("other_temp", 250),
            float_field("temp", 100_000),
        ],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Lists the fields of `h2o` using only a timestamp range (`i64::MIN` to `i64::MAX - 2`) and no
/// tag predicate.
async fn field_columns_timestamp_range_predicate() {
    // Every expected field here is a float; only the key and timestamp differ.
    let float_field = |key: &str, timestamp: i64| MessageField {
        key: key.into(),
        r#type: FieldType::Float.into(),
        timestamp,
    };

    let request = GrpcRequestBuilder::new().timestamp_range(i64::MIN, i64::MAX - 2);

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request,
        expected_fields: vec![
            float_field("moisture", 100_000),
            float_field("other_temp", 350),
            float_field("temp", 100_000),
        ],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Lists the fields of `h2o` with a request that carries neither a tag predicate nor a timestamp
/// range; every field is expected to come back with a timestamp of `0`.
async fn field_columns_no_predicate_full_time_range_returns_zero_timestamp() {
    // All expected fields are floats reported with a zero timestamp.
    let zero_timestamp_float = |key: &str| MessageField {
        key: key.into(),
        r#type: FieldType::Float.into(),
        timestamp: 0,
    };

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request: GrpcRequestBuilder::new(),
        expected_fields: vec![
            zero_timestamp_float("moisture"),
            zero_timestamp_float("other_temp"),
            zero_timestamp_float("temp"),
        ],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Combines the `state=MA` tag predicate with the timestamp range `200..300` on `h2o`; only
/// `other_temp` (timestamp 250) is expected to be reported.
async fn field_columns_tag_and_timestamp_range_predicate() {
    let request = GrpcRequestBuilder::new()
        .tag_predicate("state", "MA")
        .timestamp_range(200, 300);

    let other_temp = MessageField {
        key: "other_temp".into(),
        r#type: FieldType::Float.into(),
        timestamp: 250,
    };

    let test = MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request,
        expected_fields: vec![other_temp],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
async fn field_names() {
Arc::new(MeasurementFieldsTest {
setup_name: "OneMeasurementManyFields",
table_name: "h2o",
request: GrpcRequestBuilder::new().timestamp_range(0, 2_000),
expected_fields: vec![
MessageField {
key: "field1".into(),
r#type: FieldType::Float.into(),
timestamp: 1_000,
},
MessageField {
key: "field2".into(),
r#type: FieldType::String.into(),
timestamp: 100,
},
MessageField {
key: "field3".into(),
r#type: FieldType::Float.into(),
timestamp: 100,
},
MessageField {
key: "field4".into(),
r#type: FieldType::Boolean.into(),
timestamp: 1_000,
},
],
})
.run()
.await;
}
#[tokio::test]
/// Requests the fields of `cpu` over the range `MIN_NANO_TIME..MAX_NANO_TIME`; the single float
/// field `value` is expected to be reported with a timestamp of `0`.
async fn list_field_columns_all_time() {
    let request = GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME);

    let value_field = MessageField {
        key: "value".into(),
        r#type: FieldType::Float.into(),
        timestamp: 0,
    };

    let test = MeasurementFieldsTest {
        setup_name: "MeasurementWithMaxTime",
        table_name: "cpu",
        request,
        expected_fields: vec![value_field],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Requests the fields of `cpu` over a range whose end is `MAX_NANO_TIME + 1`, so that a row at
/// `MAX_NANO_TIME` is covered; the field `value` must report `MAX_NANO_TIME` as its timestamp.
async fn list_field_columns_max_time_included() {
    // The range deliberately starts one past the minimum: if it started at `i64::MIN`, we would
    // hit the optimized case for 'all time' and get the 'wrong' timestamp, since the optimized
    // case does not check which timestamps exist.
    let request = GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1);

    let value_field = MessageField {
        key: "value".into(),
        r#type: FieldType::Float.into(),
        timestamp: MAX_NANO_TIME,
    };

    let test = MeasurementFieldsTest {
        setup_name: "MeasurementWithMaxTime",
        table_name: "cpu",
        request,
        expected_fields: vec![value_field],
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Requests the fields of `cpu` over `MIN_NANO_TIME + 1..MAX_NANO_TIME` and expects no fields.
///
/// NOTE(review): this suggests the end of the range is exclusive, so the row at the maximum
/// timestamp falls outside it; confirm against the request builder.
async fn list_field_columns_max_time_excluded() {
    // The range ends one short of including the max timestamp.
    let request = GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);

    let test = MeasurementFieldsTest {
        setup_name: "MeasurementWithMaxTime",
        table_name: "cpu",
        request,
        expected_fields: Vec::new(),
    };

    Arc::new(test).run().await;
}
#[tokio::test]
/// Lists the fields of a measurement whose name, and whose field names, contain periods
/// (`measurement.one`, `field.one`, `field.two`).
async fn list_field_columns_with_periods() {
    // Both expected fields carry the same timestamp.
    let expected_timestamp: i64 = 1_609_459_201_000_000_002;

    let request = GrpcRequestBuilder::new().timestamp_range(0, 1_700_000_001_000_000_000);

    let expected_fields = vec![
        MessageField {
            key: "field.one".into(),
            r#type: FieldType::Float.into(),
            timestamp: expected_timestamp,
        },
        MessageField {
            key: "field.two".into(),
            r#type: FieldType::Boolean.into(),
            timestamp: expected_timestamp,
        },
    ];

    let test = MeasurementFieldsTest {
        setup_name: "PeriodsInNames",
        table_name: "measurement.one",
        request,
        expected_fields,
    };

    Arc::new(test).run().await;
}
#[derive(Debug)]
/// Parameters of a single Measurement Fields test case, run through the [`InfluxRpcTest`] trait.
struct MeasurementFieldsTest {
/// Value returned from `InfluxRpcTest::setup_name`, e.g. `"TwoMeasurementsManyFields"`.
setup_name: &'static str,
/// Measurement/table name handed to `build_measurement_fields` when the request is built.
table_name: &'static str,
/// Partially built request (predicates, time range); it is cloned and given the cluster as its
/// source when the test runs.
request: GrpcRequestBuilder,
/// Fields that the first response message is asserted to equal.
expected_fields: Vec<MessageField>,
}
#[async_trait]
impl InfluxRpcTest for MeasurementFieldsTest {
    /// Returns the setup name this test case was constructed with.
    fn setup_name(&self) -> &'static str {
        self.setup_name
    }

    /// Sends a Measurement Fields request for `self.table_name` to the querier's storage client
    /// and asserts that the first response message carries exactly `self.expected_fields`.
    ///
    /// # Panics
    ///
    /// Panics (failing the test) if the request fails, if the response stream yields an error,
    /// if no response message is returned at all, or if the fields differ from the expectation.
    async fn request_and_assert(&self, cluster: &MiniCluster) {
        let mut storage_client = cluster.querier_storage_client();

        // The stored builder is reused across runs, so work on a clone.
        let measurement_fields_request = self
            .request
            .clone()
            .source(cluster)
            .build_measurement_fields(self.table_name);

        let measurement_fields_response = storage_client
            .measurement_fields(measurement_fields_request)
            .await
            .expect("measurement_fields request failed");

        let responses: Vec<_> = measurement_fields_response
            .into_inner()
            .try_collect()
            .await
            .expect("error while collecting the measurement_fields response stream");

        // Fail with a descriptive message rather than an opaque index-out-of-bounds panic when
        // the server sends no response messages.
        let first_response = responses.first().unwrap_or_else(|| {
            panic!(
                "no measurement_fields response returned for table '{}' (setup '{}')",
                self.table_name, self.setup_name
            )
        });

        assert_eq!(
            first_response.fields, self.expected_fields,
            "unexpected fields for table '{}' (setup '{}')",
            self.table_name, self.setup_name
        );
    }
}

View File

@ -246,6 +246,26 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
})
.collect::<Vec<_>>(),
),
(
"OneMeasurementManyFields",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
[
"h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100",
"h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100",
"h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100",
"h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000",
"h2o,tag1=foo,tag2=bar field1=70.3,field5=false 3000",
]
.join("\n"),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"TwoMeasurementsManyFields",
vec![
@ -435,6 +455,23 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
Step::WriteLineProtocol("h2o,state=CA,city=Andover temp=67.3 500".into()),
],
),
(
"MeasurementWithMaxTime",
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(format!(
"cpu,host=server01 value=100 {}",
// This is the maximum timestamp that can be represented in the InfluxDB data
// model:
// <https://github.com/influxdata/influxdb/blob/540bb66e1381a48a6d1ede4fc3e49c75a7d9f4af/models/time.go#L12-L34>
i64::MAX - 1, // 9223372036854775806
)),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
],
),
(
"OneMeasurementRealisticTimes",
vec![

View File

@ -1,4 +1,3 @@
pub mod field_columns;
pub mod read_filter;
pub mod read_group;
pub mod read_window_aggregate;

View File

@ -1,270 +0,0 @@
use crate::scenarios::*;
use arrow::datatypes::DataType;
use data_types::{MAX_NANO_TIME, MIN_NANO_TIME};
use datafusion::prelude::{col, lit};
use iox_query::{
exec::fieldlist::{Field, FieldList},
frontend::influxrpc::InfluxRpcPlanner,
};
use predicate::{rpc_predicate::InfluxRpcPredicate, Predicate};
/// Builds every database scenario produced by `db_setup` and, for each one, plans and executes
/// a field-columns query with `predicate`.
///
/// The resulting field list must equal `expected_fields`; otherwise the assertion fails with
/// the scenario name and both lists in the message.
async fn run_field_columns_test_case<D: DbSetup>(
    db_setup: D,
    predicate: InfluxRpcPredicate,
    expected_fields: FieldList,
) {
    test_helpers::maybe_start_logging();

    let scenarios = db_setup.make().await;
    for DbScenario {
        scenario_name, db, ..
    } in scenarios
    {
        println!("Running scenario '{}'", scenario_name);
        println!("Predicate: '{:#?}'", predicate);

        let query_ctx = db.new_query_context(None);
        let rpc_planner = InfluxRpcPlanner::new(query_ctx.child_ctx("planner"));

        // The predicate is cloned because it is reused for every scenario.
        let field_plan = rpc_planner
            .field_columns(db.as_query_namespace_arc(), predicate.clone())
            .await
            .expect("built plan successfully");

        let actual_fields = query_ctx
            .to_field_list(field_plan)
            .await
            .expect("converted plan to strings successfully");

        assert_eq!(
            actual_fields, expected_fields,
            "Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}",
            scenario_name, expected_fields, actual_fields
        );
    }
}
#[tokio::test]
/// Queries the field columns of a table that does not exist (`NoSuchTable`) and expects an
/// empty field list.
///
/// Note that, despite the test's name, a `state=MA` predicate is applied.
async fn test_field_columns_no_predicate() {
    // state=MA
    let state_is_ma = col("state").eq(lit("MA"));
    let predicate = InfluxRpcPredicate::new_table(
        "NoSuchTable",
        Predicate::default().with_expr(state_is_ma),
    );

    run_field_columns_test_case(
        TwoMeasurementsManyFields {},
        predicate,
        FieldList::default(),
    )
    .await;
}
#[tokio::test]
/// Queries the field columns of the `h2o` table with a `state=MA` predicate.
async fn test_field_columns_with_pred() {
    // Every expected field here is a Float64; only the name and last timestamp differ.
    let float_field = |name: &str, last_timestamp: i64| Field {
        name: name.into(),
        data_type: DataType::Float64,
        last_timestamp,
    };

    // get only fields from h2o (but both chunks)
    let state_is_ma = col("state").eq(lit("MA"));
    let predicate =
        InfluxRpcPredicate::new_table("h2o", Predicate::default().with_expr(state_is_ma));

    let expected_fields = FieldList {
        fields: vec![
            float_field("moisture", 100_000),
            float_field("other_temp", 250),
            float_field("temp", 100_000),
        ],
    };

    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Selects the `h2o` fields through a `_measurement = 'h2o'` expression (no table name is given
/// to the predicate) combined with the time range `i64::MIN..i64::MAX - 2`.
async fn test_field_columns_measurement_pred() {
    // Every expected field here is a Float64; only the name and last timestamp differ.
    let float_field = |name: &str, last_timestamp: i64| Field {
        name: name.into(),
        data_type: DataType::Float64,
        last_timestamp,
    };

    // get only fields from h2o using a _measurement predicate
    let measurement_is_h2o = col("_measurement").eq(lit("h2o"));
    let inner_predicate = Predicate::default()
        .with_expr(measurement_is_h2o)
        .with_range(i64::MIN, i64::MAX - 2);
    let predicate = InfluxRpcPredicate::new(None, inner_predicate);

    let expected_fields = FieldList {
        fields: vec![
            float_field("moisture", 100_000),
            float_field("other_temp", 350),
            float_field("temp", 100_000),
        ],
    };

    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Selects the `h2o` fields through a `_measurement = 'h2o'` expression with no time range, and
/// expects every field to report a last timestamp of `0`.
async fn test_field_columns_measurement_pred_all_time() {
    // optimized all-time case returns zero last_timestamp
    let zero_timestamp_float = |name: &str| Field {
        name: name.into(),
        data_type: DataType::Float64,
        last_timestamp: 0,
    };

    // get only fields from h2o using a _measurement predicate
    let measurement_is_h2o = col("_measurement").eq(lit("h2o"));
    let predicate =
        InfluxRpcPredicate::new(None, Predicate::default().with_expr(measurement_is_h2o));

    let expected_fields = FieldList {
        fields: vec![
            zero_timestamp_float("moisture"),
            zero_timestamp_float("other_temp"),
            zero_timestamp_float("temp"),
        ],
    };

    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Queries the `h2o` field columns with both the time range `200..300` and a `state=MA`
/// predicate; only `other_temp` (last timestamp 250) is expected.
async fn test_field_columns_with_ts_pred() {
    // state=MA
    let state_is_ma = col("state").eq(lit("MA"));
    let inner_predicate = Predicate::default()
        .with_range(200, 300)
        .with_expr(state_is_ma);
    let predicate = InfluxRpcPredicate::new_table("h2o", inner_predicate);

    let other_temp = Field {
        name: "other_temp".into(),
        data_type: DataType::Float64,
        last_timestamp: 250,
    };
    let expected_fields = FieldList {
        fields: vec![other_temp],
    };

    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn test_field_name_plan() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().with_range(0, 2000);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
fields: vec![
Field {
name: "field1".into(),
data_type: DataType::Float64,
last_timestamp: 1000,
},
Field {
name: "field2".into(),
data_type: DataType::Utf8,
last_timestamp: 100,
},
Field {
name: "field3".into(),
data_type: DataType::Float64,
last_timestamp: 100,
},
Field {
name: "field4".into(),
data_type: DataType::Boolean,
last_timestamp: 1000,
},
],
};
run_field_columns_test_case(OneMeasurementManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Queries field columns over `MIN_NANO_TIME..MAX_NANO_TIME` against `MeasurementWithMaxTime`;
/// the `value` field is expected with a last timestamp of `0`.
async fn list_field_columns_all_time() {
    let all_time = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME);
    let predicate = InfluxRpcPredicate::new(None, all_time);

    let value_field = Field {
        name: "value".into(),
        data_type: DataType::Float64,
        // we hit the optimized case that ignores timestamps
        last_timestamp: 0,
    };
    let expected_fields = FieldList {
        fields: vec![value_field],
    };

    run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Queries field columns over a range ending at `MAX_NANO_TIME + 1` against
/// `MeasurementWithMaxTime`; the `value` field must report `MAX_NANO_TIME` as its last timestamp.
async fn list_field_columns_max_time_included() {
    // The range deliberately starts one past the minimum: if it started at `i64::MIN`, we would
    // hit the optimized case for 'all time' and get the 'wrong' timestamp, since the optimized
    // case does not check which timestamps exist.
    let range = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1);
    let predicate = InfluxRpcPredicate::new(None, range);

    let value_field = Field {
        name: "value".into(),
        data_type: DataType::Float64,
        last_timestamp: MAX_NANO_TIME,
    };
    let expected_fields = FieldList {
        fields: vec![value_field],
    };

    run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Queries field columns over `MIN_NANO_TIME + 1..MAX_NANO_TIME` against
/// `MeasurementWithMaxTime` and expects an empty field list.
async fn list_field_columns_max_time_excluded() {
    // The range ends one short of including the max timestamp.
    let range = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);
    let predicate = InfluxRpcPredicate::new(None, range);

    let expected_fields = FieldList { fields: Vec::new() };

    run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
/// Queries field columns against the `PeriodsInNames` setup, whose field names contain periods
/// (`field.one`, `field.two`).
async fn list_field_columns_with_periods() {
    // Both expected fields carry the same last timestamp.
    let expected_timestamp: i64 = 1_609_459_201_000_000_002;

    let range = Predicate::default().with_range(0, 1_700_000_001_000_000_000);
    let predicate = InfluxRpcPredicate::new(None, range);

    let expected_fields = FieldList {
        fields: vec![
            Field {
                name: "field.one".into(),
                data_type: DataType::Float64,
                last_timestamp: expected_timestamp,
            },
            Field {
                name: "field.two".into(),
                data_type: DataType::Boolean,
                last_timestamp: expected_timestamp,
            },
        ],
    };

    run_field_columns_test_case(PeriodsInNames {}, predicate, expected_fields).await;
}