diff --git a/generated_types/protos/influxdata/iox/ingester/v1/write.proto b/generated_types/protos/influxdata/iox/ingester/v1/write.proto index db545d7a1f..2737124889 100644 --- a/generated_types/protos/influxdata/iox/ingester/v1/write.proto +++ b/generated_types/protos/influxdata/iox/ingester/v1/write.proto @@ -18,6 +18,9 @@ service PersistService { rpc Persist(PersistRequest) returns (PersistResponse); } -message PersistRequest {} +message PersistRequest { + // The namespace to persist + string namespace = 1; +} message PersistResponse {} diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 2280561945..fb145b97f0 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -903,16 +903,7 @@ mod kafkaless_rpc_write { let table_name = "the_table"; // Set up the cluster ==================================== - let ingester_config = TestConfig::new_ingester2_never_persist(&database_url); - let router_config = TestConfig::new_router2(&ingester_config); - let querier_config = TestConfig::new_querier2(&ingester_config); - let mut cluster = MiniCluster::new() - .with_ingester(ingester_config) - .await - .with_router(router_config) - .await - .with_querier(querier_config) - .await; + let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await; StepTest::new( &mut cluster, @@ -951,16 +942,7 @@ mod kafkaless_rpc_write { let table_name = "the_table"; // Set up the cluster ==================================== - let ingester_config = TestConfig::new_ingester2_never_persist(&database_url); - let router_config = TestConfig::new_router2(&ingester_config); - let querier_config = TestConfig::new_querier2(&ingester_config); - let mut cluster = MiniCluster::new() - .with_ingester(ingester_config) - .await - .with_router(router_config) - .await - .with_querier(querier_config) - .await; + let mut cluster = MiniCluster::create_shared2_never_persist(database_url).await; StepTest::new( &mut cluster, diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc.rs index 8facdb19bb..d8e9156e4f 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc.rs @@ -98,7 +98,7 @@ trait InfluxRpcTest: Send + Sync + 'static { .await } IoxArchitecture::Kafkaless => { - MiniCluster::create_non_shared2_never_persist(database_url.clone()).await + MiniCluster::create_shared2_never_persist(database_url.clone()).await } }; diff --git a/influxdb_iox/tests/query_tests2/framework.rs b/influxdb_iox/tests/query_tests2/framework.rs index 67e566e55d..da5f9adc73 100644 --- a/influxdb_iox/tests/query_tests2/framework.rs +++ b/influxdb_iox/tests/query_tests2/framework.rs @@ -66,10 +66,10 @@ impl TestCase { for chunk_stage in self.chunk_stage { info!("Using IoxArchitecture::{arch:?} and ChunkStage::{chunk_stage:?}"); - // Setup that differs by architecture and chunk stage. These need to be non-shared - // clusters; if they're shared, then the tests that run in parallel and persist at - // particular times mess with each other because persistence applies to everything in - // the ingester. + // Setup that differs by architecture and chunk stage. In the Kafka architecture, + // these need to be non-shared clusters; if they're shared, then the tests that run + // in parallel and persist at particular times mess with each other because + // persistence applies to everything in the ingester. let mut cluster = match (arch, chunk_stage) { (IoxArchitecture::Kafkaful, ChunkStage::Ingester) => { MiniCluster::create_non_shared_standard_never_persist(database_url.clone()) @@ -79,10 +79,10 @@ impl TestCase { MiniCluster::create_non_shared_standard(database_url.clone()).await } (IoxArchitecture::Kafkaless, ChunkStage::Ingester) => { - MiniCluster::create_non_shared2_never_persist(database_url.clone()).await + MiniCluster::create_shared2_never_persist(database_url.clone()).await } (IoxArchitecture::Kafkaless, ChunkStage::Parquet) => { - MiniCluster::create_non_shared2(database_url.clone()).await + MiniCluster::create_shared2(database_url.clone()).await } (_, ChunkStage::All) => unreachable!("See `impl IntoIterator for ChunkStage`"), }; diff --git a/influxdb_iox_client/src/client/ingester.rs b/influxdb_iox_client/src/client/ingester.rs index 3c550538a9..0d33f5bac0 100644 --- a/influxdb_iox_client/src/client/ingester.rs +++ b/influxdb_iox_client/src/client/ingester.rs @@ -21,11 +21,11 @@ impl Client { } } - /// Instruct the ingester to persist its data to Parquet. Will block until the data has - /// persisted, which is useful in tests asserting on persisted data. May behave in unexpected - /// ways if used concurrently with writes and ingester WAL rotations. - pub async fn persist(&mut self) -> Result<(), Error> { - self.inner.persist(PersistRequest {}).await?; + /// Instruct the ingester to persist its data for the specified namespace to Parquet. Useful in + /// tests asserting on persisted data. May behave in unexpected ways if used concurrently with + /// writes and ingester WAL rotations. + pub async fn persist(&mut self, namespace: String) -> Result<(), Error> { + self.inner.persist(PersistRequest { namespace }).await?; Ok(()) } diff --git a/ingester/src/server/grpc/persist.rs b/ingester/src/server/grpc/persist.rs index 9eaee5850b..bccfa52e31 100644 --- a/ingester/src/server/grpc/persist.rs +++ b/ingester/src/server/grpc/persist.rs @@ -25,6 +25,9 @@ impl PersistService for PersistHandler { &self, _request: Request, ) -> Result, tonic::Status> { + // Even though the request specifies the namespace, persist everything. This means tests + // that use this API need to be using non-shared MiniClusters in order to avoid messing + // with each others' states. self.ingest_handler.persist_all().await; Ok(Response::new(proto::PersistResponse {})) diff --git a/ingester2/src/server/grpc.rs b/ingester2/src/server/grpc.rs index 400392c551..69f0d58158 100644 --- a/ingester2/src/server/grpc.rs +++ b/ingester2/src/server/grpc.rs @@ -120,6 +120,7 @@ where PersistServiceServer::new(PersistHandler::new( Arc::clone(&self.buffer), Arc::clone(&self.persist_handle), + Arc::clone(&self.catalog), )) } diff --git a/ingester2/src/server/grpc/persist.rs b/ingester2/src/server/grpc/persist.rs index a9586dd437..2431132fa9 100644 --- a/ingester2/src/server/grpc/persist.rs +++ b/ingester2/src/server/grpc/persist.rs @@ -5,12 +5,15 @@ use crate::{ use generated_types::influxdata::iox::ingester::v1::{ self as proto, persist_service_server::PersistService, }; +use iox_catalog::interface::Catalog; +use std::sync::Arc; use tonic::{Request, Response}; #[derive(Debug)] pub(crate) struct PersistHandler { buffer: T, persist_handle: P, + catalog: Arc, } impl PersistHandler @@ -18,10 +21,11 @@ where T: PartitionIter + Sync + 'static, P: PersistQueue + Clone + Sync + 'static, { - pub(crate) fn new(buffer: T, persist_handle: P) -> Self { + pub(crate) fn new(buffer: T, persist_handle: P, catalog: Arc) -> Self { Self { buffer, persist_handle, + catalog, } } } @@ -37,9 +41,27 @@ where /// concurrently with writes and ingester WAL rotations. async fn persist( &self, - _request: Request, + request: Request, ) -> Result, tonic::Status> { - persist_partitions(self.buffer.partition_iter(), &self.persist_handle).await; + let request = request.into_inner(); + + let namespace = self + .catalog + .repositories() + .await + .namespaces() + .get_by_name(&request.namespace) + .await + .map_err(|e| tonic::Status::internal(e.to_string()))? + .ok_or_else(|| tonic::Status::not_found(&request.namespace))?; + + persist_partitions( + self.buffer + .partition_iter() + .filter(|p| p.lock().namespace_id() == namespace.id), + &self.persist_handle, + ) + .await; Ok(Response::new(proto::PersistResponse {})) } diff --git a/test_helpers_end_to_end/src/mini_cluster.rs b/test_helpers_end_to_end/src/mini_cluster.rs index b14feec9d4..b4c8b87fb0 100644 --- a/test_helpers_end_to_end/src/mini_cluster.rs +++ b/test_helpers_end_to_end/src/mini_cluster.rs @@ -494,7 +494,10 @@ impl MiniCluster { let mut ingester_client = influxdb_iox_client::ingester::Client::new(self.ingester().ingester_grpc_connection()); - ingester_client.persist().await.unwrap(); + ingester_client + .persist(self.namespace().into()) + .await + .unwrap(); } /// Get a reference to the mini cluster's other servers.