fix: Move PersistNow to grpc/PersistHandler

pull/24376/head
Carol (Nichols || Goulding) 2023-01-12 11:09:33 -05:00
parent 642bab5db3
commit e1395f4f35
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
3 changed files with 14 additions and 14 deletions

View File

@ -4,6 +4,5 @@ mod context;
pub(crate) mod drain_buffer;
pub(crate) mod handle;
pub(crate) mod hot_partitions;
pub(crate) mod on_demand;
pub mod queue;
mod worker;

View File

@ -1,5 +1,6 @@
//! gRPC service implementations for `ingester`.
mod persist;
mod query;
mod rpc_write;
@ -20,12 +21,12 @@ use crate::{
ingest_state::IngestState,
init::IngesterRpcInterface,
partition_iter::PartitionIter,
persist::{on_demand::PersistNow, queue::PersistQueue},
persist::queue::PersistQueue,
query::{response::QueryResponse, QueryExec},
timestamp_oracle::TimestampOracle,
};
use self::rpc_write::RpcWrite;
use self::{persist::PersistHandler, rpc_write::RpcWrite};
/// This type is responsible for injecting internal dependencies that SHOULD NOT
/// leak outside of the ingester crate into public gRPC handlers.
@ -87,7 +88,7 @@ where
{
type CatalogHandler = CatalogService;
type WriteHandler = RpcWrite<Arc<D>>;
type PersistHandler = PersistNow<Arc<T>, Arc<P>>;
type PersistHandler = PersistHandler<Arc<T>, Arc<P>>;
type FlightHandler = query::FlightService<Arc<Q>>;
/// Acquire a [`CatalogService`] gRPC service implementation.
@ -112,7 +113,7 @@ where
///
/// [`PersistService`]: generated_types::influxdata::iox::ingester::v1::persist_service_server::PersistService.
fn persist_service(&self) -> PersistServiceServer<Self::PersistHandler> {
PersistServiceServer::new(PersistNow::new(
PersistServiceServer::new(PersistHandler::new(
Arc::clone(&self.buffer),
Arc::clone(&self.persist_handle),
))

View File

@ -8,16 +8,16 @@ use generated_types::influxdata::iox::ingester::v1::{
use tonic::{Request, Response};
#[derive(Debug)]
pub(crate) struct PersistNow<T, P> {
pub(crate) struct PersistHandler<T, P> {
buffer: T,
persist_handle: P,
}
impl<T, P> PersistNow<T, P>
impl<T, P> PersistHandler<T, P>
where
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
pub(crate) fn new(buffer: T, persist_handle: P) -> Self {
Self {
buffer,
@ -27,11 +27,11 @@ P: PersistQueue + Clone + Sync + 'static,
}
#[tonic::async_trait]
impl<T, P> PersistService for PersistNow<T, P>
impl<T, P> PersistService for PersistHandler<T, P>
where
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
T: PartitionIter + Sync + 'static,
P: PersistQueue + Clone + Sync + 'static,
{
/// Handle the RPC request to persist immediately.
async fn persist(
&self,