From e1395f4f355f7a0194e7622225897cd40950ed82 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 12 Jan 2023 11:09:33 -0500 Subject: [PATCH] fix: Move PersistNow to grpc/PersistHandler --- ingester2/src/persist/mod.rs | 1 - ingester2/src/server/grpc.rs | 9 +++++---- .../on_demand.rs => server/grpc/persist.rs} | 18 +++++++++--------- 3 files changed, 14 insertions(+), 14 deletions(-) rename ingester2/src/{persist/on_demand.rs => server/grpc/persist.rs} (74%) diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs index 92d4e40463..aa3bff37f0 100644 --- a/ingester2/src/persist/mod.rs +++ b/ingester2/src/persist/mod.rs @@ -4,6 +4,5 @@ mod context; pub(crate) mod drain_buffer; pub(crate) mod handle; pub(crate) mod hot_partitions; -pub(crate) mod on_demand; pub mod queue; mod worker; diff --git a/ingester2/src/server/grpc.rs b/ingester2/src/server/grpc.rs index 599d82a427..21b4463875 100644 --- a/ingester2/src/server/grpc.rs +++ b/ingester2/src/server/grpc.rs @@ -1,5 +1,6 @@ //! gRPC service implementations for `ingester`. +mod persist; mod query; mod rpc_write; @@ -20,12 +21,12 @@ use crate::{ ingest_state::IngestState, init::IngesterRpcInterface, partition_iter::PartitionIter, - persist::{on_demand::PersistNow, queue::PersistQueue}, + persist::queue::PersistQueue, query::{response::QueryResponse, QueryExec}, timestamp_oracle::TimestampOracle, }; -use self::rpc_write::RpcWrite; +use self::{persist::PersistHandler, rpc_write::RpcWrite}; /// This type is responsible for injecting internal dependencies that SHOULD NOT /// leak outside of the ingester crate into public gRPC handlers. @@ -87,7 +88,7 @@ where { type CatalogHandler = CatalogService; type WriteHandler = RpcWrite>; - type PersistHandler = PersistNow, Arc

>; + type PersistHandler = PersistHandler, Arc

>; type FlightHandler = query::FlightService>; /// Acquire a [`CatalogService`] gRPC service implementation. @@ -112,7 +113,7 @@ where /// /// [`PersistService`]: generated_types::influxdata::iox::ingester::v1::persist_service_server::PersistService. fn persist_service(&self) -> PersistServiceServer { - PersistServiceServer::new(PersistNow::new( + PersistServiceServer::new(PersistHandler::new( Arc::clone(&self.buffer), Arc::clone(&self.persist_handle), )) diff --git a/ingester2/src/persist/on_demand.rs b/ingester2/src/server/grpc/persist.rs similarity index 74% rename from ingester2/src/persist/on_demand.rs rename to ingester2/src/server/grpc/persist.rs index 813c9688f0..5da485130d 100644 --- a/ingester2/src/persist/on_demand.rs +++ b/ingester2/src/server/grpc/persist.rs @@ -8,16 +8,16 @@ use generated_types::influxdata::iox::ingester::v1::{ use tonic::{Request, Response}; #[derive(Debug)] -pub(crate) struct PersistNow { +pub(crate) struct PersistHandler { buffer: T, persist_handle: P, } -impl PersistNow +impl PersistHandler where -T: PartitionIter + Sync + 'static, -P: PersistQueue + Clone + Sync + 'static, - { + T: PartitionIter + Sync + 'static, + P: PersistQueue + Clone + Sync + 'static, +{ pub(crate) fn new(buffer: T, persist_handle: P) -> Self { Self { buffer, @@ -27,11 +27,11 @@ P: PersistQueue + Clone + Sync + 'static, } #[tonic::async_trait] -impl PersistService for PersistNow +impl PersistService for PersistHandler where -T: PartitionIter + Sync + 'static, -P: PersistQueue + Clone + Sync + 'static, - { + T: PartitionIter + Sync + 'static, + P: PersistQueue + Clone + Sync + 'static, +{ /// Handle the RPC request to persist immediately. async fn persist( &self,