refactor: Use declarative steps to reduce duplication in end to end testing (#4301)
* refactor: Use declarative steps to reduce duplication in end to end testing, * fix: improve whitespace formatting Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
2d75133ce8
commit
85f3e696e8
|
@ -6002,6 +6002,7 @@ name = "test_helpers_end_to_end_ng"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow_util",
|
||||
"assert_cmd",
|
||||
"futures",
|
||||
"http",
|
||||
|
|
|
@ -1,9 +1,4 @@
|
|||
use arrow_util::assert_batches_sorted_eq;
|
||||
use http::StatusCode;
|
||||
use test_helpers_end_to_end_ng::{
|
||||
get_write_token, maybe_skip_integration, run_query, wait_for_persisted, wait_for_readable,
|
||||
MiniCluster, TestConfig,
|
||||
};
|
||||
use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster, Step, StepTest, TestConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn basic_ingester() {
|
||||
|
@ -24,37 +19,31 @@ async fn basic_ingester() {
|
|||
.with_querier(querier_config)
|
||||
.await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!(
|
||||
"{},tag1=A,tag2=B val=42i 123456\n\
|
||||
{},tag1=A,tag2=C val=43i 123457",
|
||||
table_name, table_name
|
||||
);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for data to be readable
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_readable(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
// run query
|
||||
let sql = format!("select * from {}", table_name);
|
||||
let batches = run_query(
|
||||
sql,
|
||||
cluster.namespace(),
|
||||
cluster.querier().querier_grpc_connection(),
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!(
|
||||
"{},tag1=A,tag2=B val=42i 123456\n\
|
||||
{},tag1=A,tag2=C val=43i 123457",
|
||||
table_name, table_name
|
||||
)),
|
||||
Step::WaitForReadable,
|
||||
Step::AssertNotPersisted,
|
||||
Step::Query {
|
||||
sql: format!("select * from {}", table_name),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"| A | C | 1970-01-01T00:00:00.000123457Z | 43 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -77,32 +66,26 @@ async fn basic_on_parquet() {
|
|||
.with_querier(querier_config)
|
||||
.await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for data to be persisted to parquet
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
// run query
|
||||
let sql = format!("select * from {}", table_name);
|
||||
let batches = run_query(
|
||||
sql,
|
||||
cluster.namespace(),
|
||||
cluster.querier().querier_grpc_connection(),
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
Step::WaitForPersisted,
|
||||
Step::Query {
|
||||
sql: format!("select * from {}", table_name),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -128,29 +111,24 @@ async fn basic_no_ingster_connection() {
|
|||
.await;
|
||||
|
||||
// Write some data into the v2 HTTP API ==============
|
||||
let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
|
||||
let response = cluster.write_to_router(lp).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
|
||||
// Wait for data to be persisted to parquet
|
||||
let write_token = get_write_token(&response);
|
||||
wait_for_persisted(write_token, cluster.ingester().ingester_grpc_connection()).await;
|
||||
|
||||
// run query
|
||||
let sql = format!("select * from {}", table_name);
|
||||
let batches = run_query(
|
||||
sql,
|
||||
cluster.namespace(),
|
||||
cluster.querier().querier_grpc_connection(),
|
||||
StepTest::new(
|
||||
&cluster,
|
||||
vec![
|
||||
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123456", table_name)),
|
||||
// Wait for data to be persisted to parquet
|
||||
Step::WaitForPersisted,
|
||||
Step::Query {
|
||||
sql: format!("select * from {}", table_name),
|
||||
expected: vec![
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
# Workspace dependencies, in alphabetical order
|
||||
arrow_util = { path = "../arrow_util" }
|
||||
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
|
||||
test_helpers = { path = "../test_helpers" }
|
||||
|
||||
|
|
|
@ -48,6 +48,19 @@ pub fn get_write_token(response: &Response<Body>) -> String {
|
|||
.to_string()
|
||||
}
|
||||
|
||||
/// returns true if the write for this token is persisted, false if it
|
||||
/// is not persisted. panic's on error
|
||||
pub async fn token_is_persisted(
|
||||
write_token: impl AsRef<str>,
|
||||
ingester_connection: Connection,
|
||||
) -> bool {
|
||||
influxdb_iox_client::write_info::Client::new(ingester_connection)
|
||||
.get_write_info(write_token.as_ref())
|
||||
.await
|
||||
.expect("Error fetching write info for token")
|
||||
.persisted
|
||||
}
|
||||
|
||||
const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;
|
||||
|
||||
/// Waits for the specified predicate to return true
|
||||
|
|
|
@ -7,12 +7,14 @@ mod database;
|
|||
mod mini_cluster;
|
||||
mod server_fixture;
|
||||
mod server_type;
|
||||
mod steps;
|
||||
|
||||
pub use client::*;
|
||||
pub use config::TestConfig;
|
||||
pub use mini_cluster::MiniCluster;
|
||||
pub use server_fixture::{ServerFixture, TestServer};
|
||||
pub use server_type::ServerType;
|
||||
pub use steps::{Step, StepTest};
|
||||
|
||||
/// Return a random string suitable for use as a database name
|
||||
pub fn rand_name() -> String {
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
use http::StatusCode;
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
|
||||
use crate::{
|
||||
get_write_token, run_query, token_is_persisted, wait_for_persisted, wait_for_readable,
|
||||
MiniCluster,
|
||||
};
|
||||
|
||||
/// Test harness for end to end tests that are comprised of several steps
|
||||
pub struct StepTest<'a> {
|
||||
cluster: &'a MiniCluster,
|
||||
|
||||
/// The test steps to perform
|
||||
steps: Vec<Step>,
|
||||
}
|
||||
|
||||
/// Possible test steps that a test can perform
|
||||
pub enum Step {
|
||||
/// Writes the specified line protocol to the `/api/v2/write`
|
||||
/// endpoint, assert the data was written successfully
|
||||
WriteLineProtocol(String),
|
||||
|
||||
/// Wait for all previously written data to be readable
|
||||
WaitForReadable,
|
||||
|
||||
/// Assert that all previously written data is NOT persisted yet
|
||||
AssertNotPersisted,
|
||||
|
||||
/// Wait for all previously written data to be persisted
|
||||
WaitForPersisted,
|
||||
|
||||
/// Run a query and verify that the results are as expected
|
||||
Query {
|
||||
sql: String,
|
||||
expected: Vec<&'static str>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<'a> StepTest<'a> {
|
||||
/// Create a new test that runs each `step`, in sequence, against
|
||||
/// `cluster` panic'ing if any step fails
|
||||
pub fn new(cluster: &'a MiniCluster, steps: Vec<Step>) -> Self {
|
||||
Self { cluster, steps }
|
||||
}
|
||||
|
||||
/// run the test.
|
||||
pub async fn run(self) {
|
||||
let Self { cluster, steps } = self;
|
||||
|
||||
// Tokens for all writes performed in this test
|
||||
let mut write_tokens = vec![];
|
||||
|
||||
let ingester_grpc_connection = cluster.ingester().ingester_grpc_connection();
|
||||
|
||||
for step in steps {
|
||||
match step {
|
||||
Step::WriteLineProtocol(line_protocol) => {
|
||||
println!(
|
||||
"====Begin writing line protocol to v2 HTTP API:\n{}",
|
||||
line_protocol
|
||||
);
|
||||
let response = cluster.write_to_router(line_protocol).await;
|
||||
assert_eq!(response.status(), StatusCode::NO_CONTENT);
|
||||
let write_token = get_write_token(&response);
|
||||
println!("====Done writing line protocol, got token {}", write_token);
|
||||
write_tokens.push(write_token);
|
||||
}
|
||||
Step::WaitForReadable => {
|
||||
println!("====Begin waiting for all write tokens to be readable");
|
||||
for write_token in &write_tokens {
|
||||
wait_for_readable(write_token, ingester_grpc_connection.clone()).await;
|
||||
}
|
||||
println!("====Done waiting for all write tokens to be readable");
|
||||
}
|
||||
Step::WaitForPersisted => {
|
||||
println!("====Begin waiting for all write tokens to be persisted");
|
||||
for write_token in &write_tokens {
|
||||
wait_for_persisted(write_token, ingester_grpc_connection.clone()).await;
|
||||
}
|
||||
println!("====Done waiting for all write tokens to be persisted");
|
||||
}
|
||||
Step::AssertNotPersisted => {
|
||||
println!("====Begin checking all tokens not persisted");
|
||||
for write_token in &write_tokens {
|
||||
let persisted =
|
||||
token_is_persisted(write_token, ingester_grpc_connection.clone()).await;
|
||||
assert!(!persisted);
|
||||
}
|
||||
println!("====Done checking all tokens not persisted");
|
||||
}
|
||||
Step::Query { sql, expected } => {
|
||||
println!("====Begin running query: {}", sql);
|
||||
// run query
|
||||
let batches = run_query(
|
||||
sql,
|
||||
cluster.namespace(),
|
||||
cluster.querier().querier_grpc_connection(),
|
||||
)
|
||||
.await;
|
||||
// convert String --> str
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
println!("====Done running");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue