feat: share server processes in end to end test (#4387)

* feat: share server processes in end to end test

* fix: Apply suggestions from code review

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2022-04-28 13:51:14 -04:00 committed by GitHub
parent 5688bd63a3
commit 63df3ceb6f
13 changed files with 222 additions and 54 deletions

Cargo.lock (generated)
View File

@@ -6074,6 +6074,7 @@ dependencies = [
  "http",
  "hyper",
  "influxdb_iox_client",
+ "lazy_static",
  "nix 0.24.0",
  "observability_deps",
  "once_cell",

View File

@@ -16,8 +16,10 @@ async fn remote_partition_and_get_from_store() {
     let database_url = maybe_skip_integration!();
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_quickly_persisting(database_url).await;
+    // The test below assumes a specific partition id, so use a
+    // non-shared one here so concurrent tests don't interfere with
+    // each other
+    let mut cluster = MiniCluster::create_non_shared_standard(database_url).await;
 
     StepTest::new(
         &mut cluster,

View File

@@ -17,7 +17,7 @@ async fn ingester_flight_api() {
     let table_name = "mytable";
 
     // Set up cluster
-    let cluster = MiniCluster::create_standard(database_url).await;
+    let cluster = MiniCluster::create_shared(database_url).await;
 
     // Write some data into the v2 HTTP API ==============
     let lp = format!("{},tag1=A,tag2=B val=42i 123456", table_name);
@@ -94,7 +94,7 @@ async fn ingester_flight_api_namespace_not_found() {
     let table_name = "mytable";
 
     // Set up cluster
-    let cluster = MiniCluster::create_standard(database_url).await;
+    let cluster = MiniCluster::create_shared(database_url).await;
 
     let mut querier_flight = influxdb_iox_client::flight::Client::<
         influxdb_iox_client::flight::generated_types::IngesterQueryRequest,
@@ -124,7 +124,7 @@ async fn ingester_flight_api_table_not_found() {
     let database_url = maybe_skip_integration!();
 
     // Set up cluster
-    let cluster = MiniCluster::create_standard(database_url).await;
+    let cluster = MiniCluster::create_shared(database_url).await;
 
     // Write some data into the v2 HTTP API ==============
     let lp = String::from("my_table,tag1=A,tag2=B val=42i 123456");
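All three tests in this file touch only their own randomly generated namespace, so they can safely run against shared server processes. A sketch of the resulting test shape, assuming the crate's `maybe_skip_integration!` macro as used elsewhere in these tests and a `write_to_router`-style helper on `MiniCluster` (the test body itself is hypothetical, not part of this commit):

```rust
use test_helpers_end_to_end_ng::{maybe_skip_integration, MiniCluster};

#[tokio::test]
async fn uses_shared_cluster() {
    // hypothetical test showing the shared-cluster shape
    let database_url = maybe_skip_integration!();

    // Reuses running router/ingester/querier processes when they are still
    // alive; the MiniCluster itself still gets a fresh org/bucket/namespace,
    // so concurrent tests cannot see each other's data.
    let cluster = MiniCluster::create_shared(database_url).await;

    // all reads and writes stay inside this test's own namespace
    let lp = "mytable,tag1=A,tag2=B val=42i 123456";
    let response = cluster.write_to_router(lp).await;
    assert!(response.status().is_success());
}
```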

View File

@@ -10,7 +10,7 @@ async fn querier_namespace_client() {
     let table_name = "the_table";
 
     let router2_config = TestConfig::new_router2(&database_url);
-    let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
+    let ingester_config = TestConfig::new_ingester(&router2_config);
     let querier_config = TestConfig::new_querier(&ingester_config);
 
     // Set up the cluster ====================================

View File

@@ -14,7 +14,7 @@ async fn basic_ingester() {
     let table_name = "the_table";
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_standard(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,
@@ -51,7 +51,7 @@ async fn basic_on_parquet() {
     let table_name = "the_table";
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_quickly_persisting(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,
@@ -84,7 +84,7 @@ async fn basic_no_ingester_connection() {
     let router2_config = TestConfig::new_router2(&database_url);
 
     // fast parquet
-    let ingester_config = TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
+    let ingester_config = TestConfig::new_ingester(&router2_config);
 
     // specially create a querier config that is NOT connected to the ingester
     let querier_config = TestConfig::new_querier_without_ingester(&ingester_config);
@@ -129,7 +129,7 @@ async fn table_not_found_on_ingester() {
     let table_name = "the_table";
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_quickly_persisting(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,

View File

@@ -11,10 +11,11 @@ use test_helpers_end_to_end_ng::{
 /// Runs the specified custom function on a cluster with no data
 pub(crate) async fn run_no_data_test(custom: FCustom) {
     test_helpers::maybe_start_logging();
+    let database_url = maybe_skip_integration!();
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_standard(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(&mut cluster, vec![Step::Custom(custom)])
         .run()
@@ -23,10 +24,11 @@ pub(crate) async fn run_no_data_test(custom: FCustom) {
 /// Run the custom test function with a cluster that has had the data from `generator` loaded
 pub(crate) async fn run_data_test(generator: Arc<DataGenerator>, custom: FCustom) {
     test_helpers::maybe_start_logging();
+    let database_url = maybe_skip_integration!();
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_standard(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,

View File

@@ -75,7 +75,7 @@ async fn write_via_grpc() {
     }];
 
     // Set up the cluster ====================================
-    let mut cluster = MiniCluster::create_quickly_persisting(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,

View File

@@ -12,7 +12,7 @@ async fn ingester_schema_client() {
     test_helpers::maybe_start_logging();
     let database_url = maybe_skip_integration!();
 
-    let mut cluster = MiniCluster::create_standard(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,
@@ -59,7 +59,7 @@ async fn ingester_schema_cli() {
     test_helpers::maybe_start_logging();
     let database_url = maybe_skip_integration!();
 
-    let mut cluster = MiniCluster::create_standard(database_url).await;
+    let mut cluster = MiniCluster::create_shared(database_url).await;
 
     StepTest::new(
         &mut cluster,

View File

@@ -14,6 +14,7 @@ http = "0.2.0"
 hyper = "0.14"
 influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
 nix = "0.24"
+lazy_static = "1.4.0"
 observability_deps = { path = "../observability_deps" }
 once_cell = { version = "1.10.0", features = ["parking_lot"] }
 parking_lot = "0.12"

View File

@@ -135,7 +135,7 @@ pub async fn token_is_persisted(
     all_persisted(&res)
 }
 
-const MAX_QUERY_RETRY_TIME_SEC: u64 = 10;
+const MAX_QUERY_RETRY_TIME_SEC: u64 = 20;
 
 /// Waits for the specified predicate to return true
 pub async fn wait_for_token<F>(
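With several tests sharing one ingester and querier, an individual query may now wait behind other tests' traffic, hence the retry budget doubling from 10 to 20 seconds. `wait_for_token` itself is not shown in this hunk; a minimal sketch of the kind of predicate-polling loop such a constant bounds (function name and poll interval are assumptions, not code from this commit):

```rust
use std::time::{Duration, Instant};

/// Polls `predicate` until it returns true or `max_secs` elapses.
/// Returns whether the predicate ever succeeded.
async fn wait_for(mut predicate: impl FnMut() -> bool, max_secs: u64) -> bool {
    let deadline = Instant::now() + Duration::from_secs(max_secs);
    while Instant::now() < deadline {
        if predicate() {
            return true;
        }
        // brief pause between polls so we don't spin
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    false
}
```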

View File

@@ -100,13 +100,6 @@ impl TestConfig {
             .with_new_write_buffer()
             .with_new_object_store()
             .with_default_ingester_options()
-            .with_fast_parquet_generation()
     }
 
-    /// Create a configuraton for aggressive creation of parquet files
-    pub fn with_fast_parquet_generation(self) -> Self {
-        self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2")
-            .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1")
-    }
 
     /// Configure tracing capture
@@ -140,7 +133,7 @@ impl TestConfig {
     /// Adds default ingester options
     fn with_default_ingester_options(self) -> Self {
-        self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "20")
+        self.with_env("INFLUXDB_IOX_PAUSE_INGEST_SIZE_BYTES", "2000000")
             .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "10")
             .with_kafka_partition(0)
     }
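With `with_fast_parquet_generation` gone, aggressive parquet persistence becomes the default for every ingester: the persist threshold stays tiny (10 bytes, so nearly every write persists promptly) while the pause-ingest limit jumps from 20 bytes to 2,000,000 so a busy shared ingester never pauses ingest mid-test. A test needing different thresholds could presumably still layer its own values with the same `with_env` builder shown above (a hypothetical override, not code from this commit):

```rust
// Hypothetical one-off config: keep the defaults but raise the persist
// threshold, using the same with_env hook the defaults are built from.
let ingester_config = TestConfig::new_ingester(&router2_config)
    .with_env("INFLUXDB_IOX_PERSIST_MEMORY_THRESHOLD_BYTES", "1048576");
```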

View File

@@ -1,7 +1,17 @@
-use crate::{rand_id, write_to_router, write_to_router_grpc, ServerFixture, TestConfig};
+use crate::{
+    rand_id, write_to_router, write_to_router_grpc, ServerFixture, TestConfig, TestServer,
+};
 use http::Response;
 use hyper::Body;
 use influxdb_iox_client::write::generated_types::{TableBatch, WriteResponse};
+use std::{
+    sync::{Arc, Weak},
+    time::Instant,
+};
+use futures::{stream::FuturesOrdered, StreamExt};
+use observability_deps::tracing::{debug, info};
+use tokio::sync::Mutex;
 
 /// Structure that holds NG services and helpful accessors
 #[derive(Debug, Default)]
@@ -41,34 +51,78 @@ impl MiniCluster {
         }
     }
 
-    /// Create a "standard" MiniCluster that has a router, ingester,
-    /// querier
+    /// Create a new MiniCluster that shares the same underlying
+    /// servers but has a new unique namespace and set of connections
     ///
-    /// Long term plan is that this will be shared across multiple tests if possible
-    pub async fn create_standard(database_url: String) -> Self {
-        let router2_config = TestConfig::new_router2(&database_url);
-        // fast parquet
-        let ingester_config =
-            TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
-        let querier_config = TestConfig::new_querier(&ingester_config);
+    /// Note this is an internal implementation -- please use
+    /// [create_shared] and [new] to create new MiniClusters.
+    fn new_from_fixtures(
+        router2: Option<ServerFixture>,
+        ingester: Option<ServerFixture>,
+        querier: Option<ServerFixture>,
+        compactor: Option<ServerFixture>,
+    ) -> Self {
+        let org_id = rand_id();
+        let bucket_id = rand_id();
+        let namespace = format!("{}_{}", org_id, bucket_id);
 
-        // Set up the cluster ====================================
-        Self::new()
-            .with_router2(router2_config)
-            .await
-            .with_ingester(ingester_config)
-            .await
-            .with_querier(querier_config)
-            .await
+        Self {
+            router2,
+            ingester,
+            querier,
+            compactor,
+            other_servers: vec![],
+            org_id,
+            bucket_id,
+            namespace,
+        }
     }
 
-    /// return a "standard" MiniCluster that has a router, ingester,
-    /// querier and quickly persists files to parquet
-    pub async fn create_quickly_persisting(database_url: String) -> Self {
+    /// Create a "standard" shared MiniCluster that has a router, ingester,
+    /// and querier
+    ///
+    /// Note: since the underlying server processes are shared across
+    /// multiple tests, all users of this MiniCluster should only modify
+    /// their own namespace
+    pub async fn create_shared(database_url: String) -> Self {
+        let start = Instant::now();
+        let mut shared_servers = GLOBAL_SHARED_SERVERS.lock().await;
+        debug!(mutex_wait=?start.elapsed(), "creating standard cluster");
+
+        // try to reuse existing server processes
+        if let Some(shared) = shared_servers.take() {
+            if let Some(cluster) = shared.creatable_cluster().await {
+                debug!("Reusing existing cluster");
+
+                // Put the servers back
+                *shared_servers = Some(shared);
+                let start = Instant::now();
+                // drop the lock prior to calling create() to allow
+                // others to proceed
+                std::mem::drop(shared_servers);
+                let new_self = cluster.create().await;
+                info!(total_wait=?start.elapsed(), "created new mini cluster from existing cluster");
+                return new_self;
+            } else {
+                info!("some server processes of the previous cluster have already shut down");
+            }
+        }
+
+        // Have to make a new one
+        info!("Create a new server");
+        let new_cluster = Self::create_non_shared_standard(database_url).await;
+
+        // Update the shared servers to point at the newly created server processes
+        *shared_servers = Some(SharedServers::new(&new_cluster));
+        new_cluster
+    }
+
+    /// Create a non-shared "standard" MiniCluster that has a router,
+    /// ingester, and querier
+    pub async fn create_non_shared_standard(database_url: String) -> Self {
        let router2_config = TestConfig::new_router2(&database_url);
-        // fast parquet
-        let ingester_config =
-            TestConfig::new_ingester(&router2_config).with_fast_parquet_generation();
+        let ingester_config = TestConfig::new_ingester(&router2_config);
         let querier_config = TestConfig::new_querier(&ingester_config);
 
         // Set up the cluster ====================================
@@ -200,3 +254,108 @@ impl MiniCluster {
         self.other_servers.as_ref()
     }
 }
+
+/// Holds shared server processes to share across tests
+#[derive(Clone)]
+struct SharedServers {
+    router2: Option<Weak<TestServer>>,
+    ingester: Option<Weak<TestServer>>,
+    querier: Option<Weak<TestServer>>,
+    compactor: Option<Weak<TestServer>>,
+}
+
+/// Deferred creation of a mini cluster
+struct CreatableMiniCluster {
+    router2: Option<Arc<TestServer>>,
+    ingester: Option<Arc<TestServer>>,
+    querier: Option<Arc<TestServer>>,
+    compactor: Option<Arc<TestServer>>,
+}
+
+async fn create_if_needed(server: Option<Arc<TestServer>>) -> Option<ServerFixture> {
+    if let Some(server) = server {
+        Some(ServerFixture::create_from_existing(server).await)
+    } else {
+        None
+    }
+}
+
+impl CreatableMiniCluster {
+    async fn create(self) -> MiniCluster {
+        let Self {
+            router2,
+            ingester,
+            querier,
+            compactor,
+        } = self;
+
+        let mut servers = [
+            create_if_needed(router2),
+            create_if_needed(ingester),
+            create_if_needed(querier),
+            create_if_needed(compactor),
+        ]
+        .into_iter()
+        // Use FuturesOrdered to run them all in parallel (hopefully)
+        .collect::<FuturesOrdered<_>>()
+        .collect::<Vec<Option<ServerFixture>>>()
+        .await
+        .into_iter();
+
+        // ServerFixtures come out in the same order they went in
+        MiniCluster::new_from_fixtures(
+            servers.next().unwrap(),
+            servers.next().unwrap(),
+            servers.next().unwrap(),
+            servers.next().unwrap(),
+        )
+    }
+}
+
+impl SharedServers {
+    /// Save the server processes in this SharedServers as weak references
+    pub fn new(cluster: &MiniCluster) -> Self {
+        assert!(
+            cluster.other_servers.is_empty(),
+            "other servers not yet handled in shared mini clusters"
+        );
+        Self {
+            router2: cluster.router2.as_ref().map(|c| c.weak()),
+            ingester: cluster.ingester.as_ref().map(|c| c.weak()),
+            querier: cluster.querier.as_ref().map(|c| c.weak()),
+            compactor: cluster.compactor.as_ref().map(|c| c.weak()),
+        }
+    }
+
+    /// Returns a creatable MiniCluster that will reuse the existing
+    /// [TestServer]s. Returns None if they are no longer active.
+    async fn creatable_cluster(&self) -> Option<CreatableMiniCluster> {
+        // The goal of the following code is to bail out (return None
+        // from the function) if any of the optional weak references
+        // aren't present so that the cluster is recreated correctly
+        Some(CreatableMiniCluster {
+            router2: server_from_weak(self.router2.as_ref())?,
+            ingester: server_from_weak(self.ingester.as_ref())?,
+            querier: server_from_weak(self.querier.as_ref())?,
+            compactor: server_from_weak(self.compactor.as_ref())?,
+        })
+    }
+}
+
+/// Returns None if there was a weak server but it could not be upgraded.
+/// Returns Some(None) if there was no weak server.
+/// Returns Some(Some(server)) if the weak server could be upgraded.
+fn server_from_weak(server: Option<&Weak<TestServer>>) -> Option<Option<Arc<TestServer>>> {
+    if let Some(server) = server.as_ref() {
+        // return None if the server can't be upgraded
+        let server = server.upgrade()?;
+        Some(Some(server))
+    } else {
+        Some(None)
+    }
+}
+
+lazy_static::lazy_static! {
+    static ref GLOBAL_SHARED_SERVERS: Mutex<Option<SharedServers>> = Mutex::new(None);
+}

View File

@@ -8,7 +8,7 @@ use std::{
     path::Path,
     process::{Child, Command},
     str,
-    sync::Arc,
+    sync::{Arc, Weak},
     time::Duration,
 };
 use tempfile::NamedTempFile;
@@ -34,15 +34,20 @@ pub struct ServerFixture {
 impl ServerFixture {
     /// Create a new server fixture and wait for it to be ready. This
     /// is called "create" rather than new because it is async and
-    /// waits. The server is not shared with any other tests.
+    /// waits.
     pub async fn create(test_config: TestConfig) -> Self {
-        let mut server = TestServer::new(test_config).await;
+        let server = TestServer::new(test_config).await;
+        Self::create_from_existing(Arc::new(server)).await
+    }
+
+    /// Create a new server fixture that shares the same TestServer,
+    /// but has its own connections
+    pub(crate) async fn create_from_existing(server: Arc<TestServer>) -> Self {
         // ensure the server is ready
         let connections = server.wait_until_ready().await;
 
         ServerFixture {
-            server: Arc::new(server),
+            server,
             connections,
         }
     }
@@ -95,6 +100,11 @@ impl ServerFixture {
     pub fn router_grpc_base(&self) -> Arc<str> {
         self.server.addrs().router_grpc_api().client_base()
     }
+
+    /// Get a weak reference to the underlying `TestServer`
+    pub(crate) fn weak(&self) -> Weak<TestServer> {
+        Arc::downgrade(&self.server)
+    }
 }
/// Represents the current known state of a TestServer
@ -257,7 +267,7 @@ impl TestServer {
}
/// Restarts the tests server process, but does not reconnect clients
async fn restart(&self) {
async fn restart(&mut self) {
let mut ready_guard = self.ready.lock().await;
let mut server_process = self.server_process.lock().await;
kill_politely(&mut server_process.child, Duration::from_secs(5));
@@ -335,7 +345,7 @@ impl TestServer {
     /// Polls the various services to ensure the server is
     /// operational, reestablishing grpc connections, and returning
     /// those active connections
-    async fn wait_until_ready(&mut self) -> Connections {
+    async fn wait_until_ready(&self) -> Connections {
         let mut need_wait_for_startup = false;
         {
             let mut ready = self.ready.lock().await;
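The receiver changes here are the crux of sharing at the type level: once a `TestServer` lives behind an `Arc`, anything a shared fixture may call, like `wait_until_ready`, must take `&self` and route mutation through the interior `tokio::sync::Mutex` fields (`ready`, `server_process`), while `restart` moves the opposite way to `&mut self`, so the compiler forbids restarting a server that other tests might still be using. A minimal sketch of that split, with a generic struct rather than the crate's types:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct Server {
    // interior mutability: shared callers lock instead of needing &mut
    ready: Mutex<bool>,
}

impl Server {
    // callable through Arc<Server>: &self receiver, mutation via the Mutex
    async fn wait_until_ready(&self) {
        let mut ready = self.ready.lock().await;
        *ready = true;
    }

    // deliberately &mut self: unreachable through a shared Arc, so only an
    // exclusively owned Server can ever be restarted
    async fn restart(&mut self) {
        *self.ready.get_mut() = false;
    }
}

#[tokio::main]
async fn main() {
    let server = Arc::new(Server { ready: Mutex::new(false) });
    server.wait_until_ready().await; // fine: &self through the Arc
    // server.restart().await;      // rejected: cannot borrow Arc contents mutably

    // restart only works once we hold the server exclusively again
    let mut owned = match Arc::try_unwrap(server) {
        Ok(server) => server,
        Err(_) => panic!("other Arcs still alive"),
    };
    owned.restart().await;
}
```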