diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs index 0eaa7860f0..7934576403 100644 --- a/tests/end-to-end.rs +++ b/tests/end-to-end.rs @@ -38,6 +38,7 @@ use std::time::{Duration, SystemTime}; use std::u32; use tempfile::TempDir; use test_helpers::*; +use tonic::transport::Channel; const HTTP_BASE: &str = "http://localhost:8080"; const API_BASE: &str = "http://localhost:8080/api/v2"; @@ -52,25 +53,34 @@ async fn read_and_write_data() { let server = TestServer::new().unwrap(); server.wait_until_ready().await; - let scenario = Scenario::default() - .set_org_id("0000111100001111") - .set_bucket_id("1111000011110000"); - let http_client = reqwest::Client::new(); let influxdb2 = influxdb2_client::Client::new(HTTP_BASE, TOKEN); + let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap(); - create_database(&http_client, &scenario.database_name()).await; + // These tests share data; TODO: a better way to indicate this + { + let scenario = Scenario::default() + .set_org_id("0000111100001111") + .set_bucket_id("1111000011110000"); - let expected_read_data = load_data(&influxdb2, &scenario).await; - let sql_query = "select * from cpu_load_short"; + create_database(&http_client, &scenario.database_name()).await; - test_read_api(&http_client, &scenario, sql_query, &expected_read_data).await; + let expected_read_data = load_data(&influxdb2, &scenario).await; + let sql_query = "select * from cpu_load_short"; - test_grpc_api(&influxdb2, &scenario).await; + test_read_api(&http_client, &scenario, sql_query, &expected_read_data).await; + test_grpc_api(&mut storage_client, &scenario).await; + } + // These tests manage their own data + test_grpc_read_group(&http_client, &influxdb2, &mut storage_client).await; + test_grpc_read_window_aggregate(&http_client, &influxdb2, &mut storage_client).await; test_http_error_messages(&influxdb2).await.unwrap(); } +// TODO: Randomly generate org and bucket ids to ensure test data independence +// where desired + #[derive(Debug)] struct Scenario { org_id_str: String, @@ -263,9 +273,7 @@ async fn test_read_api( ); } -async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario) { - let mut storage_client = StorageClient::connect(GRPC_URL_BASE).await.unwrap(); - +async fn test_grpc_api(storage_client: &mut StorageClient, scenario: &Scenario) { // Validate that capabilities rpc endpoint is hooked up let capabilities_response = storage_client.capabilities(()).await.unwrap(); let capabilities_response = capabilities_response.into_inner(); @@ -366,13 +374,6 @@ async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario assert_eq!(values, vec!["server01"]); - // Begin tests for read_group rpc call - load_read_group_data(&influxdb2, scenario).await; - test_read_group_none_agg(&mut storage_client, &read_source).await; - test_read_group_none_agg_with_predicate(&mut storage_client, &read_source).await; - test_read_group_sum_agg(&mut storage_client, &read_source).await; - test_read_group_last_agg(&mut storage_client, &read_source).await; - let measurement_names_request = tonic::Request::new(MeasurementNamesRequest { source: read_source.clone(), range: range.clone(), @@ -472,8 +473,27 @@ async fn test_grpc_api(influxdb2: &influxdb2_client::Client, scenario: &Scenario assert_eq!(field.key, "value"); assert_eq!(field.r#type, DataType::Float as i32); assert_eq!(field.timestamp, scenario.ns_since_epoch() + 4); +} - test_read_window_aggregate(&mut storage_client, &influxdb2, &read_source, scenario).await; +async fn test_grpc_read_group( + client: &reqwest::Client, + influxdb2: &influxdb2_client::Client, + storage_client: &mut StorageClient, +) { + let scenario = Scenario::default() + .set_org_id("0000111100001110") + .set_bucket_id("1111000011110001"); + + create_database(&client, &scenario.database_name()).await; + + load_read_group_data(&influxdb2, &scenario).await; + + let read_source = scenario.read_source(); + + test_read_group_none_agg(storage_client, &read_source).await; + test_read_group_none_agg_with_predicate(storage_client, &read_source).await; + test_read_group_sum_agg(storage_client, &read_source).await; + test_read_group_last_agg(storage_client, &read_source).await; } async fn read_data_as_sql( @@ -534,12 +554,18 @@ async fn test_http_error_messages(client: &influxdb2_client::Client) -> Result<( } // Standalone test that all the pipes are hooked up for read window aggregate -async fn test_read_window_aggregate( - storage_client: &mut StorageClient, - client: &influxdb2_client::Client, - read_source: &std::option::Option, - scenario: &Scenario, +async fn test_grpc_read_window_aggregate( + client: &reqwest::Client, + influxdb2: &influxdb2_client::Client, + storage_client: &mut StorageClient, ) { + let scenario = Scenario::default() + .set_org_id("0000111100001100") + .set_bucket_id("1111000011110011"); + let read_source = scenario.read_source(); + + create_database(&client, &scenario.database_name()).await; + let line_protocol = vec![ "h2o,state=MA,city=Boston temp=70.0 100", "h2o,state=MA,city=Boston temp=71.0 200", @@ -559,7 +585,7 @@ async fn test_read_window_aggregate( ] .join("\n"); - client + influxdb2 .write_line_protocol( scenario.org_id_str(), scenario.bucket_id_str(),