fix: Switch more tests over to the kafkaless architecture

pull/24376/head
Carol (Nichols || Goulding) 2023-02-03 14:38:49 -05:00
parent eb15e2e8d9
commit d7c59da46b
10 changed files with 64 additions and 243 deletions

@@ -60,16 +60,19 @@ async fn parquet_to_lp() {
// The test below assumes a specific partition id, so use a
// non-shared one here so concurrent tests don't interfere with
// each other
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
let line_protocol = "my_awesome_table,tag1=A,tag2=B val=42i 123456";
StepTest::new(
&mut cluster,
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(String::from(line_protocol)),
// wait for partitions to be persisted
Step::WaitForPersisted,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// Run the 'remote partition' command
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
@@ -185,16 +188,19 @@ async fn compact_and_get_remote_partition() {
// The test below assumes a specific partition id, so use a
// non-shared one here so concurrent tests don't interfere with
// each other
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(String::from(
"my_awesome_table,tag1=A,tag2=B val=42i 123456",
)),
// wait for partitions to be persisted
Step::WaitForPersisted,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// Run the compactor
Step::Compact,
// Run the 'remote partition' command
@@ -327,9 +333,13 @@ async fn schema_cli() {
StepTest::new(
&mut cluster,
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(String::from(
"my_awesome_table2,tag1=A,tag2=B val=42i 123456",
)),
Step::WaitForPersisted2 {
expected_increase: 1,
},
Step::Custom(Box::new(|state: &mut StepTestState| {
async {
// should be able to query both router and querier for the schema

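The three hunks above all make the same substitution: the old Step::WaitForPersisted becomes the pair Step::RecordNumParquetFiles / Step::WaitForPersisted2 { expected_increase }, which waits for the number of persisted parquet files to grow by a known amount. A rough sketch of a complete test in this style, assembled only from calls that appear in this diff (the test name, comments, and exact surrounding details are illustrative, not from the commit):

#[tokio::test]
async fn kafkaless_persist_example() {
    let database_url = maybe_skip_integration!();
    // non-shared cluster so concurrent tests don't interfere with each other
    let mut cluster = MiniCluster::create_non_shared2(database_url).await;
    StepTest::new(
        &mut cluster,
        vec![
            // snapshot the current number of persisted parquet files
            Step::RecordNumParquetFiles,
            Step::WriteLineProtocol(String::from(
                "my_awesome_table,tag1=A,tag2=B val=42i 123456",
            )),
            // wait until exactly one new parquet file has been persisted
            Step::WaitForPersisted2 {
                expected_increase: 1,
            },
        ],
    )
    .run()
    .await;
}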

@@ -8,7 +8,7 @@ use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, Step, StepTes
#[tokio::test]
pub async fn test_panic() {
let database_url = maybe_skip_integration!();
let mut cluster = MiniCluster::create_shared(database_url).await;
let mut cluster = MiniCluster::create_shared2(database_url).await;
StepTest::new(
&mut cluster,

@@ -1,7 +1,7 @@
use http::StatusCode;
use test_helpers_end_to_end::{maybe_skip_integration, MiniCluster, TestConfig};
/// Test the namespacea client
/// Test the namespace client
#[tokio::test]
async fn querier_namespace_client() {
test_helpers::maybe_start_logging();
@@ -9,9 +9,9 @@ async fn querier_namespace_client() {
let table_name = "the_table";
let router_config = TestConfig::new_router(&database_url);
let ingester_config = TestConfig::new_ingester(&router_config);
let querier_config = TestConfig::new_querier(&ingester_config);
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
// Set up the cluster ====================================
let cluster = MiniCluster::new()

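The hunk above captures the wiring inversion at the heart of the kafkaless ("2") setup: the ingester config is now built first, straight from the database URL, and the router and querier configs are derived from it, where previously the router config came first and the ingester was derived from the router. Sketching the full "2" chain using only constructors that appear in this commit (the compactor config comes from the MiniCluster changes later in this diff):

// kafkaless wiring: everything derives from the ingester config
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
// saved on the cluster but only run on demand (`compactor run-once`)
let compactor_config = TestConfig::new_compactor2(&ingester_config);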

@@ -1,5 +1,4 @@
pub(crate) mod influxrpc;
mod multi_ingester;
use arrow::datatypes::{DataType, SchemaRef};
use arrow_flight::{

@@ -1,179 +0,0 @@
use arrow::{array::as_primitive_array, datatypes::Int64Type, record_batch::RecordBatch};
use data_types::ShardIndex;
use futures::FutureExt;
use influxdb_iox_client::write_info::generated_types::{GetWriteInfoResponse, ShardStatus};
use std::time::Duration;
use test_helpers::timeout::FutureTimeout;
use test_helpers_end_to_end::{
all_readable, combined_token_info, maybe_skip_integration, MiniCluster, Step, StepTest,
StepTestState, TestConfig,
};
#[tokio::test]
/// Test with multiple ingesters
async fn basic_multi_ingesters() {
let database_url = maybe_skip_integration!();
test_helpers::maybe_start_logging();
// write into two different shards: 0 and 1
let router_config = TestConfig::new_router(&database_url).with_new_write_buffer_shards(2);
// ingester gets partition 0
let ingester_config = TestConfig::new_ingester(&router_config).with_shard(ShardIndex::new(0));
let ingester2_config = TestConfig::new_ingester(&router_config).with_shard(ShardIndex::new(1));
let json = format!(
r#"{{
"ingesters": {{
"i1": {{
"addr": "{}"
}},
"i2": {{
"addr": "{}"
}}
}},
"shards": {{
"0": {{
"ingester": "i1"
}},
"1": {{
"ingester": "i2"
}}
}}
}}"#,
ingester_config.ingester_base(),
ingester2_config.ingester_base()
);
let querier_config = TestConfig::new_querier_without_ingester(&ingester_config)
// Configure to talk with both the ingesters
.with_shard_to_ingesters_mapping(&json);
// Set up the cluster ====================================
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
// second ingester
.with_other(ingester2_config)
.await
.with_querier(querier_config)
.await;
// pick 100 table names to spread across both ingesters
let lp_data = (0..100)
.map(|i| format!("table_{i},tag1=A,tag2=B val={i}i 123456"))
.collect::<Vec<_>>()
.join("\n");
let test_steps = vec![
Step::WriteLineProtocol(lp_data),
// wait for data to be readable in ingester2
Step::Custom(Box::new(move |state: &mut StepTestState| {
async {
let combined_response = get_multi_ingester_readable_combined_response(state).await;
// make sure the data in all partitions is readable or
// persisted (and there is none that is unknown)
assert!(
combined_response.shard_infos.iter().all(|info| {
matches!(
info.status(),
ShardStatus::Persisted | ShardStatus::Readable
)
}),
"Not all shards were readable or persisted. Combined responses: {combined_response:?}"
);
}
.boxed()
})),
// spot check results (full validation is in verification_steps)
Step::Query {
sql: "select * from table_5".into(),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 5 |",
"+------+------+--------------------------------+-----+",
],
},
Step::Query {
sql: "select * from table_42".into(),
expected: vec![
"+------+------+--------------------------------+-----+",
"| tag1 | tag2 | time | val |",
"+------+------+--------------------------------+-----+",
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
"+------+------+--------------------------------+-----+",
],
},
]
.into_iter()
// read all the data back out
.chain((0..100).map(|i| Step::VerifiedQuery {
sql: format!("select * from table_{i}"),
verify: Box::new(move |batches: Vec<RecordBatch>| {
println!("Verifing contents of table_{i}");
// results look like this:
// "+------+------+--------------------------------+-----+",
// "| tag1 | tag2 | time | val |",
// "+------+------+--------------------------------+-----+",
// "| A | B | 1970-01-01T00:00:00.000123456Z | val |",
// "+------+------+--------------------------------+-----+",
assert_eq!(batches.len(), 1, "{batches:?}");
assert_eq!(
batches[0].schema().fields()[3].name(),
"val",
"{batches:?}"
);
let array = as_primitive_array::<Int64Type>(batches[0].column(3));
assert_eq!(array.len(), 1);
assert_eq!(array.value(0), i);
}),
}));
// Run the tests
StepTest::new(&mut cluster, test_steps).run().await
}
/// Use the WriteInfo API on the querier that will combine write info from all the ingesters it
/// knows about to get the status of data
async fn get_multi_ingester_readable_combined_response(
state: &mut StepTestState<'_>,
) -> GetWriteInfoResponse {
async move {
let mut interval = tokio::time::interval(Duration::from_millis(500));
let cluster = state.cluster();
let ingester_connections = vec![
cluster.ingester().ingester_grpc_connection(),
cluster.other_servers()[0].ingester_grpc_connection(),
];
loop {
let combined_response =
combined_token_info(state.write_tokens().to_vec(), ingester_connections.clone())
.await;
match combined_response {
Ok(combined_response) => {
if all_readable(&combined_response) {
println!("All data is readable: {combined_response:?}");
return combined_response;
} else {
println!("retrying, not yet readable: {combined_response:?}");
}
}
Err(e) => {
println!("retrying, error getting token status: {e}");
}
}
interval.tick().await;
}
}
// run for at most 10 seconds
.with_timeout_panic(Duration::from_secs(10))
.await
}

@@ -244,16 +244,19 @@ async fn remote_partition_and_get_from_store_and_pull() {
// The test below assumes a specific partition id, so use a
// non-shared one here so concurrent tests don't interfere with
// each other
let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
let mut cluster = MiniCluster::create_non_shared2(database_url).await;
StepTest::new(
&mut cluster,
vec![
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(String::from(
"my_awesome_table,tag1=A,tag2=B val=42i 123456",
)),
// wait for partitions to be persisted
Step::WaitForPersisted,
Step::WaitForPersisted2 {
expected_increase: 1,
},
// Run the 'remote partition' command
Step::Custom(Box::new(|state: &mut StepTestState| {
async {

@@ -173,6 +173,17 @@ impl TestConfig {
.with_default_compactor_options()
}
/// Create a minimal compactor configuration, using the dsn configuration from other
pub fn new_compactor2(other: &TestConfig) -> Self {
Self::new(
ServerType::Compactor2,
other.dsn().to_owned(),
other.catalog_schema_name(),
)
.with_existing_object_store(other)
.with_default_compactor_options()
}
/// Create a minimal querier configuration from the specified
/// ingester configuration, using the same dsn and object store
pub fn new_querier_without_ingester(ingester_config: &TestConfig) -> Self {
@@ -204,7 +215,6 @@ impl TestConfig {
/// Create a minimal all in one configuration
pub fn new_all_in_one(dsn: Option<String>) -> Self {
Self::new(ServerType::AllInOne, dsn, random_catalog_schema_name())
.with_new_write_buffer()
.with_new_object_store()
.with_default_ingester_options()
}

@@ -69,11 +69,11 @@ impl MiniCluster {
}
}
/// Create a new MiniCluster that shares the same underlying
/// servers but has a new unique namespace and set of connections
/// Create a new MiniCluster that shares the same underlying servers but has a new unique
/// namespace and set of connections
///
/// Note this is an internal implementation -- please use
/// [`create_shared`](Self::create_shared), and [`new`](Self::new) to create new MiniClusters.
/// [`create_shared2`](Self::create_shared2) and [`new`](Self::new) to create new MiniClusters.
fn new_from_fixtures(
router: Option<ServerFixture>,
ingester: Option<ServerFixture>,
@@ -98,48 +98,6 @@ impl MiniCluster {
}
}
/// Create a "standard" shared MiniCluster that has a router, ingester,
/// querier (but no compactor as that should be run on-demand in tests)
///
/// Note: Since the underlying server processes are shared across multiple
/// tests so all users of this MiniCluster should only modify
/// their namespace
pub async fn create_shared(database_url: String) -> Self {
let start = Instant::now();
let mut shared_servers = GLOBAL_SHARED_SERVERS.lock().await;
debug!(mutex_wait=?start.elapsed(), "creating standard cluster");
// try to reuse existing server processes
if let Some(shared) = shared_servers.take() {
if let Some(cluster) = shared.creatable_cluster().await {
debug!("Reusing existing cluster");
// Put the server back
*shared_servers = Some(shared);
let start = Instant::now();
// drop the lock prior to calling create() to allow
// others to proceed
std::mem::drop(shared_servers);
let new_self = cluster.create().await;
info!(
total_wait=?start.elapsed(),
"created new new mini cluster from existing cluster"
);
return new_self;
} else {
info!("some server proceses of previous cluster have already returned");
}
}
// Have to make a new one
info!("Create a new server");
let new_cluster = Self::create_non_shared_standard(database_url).await;
// Update the shared servers to point at the newly created server proesses
*shared_servers = Some(SharedServers::new(&new_cluster));
new_cluster
}
/// Create a "standard" shared MiniCluster that has a router, ingester, and querier (but no
/// compactor as that should be run on-demand in tests)
///
@@ -262,11 +220,14 @@ impl MiniCluster {
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester, querier.
/// Create a non-shared "version 2" "standard" MiniCluster that has a router, ingester,
/// querier. Save config for a compactor, but the compactor should be run on-demand in tests
/// using `compactor run-once` rather than using `run compactor`.
pub async fn create_non_shared2(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
let compactor_config = TestConfig::new_compactor2(&ingester_config);
// Set up the cluster ====================================
Self::new()
@@ -276,14 +237,18 @@ impl MiniCluster {
.await
.with_querier(querier_config)
.await
.with_compactor_config(compactor_config)
}
/// Create a non-shared "version 2" MiniCluster that has a router, ingester set to essentially
/// never persist data (except on-demand), and querier.
/// never persist data (except on-demand), and querier. Save config for a compactor, but the
/// compactor should be run on-demand in tests using `compactor run-once` rather than using
/// `run compactor`.
pub async fn create_non_shared2_never_persist(database_url: String) -> Self {
let ingester_config = TestConfig::new_ingester2_never_persist(&database_url);
let router_config = TestConfig::new_router2(&ingester_config);
let querier_config = TestConfig::new_querier2(&ingester_config);
let compactor_config = TestConfig::new_compactor2(&ingester_config);
// Set up the cluster ====================================
Self::new()
@@ -293,15 +258,16 @@ impl MiniCluster {
.await
.with_querier(querier_config)
.await
.with_compactor_config(compactor_config)
}
/// Create an all-(minus compactor)-in-one server with the specified configuration
pub async fn create_all_in_one(test_config: TestConfig) -> Self {
Self::new()
.with_router(test_config.clone())
.await
.with_ingester(test_config.clone())
.await
.with_router(test_config.clone())
.await
.with_querier(test_config.clone())
.await
}
@@ -672,7 +638,6 @@ fn server_from_weak(server: Option<&Weak<TestServer>>) -> Option<Option<Arc<Test
}
}
static GLOBAL_SHARED_SERVERS: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));
// For the new server versions. `GLOBAL_SHARED_SERVERS` can be removed and this can be renamed
// when the migration to router2/etc is complete.
static GLOBAL_SHARED_SERVERS2: Lazy<Mutex<Option<SharedServers>>> = Lazy::new(|| Mutex::new(None));

@@ -479,7 +479,7 @@ impl TestServer {
}
match server_type {
ServerType::Compactor => {
ServerType::Compactor | ServerType::Compactor2 => {
unimplemented!(
"Don't use a long-running compactor and gRPC in e2e tests; use \
`influxdb_iox compactor run-once` instead"

@@ -10,6 +10,7 @@ pub enum ServerType {
Querier,
Querier2,
Compactor,
Compactor2,
}
impl ServerType {
@@ -24,6 +25,7 @@ impl ServerType {
Self::Querier => "querier",
Self::Querier2 => "querier",
Self::Compactor => "compactor",
Self::Compactor2 => "compactor2",
}
}
}
@@ -146,5 +148,16 @@ fn addr_envs(server_type: ServerType, addrs: &BindAddresses) -> Vec<(&'static st
addrs.compactor_grpc_api().bind_addr().to_string(),
),
],
ServerType::Compactor2 => vec![
(
"INFLUXDB_IOX_BIND_ADDR",
addrs.router_http_api().bind_addr().to_string(),
),
(
"INFLUXDB_IOX_GRPC_BIND_ADDR",
addrs.compactor_grpc_api().bind_addr().to_string(),
),
("INFLUXDB_IOX_RPC_MODE", "2".to_string()),
],
}
}