test: Start of porting InfluxRpc query_tests

Make a new trait, `InfluxRpcTest`, that types can implement to define
how to run a test on a specific Storage gRPC API. `InfluxRpcTest` takes
care of iterating through the two architectures, running the setups, and
creating the custom test step.

Implementers of the trait define only the aspects that differ from test to
test, which makes each test's parameters clearer and highlights what each
test is actually exercising.
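
For orientation, here is the shape of a test written against the new trait, condensed from the `MeasurementFieldsTest` addition in this diff (the `ExampleTest` name and the elided request body are illustrative placeholders, not part of the change):

```rust
use std::sync::Arc;

use async_trait::async_trait;
use test_helpers_end_to_end::MiniCluster;

#[derive(Debug)]
struct ExampleTest {
    setup_name: &'static str,
}

#[async_trait]
impl InfluxRpcTest for ExampleTest {
    // Which entry in `SETUPS` to load into the cluster before asserting.
    fn setup_name(&self) -> &'static str {
        self.setup_name
    }

    // The requests and assertions to run against the prepared cluster.
    async fn request_and_assert(&self, cluster: &MiniCluster) {
        // Issue Storage gRPC requests through the querier's storage client
        // and assert on the responses here.
        let _storage_client = cluster.querier_storage_client();
    }
}

#[tokio::test]
async fn example() {
    // `run` is defined on `Arc<Self>`, so the test wraps the value in an `Arc`.
    Arc::new(ExampleTest {
        setup_name: "TwoMeasurementsManyFields",
    })
    .run()
    .await;
}
```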
Branch: pull/24376/head
Author: Carol (Nichols || Goulding), 2023-01-20 10:51:51 -05:00
Parent: 8ee6c1ec68
Commit: f310e01b1a
8 changed files with 172 additions and 11 deletions

Cargo.lock (generated)

@@ -2453,6 +2453,7 @@ dependencies = [
 "arrow-flight",
 "arrow_util",
 "assert_cmd",
+"async-trait",
 "backtrace",
 "bytes",
 "clap 4.1.4",


@@ -81,6 +81,7 @@ workspace-hack = { path = "../workspace-hack"}
 # In alphabetical order
 arrow_util = { path = "../arrow_util" }
 assert_cmd = "2.0.8"
+async-trait = "0.1"
 predicate = { path = "../predicate" }
 predicates = "2.1.0"
 tempfile = "3.1.0"


@@ -6,3 +6,4 @@
 // The tests are defined in the submodules of [`end_to_end_cases`]
 mod end_to_end_cases;
+mod query_tests2;


@@ -6,9 +6,13 @@ mod read_filter;
 mod read_group;
 mod read_window_aggregate;

+use crate::query_tests2::{framework::IoxArchitecture, setups::SETUPS};
+use async_trait::async_trait;
 use futures::FutureExt;
+use observability_deps::tracing::*;
+use std::sync::Arc;
 use test_helpers_end_to_end::{
-    maybe_skip_integration, DataGenerator, FCustom, MiniCluster, Step, StepTest,
+    maybe_skip_integration, DataGenerator, FCustom, MiniCluster, Step, StepTest, StepTestState,
 };

/// Runs the specified custom function on a cluster with no data
@@ -56,3 +60,80 @@ pub(crate) fn read_group_data() -> Vec<&'static str> {
        "cpu,cpu=cpu2,host=bar usage_user=52.0,usage_system=41.0 2000",
    ]
}

/// Perform an InfluxRPC test that creates a [`MiniCluster`] appropriate for the architecture(s)
/// under test, runs a setup defined in [`SETUPS`] and specified by the implementation of
/// `setup_name`, then performs actions and assertions defined by the implementation of
/// `request_and_assert` with the [`MiniCluster`].
#[async_trait]
trait InfluxRpcTest: Send + Sync + 'static {
    /// The name of the setup in [`SETUPS`] that should be run on the cluster before running
    /// `request_and_assert`.
    fn setup_name(&self) -> &'static str;

    /// Any requests and/or assertions that should be performed on the set up [`MiniCluster`].
    async fn request_and_assert(&self, cluster: &MiniCluster);

    /// Run the test on the appropriate architecture(s), using the setup specified by `setup_name`,
    /// and calling `request_and_assert` in a custom step after the setup steps.
    ///
    /// Note that this is defined on `Arc<Self>`, so a test using a type that implements this trait
    /// will need to call:
    ///
    /// ```ignore
    /// Arc::new(ImplementingType {}).run().await;
    /// ```
    async fn run(self: Arc<Self>) {
        test_helpers::maybe_start_logging();
        let database_url = maybe_skip_integration!();
        let setup_name = self.setup_name();

        for arch in [IoxArchitecture::Kafkaful, IoxArchitecture::Kafkaless] {
            info!("Using IoxArchitecture::{arch:?} and setup {setup_name}");

            // Set up the cluster ====================================
            let mut cluster = match arch {
                IoxArchitecture::Kafkaful => {
                    MiniCluster::create_non_shared_standard_never_persist(database_url.clone())
                        .await
                }
                IoxArchitecture::Kafkaless => {
                    MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
                }
            };

            let setup_steps = SETUPS
                .get(setup_name)
                .unwrap_or_else(|| panic!("Could not find setup with key `{setup_name}`"))
                .iter()
                // When we've switched over to the Kafkaless architecture, this map can be
                // removed.
                .flat_map(|step| match (arch, step) {
                    // If we're using the old architecture and the test steps include
                    // `WaitForPersisted2`, swap it with `WaitForPersisted` instead.
                    (IoxArchitecture::Kafkaful, Step::WaitForPersisted2 { .. }) => {
                        vec![&Step::WaitForPersisted]
                    }
                    // If we're using the old architecture and the test steps include
                    // `WriteLineProtocol`, wait for the data to be readable after writing.
                    (IoxArchitecture::Kafkaful, Step::WriteLineProtocol { .. }) => {
                        vec![step, &Step::WaitForReadable]
                    }
                    (_, other) => vec![other],
                });

            let cloned_self = Arc::clone(&self);

            let test_step = Step::Custom(Box::new(move |state: &mut StepTestState| {
                let cloned_self = Arc::clone(&cloned_self);
                async move {
                    cloned_self.request_and_assert(state.cluster()).await;
                }
                .boxed()
            }));

            StepTest::new(&mut cluster, setup_steps.chain(std::iter::once(&test_step)))
                .run()
                .await;
        }
    }
}


@@ -1,12 +1,15 @@
-use super::{run_data_test, run_no_data_test};
+use super::{run_data_test, run_no_data_test, InfluxRpcTest};
+use async_trait::async_trait;
 use futures::{prelude::*, FutureExt};
 use generated_types::{
-    google::protobuf::Empty, measurement_fields_response::FieldType,
-    offsets_response::PartitionOffsetResponse, OffsetsResponse,
+    google::protobuf::Empty,
+    measurement_fields_response::{FieldType, MessageField},
+    offsets_response::PartitionOffsetResponse,
+    OffsetsResponse,
 };
 use influxdb_storage_client::tag_key_bytes_to_strings;
 use std::sync::Arc;
-use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, StepTestState};
+use test_helpers_end_to_end::{DataGenerator, GrpcRequestBuilder, MiniCluster, StepTestState};

#[tokio::test]
/// Validate that capabilities storage endpoint is hooked up
@@ -281,3 +284,80 @@ async fn measurement_fields() {
    )
    .await
}

#[tokio::test]
async fn field_columns_nonexistent_table_with_predicate() {
    Arc::new(MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "NoSuchTable",
        request: GrpcRequestBuilder::new().tag_predicate("state", "MA"),
        expected_fields: vec![],
    })
    .run()
    .await;
}

#[tokio::test]
async fn field_columns_existing_table_with_predicate() {
    Arc::new(MeasurementFieldsTest {
        setup_name: "TwoMeasurementsManyFields",
        table_name: "h2o",
        request: GrpcRequestBuilder::new().tag_predicate("state", "MA"),
        expected_fields: vec![
            MessageField {
                key: "moisture".into(),
                r#type: FieldType::Float.into(),
                timestamp: 100000,
            },
            MessageField {
                key: "other_temp".into(),
                r#type: FieldType::Float.into(),
                timestamp: 250,
            },
            MessageField {
                key: "temp".into(),
                r#type: FieldType::Float.into(),
                timestamp: 100000,
            },
        ],
    })
    .run()
    .await;
}

#[derive(Debug)]
struct MeasurementFieldsTest {
    setup_name: &'static str,
    table_name: &'static str,
    request: GrpcRequestBuilder,
    expected_fields: Vec<MessageField>,
}

#[async_trait]
impl InfluxRpcTest for MeasurementFieldsTest {
    fn setup_name(&self) -> &'static str {
        self.setup_name
    }

    async fn request_and_assert(&self, cluster: &MiniCluster) {
        let mut storage_client = cluster.querier_storage_client();

        let measurement_fields_request = self
            .request
            .clone()
            .source(cluster)
            .build_measurement_fields(self.table_name);

        let measurement_fields_response = storage_client
            .measurement_fields(measurement_fields_request)
            .await
            .unwrap();

        let responses: Vec<_> = measurement_fields_response
            .into_inner()
            .try_collect()
            .await
            .unwrap();

        assert_eq!(responses[0].fields, self.expected_fields);
    }
}


@@ -1,3 +0,0 @@
-//! Tests of various queries for data in various states.
-mod query_tests2;


@@ -41,7 +41,7 @@ impl IntoIterator for ChunkStage {
 /// Which architecture is being used in this test run. This enum and running the tests twice is temporary until the Kafkaful architecture is retired.
 #[derive(Debug, Copy, Clone)]
-enum IoxArchitecture {
+pub enum IoxArchitecture {
     /// Use the "standard" MiniCluster that uses ingester, router, querier, compactor with a write
     /// buffer (aka Kafka). This is slated for retirement soon.
     Kafkaful,


@@ -1,3 +1,3 @@
 mod cases;
-mod framework;
-mod setups;
+pub mod framework;
+pub mod setups;