From d7c59da46b674c5524f1b4acac7ed9de44574f55 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 3 Feb 2023 14:38:49 -0500 Subject: [PATCH] fix: Switch more tests over to the kafkaless architecture --- influxdb_iox/tests/end_to_end_cases/cli.rs | 18 +- influxdb_iox/tests/end_to_end_cases/error.rs | 2 +- .../tests/end_to_end_cases/namespace.rs | 8 +- .../tests/end_to_end_cases/querier.rs | 1 - .../querier/multi_ingester.rs | 179 ------------------ influxdb_iox/tests/end_to_end_cases/remote.rs | 7 +- test_helpers_end_to_end/src/config.rs | 12 +- test_helpers_end_to_end/src/mini_cluster.rs | 65 ++----- test_helpers_end_to_end/src/server_fixture.rs | 2 +- test_helpers_end_to_end/src/server_type.rs | 13 ++ 10 files changed, 64 insertions(+), 243 deletions(-) delete mode 100644 influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 2336531740..a03fce321f 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -60,16 +60,19 @@ async fn parquet_to_lp() { // The test below assumes a specific partition id, so use a // non-shared one here so concurrent tests don't interfere with // each other - let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; + let mut cluster = MiniCluster::create_non_shared2(database_url).await; let line_protocol = "my_awesome_table,tag1=A,tag2=B val=42i 123456"; StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(String::from(line_protocol)), // wait for partitions to be persisted - Step::WaitForPersisted, + Step::WaitForPersisted2 { + expected_increase: 1, + }, // Run the 'remote partition' command Step::Custom(Box::new(move |state: &mut StepTestState| { async move { @@ -185,16 +188,19 @@ async fn compact_and_get_remote_partition() { // The test below assumes a specific partition id, so use a // non-shared one here so concurrent tests don't interfere with // each other - let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; + let mut cluster = MiniCluster::create_non_shared2(database_url).await; StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(String::from( "my_awesome_table,tag1=A,tag2=B val=42i 123456", )), // wait for partitions to be persisted - Step::WaitForPersisted, + Step::WaitForPersisted2 { + expected_increase: 1, + }, // Run the compactor Step::Compact, // Run the 'remote partition' command @@ -327,9 +333,13 @@ async fn schema_cli() { StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(String::from( "my_awesome_table2,tag1=A,tag2=B val=42i 123456", )), + Step::WaitForPersisted2 { + expected_increase: 1, + }, Step::Custom(Box::new(|state: &mut StepTestState| { async { // should be able to query both router and querier for the schema diff --git a/influxdb_iox/tests/end_to_end_cases/error.rs b/influxdb_iox/tests/end_to_end_cases/error.rs index f88961d6a8..dd7e6b4213 100644 --- a/influxdb_iox/tests/end_to_end_cases/error.rs +++ b/influxdb_iox/tests/end_to_end_cases/error.rs @@ -8,7 +8,7 @@ use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTes #[tokio::test] pub async fn test_panic() { let database_url = maybe_skip_integration!(); - let mut cluster = MiniCluster::create_shared(database_url).await; + let mut cluster = MiniCluster::create_shared2(database_url).await; StepTest::new( &mut cluster, diff --git a/influxdb_iox/tests/end_to_end_cases/namespace.rs b/influxdb_iox/tests/end_to_end_cases/namespace.rs index 09b5c79bd1..1f19955f50 100644 --- a/influxdb_iox/tests/end_to_end_cases/namespace.rs +++ b/influxdb_iox/tests/end_to_end_cases/namespace.rs @@ -1,7 +1,7 @@ use http::StatusCode; use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, TestConfig}; -/// Test the namespacea client +/// Test the namespace client #[tokio::test] async fn querier_namespace_client() { test_helpers::maybe_start_logging(); @@ -9,9 +9,9 @@ async fn querier_namespace_client() { let table_name = "the_table"; - let router_config = TestConfig::new_router(&database_url); - let ingester_config = TestConfig::new_ingester(&router_config); - let querier_config = TestConfig::new_querier(&ingester_config); + let ingester_config = TestConfig::new_ingester2(&database_url); + let router_config = TestConfig::new_router2(&ingester_config); + let querier_config = TestConfig::new_querier2(&ingester_config); // Set up the cluster ==================================== let cluster = MiniCluster::new() diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 84e3602d9d..347b2eb971 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -1,5 +1,4 @@ pub(crate) mod influxrpc; -mod multi_ingester; use arrow::datatypes::{DataType, SchemaRef}; use arrow_flight::{ diff --git a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs b/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs deleted file mode 100644 index df5faa4cff..0000000000 --- a/influxdb_iox/tests/end_to_end_cases/querier/multi_ingester.rs +++ /dev/null @@ -1,179 +0,0 @@ -use arrow::{array::as_primitive_array, datatypes::Int64Type, record_batch::RecordBatch}; -use data_types::ShardIndex; -use futures::FutureExt; -use influxdb_iox_client::write_info::generated_types::{GetWriteInfoResponse, ShardStatus}; -use std::time::Duration; -use test_helpers::timeout::FutureTimeout; -use test_helpers_end_to_end::{ - all_readable, combined_token_info, maybe_skip_integration, MiniCluster, Step, StepTest, - StepTestState, TestConfig, -}; - -#[tokio::test] -/// Test with multiple ingesters -async fn basic_multi_ingesters() { - let database_url = maybe_skip_integration!(); - test_helpers::maybe_start_logging(); - - // write into two different shards: 0 and 1 - let router_config = TestConfig::new_router(&database_url).with_new_write_buffer_shards(2); - - // ingester gets partition 0 - let ingester_config = TestConfig::new_ingester(&router_config).with_shard(ShardIndex::new(0)); - let ingester2_config = TestConfig::new_ingester(&router_config).with_shard(ShardIndex::new(1)); - - let json = format!( - r#"{{ - "ingesters": {{ - "i1": {{ - "addr": "{}" - }}, - "i2": {{ - "addr": "{}" - }} - }}, - "shards": {{ - "0": {{ - "ingester": "i1" - }}, - "1": {{ - "ingester": "i2" - }} - }} - }}"#, - ingester_config.ingester_base(), - ingester2_config.ingester_base() - ); - - let querier_config = TestConfig::new_querier_without_ingester(&ingester_config) - // Configure to talk with both the ingesters - .with_shard_to_ingesters_mapping(&json); - - // Set up the cluster ==================================== - let mut cluster = MiniCluster::new() - .with_router(router_config) - .await - .with_ingester(ingester_config) - .await - // second ingester - .with_other(ingester2_config) - .await - .with_querier(querier_config) - .await; - - // pick 100 table names to spread across both ingesters - let lp_data = (0..100) - .map(|i| format!("table_{i},tag1=A,tag2=B val={i}i 123456")) - .collect::>() - .join("\n"); - - let test_steps = vec![ - Step::WriteLineProtocol(lp_data), - // wait for data to be readable in ingester2 - Step::Custom(Box::new(move |state: &mut StepTestState| { - async { - let combined_response = get_multi_ingester_readable_combined_response(state).await; - - // make sure the data in all partitions is readable or - // persisted (and there is none that is unknown) - assert!( - combined_response.shard_infos.iter().all(|info| { - matches!( - info.status(), - ShardStatus::Persisted | ShardStatus::Readable - ) - }), - "Not all shards were readable or persisted. Combined responses: {combined_response:?}" - ); - } - .boxed() - })), - // spot check results (full validation is in verification_steps) - Step::Query { - sql: "select * from table_5".into(), - expected: vec![ - "+------+------+--------------------------------+-----+", - "| tag1 | tag2 | time | val |", - "+------+------+--------------------------------+-----+", - "| A | B | 1970-01-01T00:00:00.000123456Z | 5 |", - "+------+------+--------------------------------+-----+", - ], - }, - Step::Query { - sql: "select * from table_42".into(), - expected: vec![ - "+------+------+--------------------------------+-----+", - "| tag1 | tag2 | time | val |", - "+------+------+--------------------------------+-----+", - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", - "+------+------+--------------------------------+-----+", - ], - }, - ] - .into_iter() - // read all the data back out - .chain((0..100).map(|i| Step::VerifiedQuery { - sql: format!("select * from table_{i}"), - verify: Box::new(move |batches: Vec| { - println!("Verifing contents of table_{i}"); - // results look like this: - // "+------+------+--------------------------------+-----+", - // "| tag1 | tag2 | time | val |", - // "+------+------+--------------------------------+-----+", - // "| A | B | 1970-01-01T00:00:00.000123456Z | val |", - // "+------+------+--------------------------------+-----+", - assert_eq!(batches.len(), 1, "{batches:?}"); - assert_eq!( - batches[0].schema().fields()[3].name(), - "val", - "{batches:?}" - ); - let array = as_primitive_array::(batches[0].column(3)); - assert_eq!(array.len(), 1); - assert_eq!(array.value(0), i); - }), - })); - - // Run the tests - StepTest::new(&mut cluster, test_steps).run().await -} - -/// Use the WriteInfo API on the querier that will combine write info from all the ingesters it -/// knows about to get the status of data -async fn get_multi_ingester_readable_combined_response( - state: &mut StepTestState<'_>, -) -> GetWriteInfoResponse { - async move { - let mut interval = tokio::time::interval(Duration::from_millis(500)); - let cluster = state.cluster(); - - let ingester_connections = vec![ - cluster.ingester().ingester_grpc_connection(), - cluster.other_servers()[0].ingester_grpc_connection(), - ]; - - loop { - let combined_response = - combined_token_info(state.write_tokens().to_vec(), ingester_connections.clone()) - .await; - - match combined_response { - Ok(combined_response) => { - if all_readable(&combined_response) { - println!("All data is readable: {combined_response:?}"); - return combined_response; - } else { - println!("retrying, not yet readable: {combined_response:?}"); - } - } - Err(e) => { - println!("retrying, error getting token status: {e}"); - } - } - interval.tick().await; - } - } - // run for at most 10 seconds - .with_timeout_panic(Duration::from_secs(10)) - .await -} diff --git a/influxdb_iox/tests/end_to_end_cases/remote.rs b/influxdb_iox/tests/end_to_end_cases/remote.rs index d9d97a496d..9ea96783ca 100644 --- a/influxdb_iox/tests/end_to_end_cases/remote.rs +++ b/influxdb_iox/tests/end_to_end_cases/remote.rs @@ -244,16 +244,19 @@ async fn remote_partition_and_get_from_store_and_pull() { // The test below assumes a specific partition id, so use a // non-shared one here so concurrent tests don't interfere with // each other - let mut cluster = MiniCluster::create_non_shared_standard(database_url).await; + let mut cluster = MiniCluster::create_non_shared2(database_url).await; StepTest::new( &mut cluster, vec![ + Step::RecordNumParquetFiles, Step::WriteLineProtocol(String::from( "my_awesome_table,tag1=A,tag2=B val=42i 123456", )), // wait for partitions to be persisted - Step::WaitForPersisted, + Step::WaitForPersisted2 { + expected_increase: 1, + }, // Run the 'remote partition' command Step::Custom(Box::new(|state: &mut StepTestState| { async { diff --git a/test_helpers_end_to_end/src/config.rs b/test_helpers_end_to_end/src/config.rs index fa55409fe7..dcb5b095bd 100644 --- a/test_helpers_end_to_end/src/config.rs +++ b/test_helpers_end_to_end/src/config.rs @@ -173,6 +173,17 @@ impl TestConfig { .with_default_compactor_options() } + /// Create a minimal compactor configuration, using the dsn configuration from other + pub fn new_compactor2(other: &TestConfig) -> Self { + Self::new( + ServerType::Compactor2, + other.dsn().to_owned(), + other.catalog_schema_name(), + ) + .with_existing_object_store(other) + .with_default_compactor_options() + } + /// Create a minimal querier configuration from the specified /// ingester configuration, using the same dsn and object store pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self { @@ -204,7 +215,6 @@ impl TestConfig { /// Create a minimal all in one configuration pub fn new_all_in_one(dsn: Option) -> Self { Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name()) - .with_new_write_buffer() .with_new_object_store() .with_default_ingester_options() } diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index 8945a77f5b..508c7c3c49 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -69,11 +69,11 @@ impl MiniCluster { } } - /// Create a new MiniCluster that shares the same underlying - /// servers but has a new unique namespace and set of connections + /// Create a new MiniCluster that shares the same underlying servers but has a new unique + /// namespace and set of connections /// /// Note this is an internal implementation -- please use - /// [`create_shared`](Self::create_shared), and [`new`](Self::new) to create new MiniClusters. + /// [`create_shared2`](Self::create_shared2) and [`new`](Self::new) to create new MiniClusters. fn new_from_fixtures( router: Option, ingester: Option, @@ -98,48 +98,6 @@ impl MiniCluster { } } - /// Create a "standard" shared MiniCluster that has a router, ingester, - /// querier (but no compactor as that should be run on-demand in tests) - /// - /// Note: Since the underlying server processes are shared across multiple - /// tests so all users of this MiniCluster should only modify - /// their namespace - pub async fn create_shared(database_url: String) -> Self { - let start = Instant::now(); - let mut shared_servers = GLOBAL_SHARED_SERVERS.lock().await; - debug!(mutex_wait=?start.elapsed(), "creating standard cluster"); - - // try to reuse existing server processes - if let Some(shared) = shared_servers.take() { - if let Some(cluster) = shared.creatable_cluster().await { - debug!("Reusing existing cluster"); - - // Put the server back - *shared_servers = Some(shared); - let start = Instant::now(); - // drop the lock prior to calling create() to allow - // others to proceed - std::mem::drop(shared_servers); - let new_self = cluster.create().await; - info!( - total_wait=?start.elapsed(), - "created new new mini cluster from existing cluster" - ); - return new_self; - } else { - info!("some server proceses of previous cluster have already returned"); - } - } - - // Have to make a new one - info!("Create a new server"); - let new_cluster = Self::create_non_shared_standard(database_url).await; - - // Update the shared servers to point at the newly created server proesses - *shared_servers = Some(SharedServers::new(&new_cluster)); - new_cluster - } - /// Create a "standard" shared MiniCluster that has a router, ingester, and querier (but no /// compactor as that should be run on-demand in tests) /// @@ -262,11 +220,14 @@ impl MiniCluster { .with_compactor_config(compactor_config) } - /// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester, querier. + /// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester, + /// querier. Save config for a compactor, but the compactor should be run on-demand in tests + /// using `compactor run-once` rather than using `run compactor`. pub async fn create_non_shared2(database_url: String) -> Self { let ingester_config = TestConfig::new_ingester2(&database_url); let router_config = TestConfig::new_router2(&ingester_config); let querier_config = TestConfig::new_querier2(&ingester_config); + let compactor_config = TestConfig::new_compactor2(&ingester_config); // Set up the cluster ==================================== Self::new() @@ -276,14 +237,18 @@ impl MiniCluster { .await .with_querier(querier_config) .await + .with_compactor_config(compactor_config) } /// Create a non-shared "version 2" MiniCluster that has a router, ingester set to essentially - /// never persist data (except on-demand), and querier. + /// never persist data (except on-demand), and querier. Save config for a compactor, but the + /// compactor should be run on-demand in tests using `compactor run-once` rather than using + /// `run compactor`. pub async fn create_non_shared2_never_persist(database_url: String) -> Self { let ingester_config = TestConfig::new_ingester2_never_persist(&database_url); let router_config = TestConfig::new_router2(&ingester_config); let querier_config = TestConfig::new_querier2(&ingester_config); + let compactor_config = TestConfig::new_compactor2(&ingester_config); // Set up the cluster ==================================== Self::new() @@ -293,15 +258,16 @@ impl MiniCluster { .await .with_querier(querier_config) .await + .with_compactor_config(compactor_config) } /// Create an all-(minus compactor)-in-one server with the specified configuration pub async fn create_all_in_one(test_config: TestConfig) -> Self { Self::new() - .with_router(test_config.clone()) - .await .with_ingester(test_config.clone()) .await + .with_router(test_config.clone()) + .await .with_querier(test_config.clone()) .await } @@ -672,7 +638,6 @@ fn server_from_weak(server: Option<&Weak>) -> Option>> = Lazy::new(|| Mutex::new(None)); // For the new server versions. `GLOBAL_SHARED_SERVERS` can be removed and this can be renamed // when the migration to router2/etc is complete. static GLOBAL_SHARED_SERVERS2: Lazy>> = Lazy::new(|| Mutex::new(None)); diff --git a/test_helpers_end_to_end/src/server_fixture.rs b/test_helpers_end_to_end/src/server_fixture.rs index 1ee03a4975..366b475aaf 100644 --- a/test_helpers_end_to_end/src/server_fixture.rs +++ b/test_helpers_end_to_end/src/server_fixture.rs @@ -479,7 +479,7 @@ impl TestServer { } match server_type { - ServerType::Compactor => { + ServerType::Compactor | ServerType::Compactor2 => { unimplemented!( "Don't use a long-running compactor and gRPC in e2e tests; use \ `influxdb_iox compactor run-once` instead" diff --git a/test_helpers_end_to_end/src/server_type.rs b/test_helpers_end_to_end/src/server_type.rs index cdeac1f893..c59bf545bc 100644 --- a/test_helpers_end_to_end/src/server_type.rs +++ b/test_helpers_end_to_end/src/server_type.rs @@ -10,6 +10,7 @@ pub enum ServerType { Querier, Querier2, Compactor, + Compactor2, } impl ServerType { @@ -24,6 +25,7 @@ impl ServerType { Self::Querier => "querier", Self::Querier2 => "querier", Self::Compactor => "compactor", + Self::Compactor2 => "compactor2", } } } @@ -146,5 +148,16 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st addrs.compactor_grpc_api().bind_addr().to_string(), ), ], + ServerType::Compactor2 => vec![ + ( + "INFLUXDB_IOX_BIND_ADDR", + addrs.router_http_api().bind_addr().to_string(), + ), + ( + "INFLUXDB_IOX_GRPC_BIND_ADDR", + addrs.compactor_grpc_api().bind_addr().to_string(), + ), + ("INFLUXDB_IOX_RPC_MODE", "2".to_string()), + ], } }