fix: For Ingester2, persist a particular namespace on demand and share MiniClusters

This should hopefully keep CI from running out of Postgres
connections 😬

Tests using the old architecture will still need non-shared clusters
and will persist everything.
Carol (Nichols || Goulding) 2023-01-20 16:47:03 -05:00
parent f310e01b1a
commit 4658510102
9 changed files with 51 additions and 37 deletions
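
To make the intent concrete, here is a hedged sketch (not code from this commit; imports elided) of the test pattern the change enables, using the names that appear in the diffs below:

    // A test shares one MiniCluster (one pool of Postgres connections) and
    // persists only its own namespace on demand.
    async fn demo(database_url: String) {
        let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;

        // ... write line protocol for this test's namespace via the shared router ...

        let mut ingester_client = influxdb_iox_client::ingester::Client::new(
            cluster.ingester().ingester_grpc_connection(),
        );

        // Persist just this namespace so parallel tests sharing the ingester
        // don't flush each other's data.
        ingester_client
            .persist(cluster.namespace().into())
            .await
            .unwrap();
    }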

View File

@@ -18,6 +18,9 @@ service PersistService {
   rpc Persist(PersistRequest) returns (PersistResponse);
 }
 
-message PersistRequest {}
+message PersistRequest {
+  // The namespace to persist
+  string namespace = 1;
+}
 
 message PersistResponse {}
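
For reference, a hedged sketch of how a Rust caller constructs the regenerated request type (module path per the generated_types import later in this commit; the namespace name is a placeholder):

    use generated_types::influxdata::iox::ingester::v1 as proto;

    // prost generates an owned String for `string namespace = 1`; the server
    // resolves this name via the catalog and returns NOT_FOUND if it is
    // unknown (see the handler change below).
    let request = proto::PersistRequest {
        namespace: "my_namespace".to_string(),
    };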

View File

@@ -903,16 +903,7 @@ mod kafkaless_rpc_write {
         let table_name = "the_table";
 
         // Set up the cluster ====================================
-        let ingester_config = TestConfig::new_ingester2_never_persist(&database_url);
-        let router_config = TestConfig::new_router2(&ingester_config);
-        let querier_config = TestConfig::new_querier2(&ingester_config);
-        let mut cluster = MiniCluster::new()
-            .with_ingester(ingester_config)
-            .await
-            .with_router(router_config)
-            .await
-            .with_querier(querier_config)
-            .await;
+        let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
 
         StepTest::new(
             &mut cluster,
@@ -951,16 +942,7 @@ mod kafkaless_rpc_write {
         let table_name = "the_table";
 
         // Set up the cluster ====================================
-        let ingester_config = TestConfig::new_ingester2_never_persist(&database_url);
-        let router_config = TestConfig::new_router2(&ingester_config);
-        let querier_config = TestConfig::new_querier2(&ingester_config);
-        let mut cluster = MiniCluster::new()
-            .with_ingester(ingester_config)
-            .await
-            .with_router(router_config)
-            .await
-            .with_querier(querier_config)
-            .await;
+        let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await;
 
         StepTest::new(
             &mut cluster,

View File

@@ -98,7 +98,7 @@ trait InfluxRpcTest: Send + Sync + 'static {
                 .await
             }
             IoxArchitecture::Kafkaless => {
-                MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
+                MiniCluster::create_shared2_never_persist(database_url.clone()).await
             }
         };

View File

@@ -66,10 +66,10 @@ impl TestCase {
         for chunk_stage in self.chunk_stage {
             info!("Using IoxArchitecture::{arch:?} and ChunkStage::{chunk_stage:?}");
 
-            // Setup that differs by architecture and chunk stage. These need to be non-shared
-            // clusters; if they're shared, then the tests that run in parallel and persist at
-            // particular times mess with each other because persistence applies to everything in
-            // the ingester.
+            // Setup that differs by architecture and chunk stage. In the Kafka architecture,
+            // these need to be non-shared clusters; if they're shared, then the tests that run
+            // in parallel and persist at particular times mess with each other because
+            // persistence applies to everything in the ingester.
             let mut cluster = match (arch, chunk_stage) {
                 (IoxArchitecture::Kafkaful, ChunkStage::Ingester) => {
                     MiniCluster::create_non_shared_standard_never_persist(database_url.clone())
@@ -79,10 +79,10 @@ impl TestCase {
                     MiniCluster::create_non_shared_standard(database_url.clone()).await
                 }
                 (IoxArchitecture::Kafkaless, ChunkStage::Ingester) => {
-                    MiniCluster::create_non_shared2_never_persist(database_url.clone()).await
+                    MiniCluster::create_shared2_never_persist(database_url.clone()).await
                 }
                 (IoxArchitecture::Kafkaless, ChunkStage::Parquet) => {
-                    MiniCluster::create_non_shared2(database_url.clone()).await
+                    MiniCluster::create_shared2(database_url.clone()).await
                }
                 (_, ChunkStage::All) => unreachable!("See `impl IntoIterator for ChunkStage`"),
             };

View File

@@ -21,11 +21,11 @@ impl Client {
         }
     }
 
-    /// Instruct the ingester to persist its data to Parquet. Will block until the data has
-    /// persisted, which is useful in tests asserting on persisted data. May behave in unexpected
-    /// ways if used concurrently with writes and ingester WAL rotations.
-    pub async fn persist(&mut self) -> Result<(), Error> {
-        self.inner.persist(PersistRequest {}).await?;
+    /// Instruct the ingester to persist its data for the specified namespace to Parquet. Useful in
+    /// tests asserting on persisted data. May behave in unexpected ways if used concurrently with
+    /// writes and ingester WAL rotations.
+    pub async fn persist(&mut self, namespace: String) -> Result<(), Error> {
+        self.inner.persist(PersistRequest { namespace }).await?;
 
         Ok(())
     }
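
A hedged usage sketch of the updated wrapper; `connection` and the namespace name are placeholders:

    // Persist one namespace's buffered data and block until it completes, so
    // assertions on the resulting Parquet files are safe to run afterwards.
    let mut client = influxdb_iox_client::ingester::Client::new(connection);
    client.persist("ns_under_test".to_string()).await?;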

View File

@@ -25,6 +25,9 @@ impl<I: IngestHandler + 'static> PersistService for PersistHandler<I> {
         &self,
         _request: Request<proto::PersistRequest>,
     ) -> Result<Response<proto::PersistResponse>, tonic::Status> {
+        // Even though the request specifies the namespace, persist everything. This means tests
+        // that use this API need to be using non-shared MiniClusters in order to avoid messing
+        // with each others' states.
         self.ingest_handler.persist_all().await;
 
         Ok(Response::new(proto::PersistResponse {}))

View File

@@ -120,6 +120,7 @@ where
         PersistServiceServer::new(PersistHandler::new(
             Arc::clone(&self.buffer),
             Arc::clone(&self.persist_handle),
+            Arc::clone(&self.catalog),
         ))
     }

View File

@@ -5,12 +5,15 @@ use crate::{
 use generated_types::influxdata::iox::ingester::v1::{
     self as proto, persist_service_server::PersistService,
 };
+use iox_catalog::interface::Catalog;
+use std::sync::Arc;
 use tonic::{Request, Response};
 
 #[derive(Debug)]
 pub(crate) struct PersistHandler<T, P> {
     buffer: T,
     persist_handle: P,
+    catalog: Arc<dyn Catalog>,
 }
 
 impl<T, P> PersistHandler<T, P>
@@ -18,10 +21,11 @@ where
     T: PartitionIter + Sync + 'static,
     P: PersistQueue + Clone + Sync + 'static,
 {
-    pub(crate) fn new(buffer: T, persist_handle: P) -> Self {
+    pub(crate) fn new(buffer: T, persist_handle: P, catalog: Arc<dyn Catalog>) -> Self {
         Self {
             buffer,
             persist_handle,
+            catalog,
         }
     }
 }
@@ -37,9 +41,27 @@ where
     /// concurrently with writes and ingester WAL rotations.
     async fn persist(
         &self,
-        _request: Request<proto::PersistRequest>,
+        request: Request<proto::PersistRequest>,
     ) -> Result<Response<proto::PersistResponse>, tonic::Status> {
-        persist_partitions(self.buffer.partition_iter(), &self.persist_handle).await;
+        let request = request.into_inner();
+
+        let namespace = self
+            .catalog
+            .repositories()
+            .await
+            .namespaces()
+            .get_by_name(&request.namespace)
+            .await
+            .map_err(|e| tonic::Status::internal(e.to_string()))?
+            .ok_or_else(|| tonic::Status::not_found(&request.namespace))?;
+
+        persist_partitions(
+            self.buffer
+                .partition_iter()
+                .filter(|p| p.lock().namespace_id() == namespace.id),
+            &self.persist_handle,
+        )
+        .await;
 
         Ok(Response::new(proto::PersistResponse {}))
     }
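
To restate the server-side flow, a hedged sketch of just the lookup step (the Namespace record type is assumed here to come from the data_types crate):

    use iox_catalog::interface::Catalog;
    use std::sync::Arc;

    // Resolve a namespace name to its catalog record, mapping catalog errors
    // to INTERNAL and an unknown name to NOT_FOUND, as the handler above does.
    async fn lookup_namespace(
        catalog: &Arc<dyn Catalog>,
        name: &str,
    ) -> Result<data_types::Namespace, tonic::Status> {
        catalog
            .repositories()
            .await
            .namespaces()
            .get_by_name(name)
            .await
            .map_err(|e| tonic::Status::internal(e.to_string()))?
            .ok_or_else(|| tonic::Status::not_found(name))
    }

The subsequent filter on partition_iter() means only partitions buffered for that namespace's ID are queued for persistence; partitions belonging to other tests sharing the ingester are left alone.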

View File

@@ -494,7 +494,10 @@ impl MiniCluster {
         let mut ingester_client =
             influxdb_iox_client::ingester::Client::new(self.ingester().ingester_grpc_connection());
 
-        ingester_client.persist().await.unwrap();
+        ingester_client
+            .persist(self.namespace().into())
+            .await
+            .unwrap();
     }
 
     /// Get a reference to the mini cluster's other servers.