refactor: Extract gRPC read group and read window agg tests because they manage their own data

pull/24376/head
Carol (Nichols || Goulding) 2021-01-27 16:08:36 -05:00
parent cfe18e3e61
commit 41e0fff039
1 changed files with 52 additions and 26 deletions

View File

@ -38,6 +38,7 @@ use std::time::{Duration, SystemTime};
use std::u32;
use tempfile::TempDir;
use test_helpers::*;
use tonic::transport::Channel;
const HTTP_BASE: &str = "http://localhost:8080";
const API_BASE: &str = "http://localhost:8080/api/v2";
@ -52,25 +53,34 @@ async fn read_and_write_data() {
let server = TestServer::new().unwrap();
server.wait_until_ready().await;
let scenario = Scenario::default()
.set_org_id("0000111100001111")
.set_bucket_id("1111000011110000");
let http_client = reqwest::Client::new();
let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN);
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
create_database(&http_client, &scenario.database_name()).await;
// These tests share data; TODO: a better way to indicate this
{
let scenario = Scenario::default()
.set_org_id("0000111100001111")
.set_bucket_id("1111000011110000");
let expected_read_data = load_data(&influxdb2, &scenario).await;
let sql_query = "select * from cpu_load_short";
create_database(&http_client, &scenario.database_name()).await;
test_read_api(&http_client, &scenario, sql_query, &expected_read_data).await;
let expected_read_data = load_data(&influxdb2, &scenario).await;
let sql_query = "select * from cpu_load_short";
test_grpc_api(&influxdb2, &scenario).await;
test_read_api(&http_client, &scenario, sql_query, &expected_read_data).await;
test_grpc_api(&mut storage_client, &scenario).await;
}
// These tests manage their own data
test_grpc_read_group(&http_client, &influxdb2, &mut storage_client).await;
test_grpc_read_window_aggregate(&http_client, &influxdb2, &mut storage_client).await;
test_http_error_messages(&influxdb2).await.unwrap();
}
// TODO: Randomly generate org and bucket ids to ensure test data independence
// where desired
#[derive(Debug)]
struct Scenario {
org_id_str: String,
@ -263,9 +273,7 @@ async fn test_read_api(
);
}
async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario) {
let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap();
async fn test_grpc_api(storage_client: &mut StorageClient<Channel>, scenario: &Scenario) {
// Validate that capabilities rpc endpoint is hooked up
let capabilities_response = storage_client.capabilities(()).await.unwrap();
let capabilities_response = capabilities_response.into_inner();
@ -366,13 +374,6 @@ async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario
assert_eq!(values, vec!["server01"]);
// Begin tests for read_group rpc call
load_read_group_data(&influxdb2, scenario).await;
test_read_group_none_agg(&mut storage_client, &read_source).await;
test_read_group_none_agg_with_predicate(&mut storage_client, &read_source).await;
test_read_group_sum_agg(&mut storage_client, &read_source).await;
test_read_group_last_agg(&mut storage_client, &read_source).await;
let measurement_names_request = tonic::Request::new(MeasurementNamesRequest {
source: read_source.clone(),
range: range.clone(),
@ -472,8 +473,27 @@ async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario
assert_eq!(field.key, "value");
assert_eq!(field.r#type, DataType::Float as i32);
assert_eq!(field.timestamp, scenario.ns_since_epoch() + 4);
}
test_read_window_aggregate(&mut storage_client, &influxdb2, &read_source, scenario).await;
async fn test_grpc_read_group(
client: &reqwest::Client,
influxdb2: &influxdb2_client::Client,
storage_client: &mut StorageClient<Channel>,
) {
let scenario = Scenario::default()
.set_org_id("0000111100001110")
.set_bucket_id("1111000011110001");
create_database(&client, &scenario.database_name()).await;
load_read_group_data(&influxdb2, &scenario).await;
let read_source = scenario.read_source();
test_read_group_none_agg(storage_client, &read_source).await;
test_read_group_none_agg_with_predicate(storage_client, &read_source).await;
test_read_group_sum_agg(storage_client, &read_source).await;
test_read_group_last_agg(storage_client, &read_source).await;
}
async fn read_data_as_sql(
@ -534,12 +554,18 @@ async fn test_http_error_messages(client: &influxdb2_client::Client) -> Result<(
}
// Standalone test that all the pipes are hooked up for read window aggregate
async fn test_read_window_aggregate(
storage_client: &mut StorageClient<tonic::transport::Channel>,
client: &influxdb2_client::Client,
read_source: &std::option::Option<prost_types::Any>,
scenario: &Scenario,
async fn test_grpc_read_window_aggregate(
client: &reqwest::Client,
influxdb2: &influxdb2_client::Client,
storage_client: &mut StorageClient<Channel>,
) {
let scenario = Scenario::default()
.set_org_id("0000111100001100")
.set_bucket_id("1111000011110011");
let read_source = scenario.read_source();
create_database(&client, &scenario.database_name()).await;
let line_protocol = vec![
"h2o,state=MA,city=Boston temp=70.0 100",
"h2o,state=MA,city=Boston temp=71.0 200",
@ -559,7 +585,7 @@ async fn test_read_window_aggregate(
]
.join("\n");
client
influxdb2
.write_line_protocol(
scenario.org_id_str(),
scenario.bucket_id_str(),