From 57b5b639d6f1467d15b19c961e6898cc17a0706d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" <193874+carols10cents@users.noreply.github.com> Date: Wed, 25 Jan 2023 14:49:29 -0500 Subject: [PATCH] test: Port all field columns query_tests to end-to-end tests (#6707) * test: Port a test that's not actually supported through the full gRPC API * test: Port remaining field column/measurement fields tests * test: Remove unsupported measurement predicate and clarify purposes of tests Andrew confirmed that the only way to invoke a Measurement Fields request is with a measurement/table name specified: so testing with a `_measurement` predicate is not valid. I thought this test would become redundant with some other tests, but they're actually still different enough; I took this opportunity to better highlight the differences in the test names. * refactor: Move all measurement fields tests to their own file * test: Remove field columns tests that are now covered in end-to-end measurement fields tests --- .../querier/influxrpc/metadata.rs | 128 +------- .../influxrpc/metadata/measurement_fields.rs | 303 ++++++++++++++++++ influxdb_iox/tests/query_tests2/setups.rs | 37 +++ query_tests/src/influxrpc.rs | 1 - query_tests/src/influxrpc/field_columns.rs | 270 ---------------- 5 files changed, 344 insertions(+), 395 deletions(-) create mode 100644 influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata/measurement_fields.rs delete mode 100644 query_tests/src/influxrpc/field_columns.rs diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs index bc510de9ea..2f26576411 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata.rs @@ -1,15 +1,13 @@ use super::{run_data_test, run_no_data_test, InfluxRpcTest}; -use async_trait::async_trait; use futures::{prelude::*, FutureExt}; use generated_types::{ - google::protobuf::Empty, - measurement_fields_response::{FieldType, MessageField}, - offsets_response::PartitionOffsetResponse, - OffsetsResponse, + google::protobuf::Empty, offsets_response::PartitionOffsetResponse, OffsetsResponse, }; use influxdb_storage_client::tag_key_bytes_to_strings; use std::sync::Arc; -use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, MiniCluster, StepTestState}; +use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, StepTestState}; + +mod measurement_fields; #[tokio::test] /// Validate that capabilities storage endpoint is hooked up @@ -243,121 +241,3 @@ async fn measurement_tag_values() { ) .await } - -#[tokio::test] -async fn measurement_fields() { - let generator = Arc::new(DataGenerator::new()); - run_data_test( - Arc::clone(&generator), - Box::new(move |state: &mut StepTestState| { - let generator = Arc::clone(&generator); - async move { - let mut storage_client = state.cluster().querier_storage_client(); - - let measurement_fields_request = GrpcRequestBuilder::new() - .source(state.cluster()) - .timestamp_range(generator.min_time(), generator.max_time()) - .tag_predicate("host", "server01") - .build_measurement_fields("cpu_load_short"); - - let ns_since_epoch = generator.ns_since_epoch(); - let measurement_fields_response = storage_client - .measurement_fields(measurement_fields_request) - .await - .unwrap(); - let responses: Vec<_> = measurement_fields_response - .into_inner() - .try_collect() - .await - .unwrap(); - - let fields = &responses[0].fields; - assert_eq!(fields.len(), 1); - - let field = &fields[0]; - assert_eq!(field.key, "value"); - assert_eq!(field.r#type(), FieldType::Float); - assert_eq!(field.timestamp, ns_since_epoch + 4); - } - .boxed() - }), - ) - .await -} - -#[tokio::test] -async fn field_columns_nonexistent_table_with_predicate() { - Arc::new(MeasurementFieldsTest { - setup_name: "TwoMeasurementsManyFields", - table_name: "NoSuchTable", - request: GrpcRequestBuilder::new().tag_predicate("state", "MA"), - expected_fields: vec![], - }) - .run() - .await; -} - -#[tokio::test] -async fn field_columns_existing_table_with_predicate() { - Arc::new(MeasurementFieldsTest { - setup_name: "TwoMeasurementsManyFields", - table_name: "h2o", - request: GrpcRequestBuilder::new().tag_predicate("state", "MA"), - expected_fields: vec![ - MessageField { - key: "moisture".into(), - r#type: FieldType::Float.into(), - timestamp: 100000, - }, - MessageField { - key: "other_temp".into(), - r#type: FieldType::Float.into(), - timestamp: 250, - }, - MessageField { - key: "temp".into(), - r#type: FieldType::Float.into(), - timestamp: 100000, - }, - ], - }) - .run() - .await; -} - -#[derive(Debug)] -struct MeasurementFieldsTest { - setup_name: &'static str, - table_name: &'static str, - request: GrpcRequestBuilder, - expected_fields: Vec, -} - -#[async_trait] -impl InfluxRpcTest for MeasurementFieldsTest { - fn setup_name(&self) -> &'static str { - self.setup_name - } - - async fn request_and_assert(&self, cluster: &MiniCluster) { - let mut storage_client = cluster.querier_storage_client(); - - let measurement_fields_request = self - .request - .clone() - .source(cluster) - .build_measurement_fields(self.table_name); - - let measurement_fields_response = storage_client - .measurement_fields(measurement_fields_request) - .await - .unwrap(); - let responses: Vec<_> = measurement_fields_response - .into_inner() - .try_collect() - .await - .unwrap(); - - assert_eq!(responses[0].fields, self.expected_fields); - } -} diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata/measurement_fields.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata/measurement_fields.rs new file mode 100644 index 0000000000..e053d432e0 --- /dev/null +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/metadata/measurement_fields.rs @@ -0,0 +1,303 @@ +use super::{run_data_test, InfluxRpcTest}; +use async_trait::async_trait; +use data_types::{MAX_NANO_TIME, MIN_NANO_TIME}; +use futures::{prelude::*, FutureExt}; +use generated_types::measurement_fields_response::{FieldType, MessageField}; +use std::sync::Arc; +use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, MiniCluster, StepTestState}; + +#[tokio::test] +async fn measurement_fields() { + let generator = Arc::new(DataGenerator::new()); + run_data_test( + Arc::clone(&generator), + Box::new(move |state: &mut StepTestState| { + let generator = Arc::clone(&generator); + async move { + let mut storage_client = state.cluster().querier_storage_client(); + + let measurement_fields_request = GrpcRequestBuilder::new() + .source(state.cluster()) + .timestamp_range(generator.min_time(), generator.max_time()) + .tag_predicate("host", "server01") + .build_measurement_fields("cpu_load_short"); + + let ns_since_epoch = generator.ns_since_epoch(); + let measurement_fields_response = storage_client + .measurement_fields(measurement_fields_request) + .await + .unwrap(); + let responses: Vec<_> = measurement_fields_response + .into_inner() + .try_collect() + .await + .unwrap(); + + let fields = &responses[0].fields; + assert_eq!(fields.len(), 1); + + let field = &fields[0]; + assert_eq!(field.key, "value"); + assert_eq!(field.r#type(), FieldType::Float); + assert_eq!(field.timestamp, ns_since_epoch + 4); + } + .boxed() + }), + ) + .await +} + +#[tokio::test] +async fn field_columns_nonexistent_table_with_predicate() { + Arc::new(MeasurementFieldsTest { + setup_name: "TwoMeasurementsManyFields", + table_name: "NoSuchTable", + request: GrpcRequestBuilder::new().tag_predicate("state", "MA"), + expected_fields: vec![], + }) + .run() + .await; +} + +#[tokio::test] +async fn field_columns_existing_table_with_predicate() { + Arc::new(MeasurementFieldsTest { + setup_name: "TwoMeasurementsManyFields", + table_name: "h2o", + request: GrpcRequestBuilder::new().tag_predicate("state", "MA"), + expected_fields: vec![ + MessageField { + key: "moisture".into(), + r#type: FieldType::Float.into(), + timestamp: 100_000, + }, + MessageField { + key: "other_temp".into(), + r#type: FieldType::Float.into(), + timestamp: 250, + }, + MessageField { + key: "temp".into(), + r#type: FieldType::Float.into(), + timestamp: 100_000, + }, + ], + }) + .run() + .await; +} + +#[tokio::test] +async fn field_columns_timestamp_range_predicate() { + Arc::new(MeasurementFieldsTest { + setup_name: "TwoMeasurementsManyFields", + table_name: "h2o", + request: GrpcRequestBuilder::new().timestamp_range(i64::MIN, i64::MAX - 2), + expected_fields: vec![ + MessageField { + key: "moisture".into(), + r#type: FieldType::Float.into(), + timestamp: 100_000, + }, + MessageField { + key: "other_temp".into(), + r#type: FieldType::Float.into(), + timestamp: 350, + }, + MessageField { + key: "temp".into(), + r#type: FieldType::Float.into(), + timestamp: 100_000, + }, + ], + }) + .run() + .await; +} + +#[tokio::test] +async fn field_columns_no_predicate_full_time_range_returns_zero_timestamp() { + Arc::new(MeasurementFieldsTest { + setup_name: "TwoMeasurementsManyFields", + table_name: "h2o", + request: GrpcRequestBuilder::new(), + expected_fields: vec![ + MessageField { + key: "moisture".into(), + r#type: FieldType::Float.into(), + timestamp: 0, + }, + MessageField { + key: "other_temp".into(), + r#type: FieldType::Float.into(), + timestamp: 0, + }, + MessageField { + key: "temp".into(), + r#type: FieldType::Float.into(), + timestamp: 0, + }, + ], + }) + .run() + .await; +} + +#[tokio::test] +async fn field_columns_tag_and_timestamp_range_predicate() { + Arc::new(MeasurementFieldsTest { + setup_name: "TwoMeasurementsManyFields", + table_name: "h2o", + request: GrpcRequestBuilder::new() + .tag_predicate("state", "MA") + .timestamp_range(200, 300), + expected_fields: vec![MessageField { + key: "other_temp".into(), + r#type: FieldType::Float.into(), + timestamp: 250, + }], + }) + .run() + .await; +} + +#[tokio::test] +async fn field_names() { + Arc::new(MeasurementFieldsTest { + setup_name: "OneMeasurementManyFields", + table_name: "h2o", + request: GrpcRequestBuilder::new().timestamp_range(0, 2_000), + expected_fields: vec![ + MessageField { + key: "field1".into(), + r#type: FieldType::Float.into(), + timestamp: 1_000, + }, + MessageField { + key: "field2".into(), + r#type: FieldType::String.into(), + timestamp: 100, + }, + MessageField { + key: "field3".into(), + r#type: FieldType::Float.into(), + timestamp: 100, + }, + MessageField { + key: "field4".into(), + r#type: FieldType::Boolean.into(), + timestamp: 1_000, + }, + ], + }) + .run() + .await; +} + +#[tokio::test] +async fn list_field_columns_all_time() { + Arc::new(MeasurementFieldsTest { + setup_name: "MeasurementWithMaxTime", + table_name: "cpu", + request: GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME, MAX_NANO_TIME), + expected_fields: vec![MessageField { + key: "value".into(), + r#type: FieldType::Float.into(), + timestamp: 0, + }], + }) + .run() + .await; +} + +#[tokio::test] +async fn list_field_columns_max_time_included() { + Arc::new(MeasurementFieldsTest { + setup_name: "MeasurementWithMaxTime", + table_name: "cpu", + // if the range started at i64:MIN, we would hit the optimized case for 'all time' + // and get the 'wrong' timestamp, since in the optimized case we don't check what timestamps + // exist + request: GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1), + expected_fields: vec![MessageField { + key: "value".into(), + r#type: FieldType::Float.into(), + timestamp: MAX_NANO_TIME, + }], + }) + .run() + .await; +} + +#[tokio::test] +async fn list_field_columns_max_time_excluded() { + Arc::new(MeasurementFieldsTest { + setup_name: "MeasurementWithMaxTime", + table_name: "cpu", + // one less than max timestamp + request: GrpcRequestBuilder::new().timestamp_range(MIN_NANO_TIME + 1, MAX_NANO_TIME), + expected_fields: vec![], + }) + .run() + .await; +} + +#[tokio::test] +async fn list_field_columns_with_periods() { + Arc::new(MeasurementFieldsTest { + setup_name: "PeriodsInNames", + table_name: "measurement.one", + request: GrpcRequestBuilder::new().timestamp_range(0, 1_700_000_001_000_000_000), + expected_fields: vec![ + MessageField { + key: "field.one".into(), + r#type: FieldType::Float.into(), + timestamp: 1_609_459_201_000_000_002, + }, + MessageField { + key: "field.two".into(), + r#type: FieldType::Boolean.into(), + timestamp: 1_609_459_201_000_000_002, + }, + ], + }) + .run() + .await; +} + +#[derive(Debug)] +struct MeasurementFieldsTest { + setup_name: &'static str, + table_name: &'static str, + request: GrpcRequestBuilder, + expected_fields: Vec, +} + +#[async_trait] +impl InfluxRpcTest for MeasurementFieldsTest { + fn setup_name(&self) -> &'static str { + self.setup_name + } + + async fn request_and_assert(&self, cluster: &MiniCluster) { + let mut storage_client = cluster.querier_storage_client(); + + let measurement_fields_request = self + .request + .clone() + .source(cluster) + .build_measurement_fields(self.table_name); + + let measurement_fields_response = storage_client + .measurement_fields(measurement_fields_request) + .await + .unwrap(); + let responses: Vec<_> = measurement_fields_response + .into_inner() + .try_collect() + .await + .unwrap(); + + assert_eq!(responses[0].fields, self.expected_fields); + } +} diff --git a/influxdb_iox/tests/query_tests2/setups.rs b/influxdb_iox/tests/query_tests2/setups.rs index 0af51e0f42..826ddeeb6e 100644 --- a/influxdb_iox/tests/query_tests2/setups.rs +++ b/influxdb_iox/tests/query_tests2/setups.rs @@ -246,6 +246,26 @@ pub static SETUPS: Lazy> = Lazy::new(|| { }) .collect::>(), ), + ( + "OneMeasurementManyFields", + vec![ + Step::RecordNumParquetFiles, + Step::WriteLineProtocol( + [ + "h2o,tag1=foo,tag2=bar field1=70.6,field3=2 100", + "h2o,tag1=foo,tag2=bar field1=70.4,field2=\"ss\" 100", + "h2o,tag1=foo,tag2=bar field1=70.5,field2=\"ss\" 100", + "h2o,tag1=foo,tag2=bar field1=70.6,field4=true 1000", + "h2o,tag1=foo,tag2=bar field1=70.3,field5=false 3000", + ] + .join("\n"), + ), + Step::Persist, + Step::WaitForPersisted2 { + expected_increase: 1, + }, + ], + ), ( "TwoMeasurementsManyFields", vec![ @@ -435,6 +455,23 @@ pub static SETUPS: Lazy> = Lazy::new(|| { Step::WriteLineProtocol("h2o,state=CA,city=Andover temp=67.3 500".into()), ], ), + ( + "MeasurementWithMaxTime", + vec![ + Step::RecordNumParquetFiles, + Step::WriteLineProtocol(format!( + "cpu,host=server01 value=100 {}", + // This is the maximum timestamp that can be represented in the InfluxDB data + // model: + // + i64::MAX - 1, // 9223372036854775806 + )), + Step::Persist, + Step::WaitForPersisted2 { + expected_increase: 1, + }, + ], + ), ( "OneMeasurementRealisticTimes", vec![ diff --git a/query_tests/src/influxrpc.rs b/query_tests/src/influxrpc.rs index 971b033b69..b1ab76b79a 100644 --- a/query_tests/src/influxrpc.rs +++ b/query_tests/src/influxrpc.rs @@ -1,4 +1,3 @@ -pub mod field_columns; pub mod read_filter; pub mod read_group; pub mod read_window_aggregate; diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs deleted file mode 100644 index 70a8021cac..0000000000 --- a/query_tests/src/influxrpc/field_columns.rs +++ /dev/null @@ -1,270 +0,0 @@ -use crate::scenarios::*; -use arrow::datatypes::DataType; -use data_types::{MAX_NANO_TIME, MIN_NANO_TIME}; -use datafusion::prelude::{col, lit}; -use iox_query::{ - exec::fieldlist::{Field, FieldList}, - frontend::influxrpc::InfluxRpcPlanner, -}; -use predicate::{rpc_predicate::InfluxRpcPredicate, Predicate}; - -/// Creates and loads several database scenarios using the db_setup -/// function. -/// -/// runs field_column_names(predicate) and compares it to the expected -/// output -async fn run_field_columns_test_case( - db_setup: D, - predicate: InfluxRpcPredicate, - expected_fields: FieldList, -) where - D: DbSetup, -{ - test_helpers::maybe_start_logging(); - - for scenario in db_setup.make().await { - let DbScenario { - scenario_name, db, .. - } = scenario; - println!("Running scenario '{}'", scenario_name); - println!("Predicate: '{:#?}'", predicate); - let ctx = db.new_query_context(None); - let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); - - let plan = planner - .field_columns(db.as_query_namespace_arc(), predicate.clone()) - .await - .expect("built plan successfully"); - let fields = ctx - .to_field_list(plan) - .await - .expect("converted plan to strings successfully"); - - assert_eq!( - fields, expected_fields, - "Error in scenario '{}'\n\nexpected:\n{:#?}\nactual:\n{:#?}", - scenario_name, expected_fields, fields - ); - } -} - -#[tokio::test] -async fn test_field_columns_no_predicate() { - let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA - let predicate = InfluxRpcPredicate::new_table("NoSuchTable", predicate); - let expected_fields = FieldList::default(); - run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn test_field_columns_with_pred() { - // get only fields from h20 (but both chunks) - let predicate = Predicate::default().with_expr(col("state").eq(lit("MA"))); // state=MA - let predicate = InfluxRpcPredicate::new_table("h2o", predicate); - - let expected_fields = FieldList { - fields: vec![ - Field { - name: "moisture".into(), - data_type: DataType::Float64, - last_timestamp: 100000, - }, - Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 250, - }, - Field { - name: "temp".into(), - data_type: DataType::Float64, - last_timestamp: 100000, - }, - ], - }; - - run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn test_field_columns_measurement_pred() { - // get only fields from h2o using a _measurement predicate - let predicate = Predicate::default() - .with_expr(col("_measurement").eq(lit("h2o"))) - .with_range(i64::MIN, i64::MAX - 2); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { - fields: vec![ - Field { - name: "moisture".into(), - data_type: DataType::Float64, - last_timestamp: 100000, - }, - Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 350, - }, - Field { - name: "temp".into(), - data_type: DataType::Float64, - last_timestamp: 100000, - }, - ], - }; - - run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn test_field_columns_measurement_pred_all_time() { - // get only fields from h2o using a _measurement predicate - let predicate = Predicate::default().with_expr(col("_measurement").eq(lit("h2o"))); - let predicate = InfluxRpcPredicate::new(None, predicate); - - // optimized all-time case returns zero last_timestamp - let expected_fields = FieldList { - fields: vec![ - Field { - name: "moisture".into(), - data_type: DataType::Float64, - last_timestamp: 0, - }, - Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 0, - }, - Field { - name: "temp".into(), - data_type: DataType::Float64, - last_timestamp: 0, - }, - ], - }; - - run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn test_field_columns_with_ts_pred() { - let predicate = Predicate::default() - .with_range(200, 300) - .with_expr(col("state").eq(lit("MA"))); // state=MA - let predicate = InfluxRpcPredicate::new_table("h2o", predicate); - - let expected_fields = FieldList { - fields: vec![Field { - name: "other_temp".into(), - data_type: DataType::Float64, - last_timestamp: 250, - }], - }; - - run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn test_field_name_plan() { - test_helpers::maybe_start_logging(); - - let predicate = Predicate::default().with_range(0, 2000); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { - fields: vec![ - Field { - name: "field1".into(), - data_type: DataType::Float64, - last_timestamp: 1000, - }, - Field { - name: "field2".into(), - data_type: DataType::Utf8, - last_timestamp: 100, - }, - Field { - name: "field3".into(), - data_type: DataType::Float64, - last_timestamp: 100, - }, - Field { - name: "field4".into(), - data_type: DataType::Boolean, - last_timestamp: 1000, - }, - ], - }; - - run_field_columns_test_case(OneMeasurementManyFields {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn list_field_columns_all_time() { - let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { - fields: vec![Field { - name: "value".into(), - data_type: DataType::Float64, - last_timestamp: 0, // we hit the optimized case that ignores timestamps - }], - }; - - run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn list_field_columns_max_time_included() { - // if the range started at i64:MIN, we would hit the optimized case for 'all time' - // and get the 'wrong' timestamp, since in the optimized case we don't check what timestamps - // exist - let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { - fields: vec![Field { - name: "value".into(), - data_type: DataType::Float64, - last_timestamp: MAX_NANO_TIME, - }], - }; - - run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn list_field_columns_max_time_excluded() { - let predicate = Predicate::default() - // one less than max timestamp - .with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { fields: vec![] }; - - run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await; -} - -#[tokio::test] -async fn list_field_columns_with_periods() { - let predicate = Predicate::default().with_range(0, 1700000001000000000); - let predicate = InfluxRpcPredicate::new(None, predicate); - - let expected_fields = FieldList { - fields: vec![ - Field { - name: "field.one".into(), - data_type: DataType::Float64, - last_timestamp: 1609459201000000002, - }, - Field { - name: "field.two".into(), - data_type: DataType::Boolean, - last_timestamp: 1609459201000000002, - }, - ], - }; - - run_field_columns_test_case(PeriodsInNames {}, predicate, expected_fields).await; -}