feat: expose ShardService over gRPC

Plumbs in the ShardService impl, and exposes it over the router's gRPC
interface.
pull/24376/head
Dom Dwyer 2022-08-24 15:18:47 +02:00
parent 3594e5d095
commit 7afa3bfaec
4 changed files with 94 additions and 25 deletions

View File

@ -26,9 +26,13 @@ use router::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache, metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
}, },
sequencer::Sequencer, sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer}, server::{
grpc::{sharder::ShardService, GrpcDelegate},
http::HttpDelegate,
RouterServer,
},
}; };
use sharder::JumpHash; use sharder::{JumpHash, Sharder};
use std::{ use std::{
collections::BTreeSet, collections::BTreeSet,
fmt::{Debug, Display}, fmt::{Debug, Display},
@ -52,18 +56,24 @@ pub enum Error {
#[error("No sequencer shards found in Catalog")] #[error("No sequencer shards found in Catalog")]
Sharder, Sharder,
#[error("Failed to find topic with name '{topic_name}' in catalog")]
TopicCatalogLookup { topic_name: String },
#[error("Failed to init shard grpc service: {0}")]
ShardServiceInit(iox_catalog::interface::Error),
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
pub struct RouterServerType<D> { pub struct RouterServerType<D, S> {
server: RouterServer<D>, server: RouterServer<D, S>,
shutdown: CancellationToken, shutdown: CancellationToken,
trace_collector: Option<Arc<dyn TraceCollector>>, trace_collector: Option<Arc<dyn TraceCollector>>,
} }
impl<D> RouterServerType<D> { impl<D, S> RouterServerType<D, S> {
pub fn new(server: RouterServer<D>, common_state: &CommonServerState) -> Self { pub fn new(server: RouterServer<D, S>, common_state: &CommonServerState) -> Self {
Self { Self {
server, server,
shutdown: CancellationToken::new(), shutdown: CancellationToken::new(),
@ -72,16 +82,17 @@ impl<D> RouterServerType<D> {
} }
} }
impl<D> std::fmt::Debug for RouterServerType<D> { impl<D, S> std::fmt::Debug for RouterServerType<D, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Router") write!(f, "Router")
} }
} }
#[async_trait] #[async_trait]
impl<D> ServerType for RouterServerType<D> impl<D, S> ServerType for RouterServerType<D, S>
where where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static, D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
S: Sharder<(), Item = Arc<Sequencer>> + Clone + 'static,
{ {
/// Return the [`metric::Registry`] used by the router. /// Return the [`metric::Registry`] used by the router.
fn metric_registry(&self) -> Arc<Registry> { fn metric_registry(&self) -> Arc<Registry> {
@ -117,6 +128,7 @@ where
add_service!(builder, self.server.grpc().schema_service()); add_service!(builder, self.server.grpc().schema_service());
add_service!(builder, self.server.grpc().catalog_service()); add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service()); add_service!(builder, self.server.grpc().object_store_service());
add_service!(builder, self.server.grpc().shard_service());
serve_builder!(builder); serve_builder!(builder);
Ok(()) Ok(())
@ -163,7 +175,7 @@ pub async fn create_router_server_type(
) -> Result<Arc<dyn ServerType>> { ) -> Result<Arc<dyn ServerType>> {
// Initialise the sharded write buffer and instrument it with DML handler // Initialise the sharded write buffer and instrument it with DML handler
// metrics. // metrics.
let write_buffer = init_write_buffer( let (write_buffer, sharder) = init_write_buffer(
write_buffer_config, write_buffer_config,
Arc::clone(&metrics), Arc::clone(&metrics),
common_state.trace_collector(), common_state.trace_collector(),
@ -242,7 +254,7 @@ pub async fn create_router_server_type(
txn.commit().await?; txn.commit().await?;
let ns_creator = NamespaceAutocreation::new( let ns_creator = NamespaceAutocreation::new(
catalog, Arc::clone(&catalog),
ns_cache, ns_cache,
topic_id, topic_id,
query_id, query_id,
@ -274,6 +286,9 @@ pub async fn create_router_server_type(
// Record the overall request handling latency // Record the overall request handling latency
let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack); let handler_stack = InstrumentationDecorator::new("request", &*metrics, handler_stack);
// Initialise the shard-mapping gRPC service.
let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?;
// Initialise the API delegates, sharing the handler stack between them. // Initialise the API delegates, sharing the handler stack between them.
let handler_stack = Arc::new(handler_stack); let handler_stack = Arc::new(handler_stack);
let http = HttpDelegate::new( let http = HttpDelegate::new(
@ -287,6 +302,7 @@ pub async fn create_router_server_type(
schema_catalog, schema_catalog,
object_store, object_store,
Arc::clone(&metrics), Arc::clone(&metrics),
shard_service,
); );
let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector()); let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector());
@ -297,11 +313,16 @@ pub async fn create_router_server_type(
/// Initialise the [`ShardedWriteBuffer`] with one shard per Kafka partition, /// Initialise the [`ShardedWriteBuffer`] with one shard per Kafka partition,
/// using [`JumpHash`] to shard operations by their destination namespace & /// using [`JumpHash`] to shard operations by their destination namespace &
/// table name. /// table name.
///
/// Returns both the DML handler and the sharder it uses.
async fn init_write_buffer( async fn init_write_buffer(
write_buffer_config: &WriteBufferConfig, write_buffer_config: &WriteBufferConfig,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>, trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<ShardedWriteBuffer<JumpHash<Arc<Sequencer>>>> { ) -> Result<(
ShardedWriteBuffer<Arc<JumpHash<Arc<Sequencer>>>>,
Arc<JumpHash<Arc<Sequencer>>>,
)> {
let write_buffer = Arc::new( let write_buffer = Arc::new(
write_buffer_config write_buffer_config
.writing(Arc::clone(&metrics), trace_collector) .writing(Arc::clone(&metrics), trace_collector)
@ -326,7 +347,9 @@ async fn init_write_buffer(
return Err(Error::Sharder); return Err(Error::Sharder);
} }
Ok(ShardedWriteBuffer::new(JumpHash::new( // Initialise the sharder that maps (table, namespace, payload) to kafka
// partitions.
let sharder = Arc::new(JumpHash::new(
shards shards
.into_iter() .into_iter()
.map(|id| { .map(|id| {
@ -337,7 +360,34 @@ async fn init_write_buffer(
) )
}) })
.map(Arc::new), .map(Arc::new),
))) ));
Ok((ShardedWriteBuffer::new(Arc::clone(&sharder)), sharder))
}
async fn init_shard_service<S>(
sharder: S,
write_buffer_config: &WriteBufferConfig,
catalog: Arc<dyn Catalog>,
) -> Result<ShardService<S>>
where
S: Send + Sync,
{
// Get the KafkaTopic from the catalog for the configured topic.
let topic = catalog
.repositories()
.await
.kafka_topics()
.get_by_name(write_buffer_config.topic())
.await?
.ok_or_else(|| Error::TopicCatalogLookup {
topic_name: write_buffer_config.topic().to_string(),
})?;
// Initialise the sharder
ShardService::new(sharder, topic, catalog)
.await
.map_err(Error::ShardServiceInit)
} }
/// Pre-populate `cache` with the all existing schemas in `catalog`. /// Pre-populate `cache` with the all existing schemas in `catalog`.

View File

@ -13,20 +13,20 @@ pub mod http;
/// The [`RouterServer`] manages the lifecycle and contains all state for a /// The [`RouterServer`] manages the lifecycle and contains all state for a
/// `router` server instance. /// `router` server instance.
#[derive(Debug)] #[derive(Debug)]
pub struct RouterServer<D> { pub struct RouterServer<D, S> {
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>, trace_collector: Option<Arc<dyn TraceCollector>>,
http: HttpDelegate<D>, http: HttpDelegate<D>,
grpc: GrpcDelegate<D>, grpc: GrpcDelegate<D, S>,
} }
impl<D> RouterServer<D> { impl<D, S> RouterServer<D, S> {
/// Initialise a new [`RouterServer`] using the provided HTTP and gRPC /// Initialise a new [`RouterServer`] using the provided HTTP and gRPC
/// handlers. /// handlers.
pub fn new( pub fn new(
http: HttpDelegate<D>, http: HttpDelegate<D>,
grpc: GrpcDelegate<D>, grpc: GrpcDelegate<D, S>,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>, trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self { ) -> Self {
@ -49,7 +49,7 @@ impl<D> RouterServer<D> {
} }
} }
impl<D> RouterServer<D> impl<D, S> RouterServer<D, S>
where where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>, D: DmlHandler<WriteInput = HashMap<String, MutableBatch>>,
{ {
@ -59,7 +59,7 @@ where
} }
/// Get a reference to the router grpc delegate. /// Get a reference to the router grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<D> { pub fn grpc(&self) -> &GrpcDelegate<D, S> {
&self.grpc &self.grpc
} }
} }

View File

@ -2,11 +2,15 @@
pub mod sharder; pub mod sharder;
use crate::dml_handlers::{DmlError, DmlHandler, PartitionError}; use crate::{
dml_handlers::{DmlError, DmlHandler, PartitionError},
sequencer::Sequencer,
};
use ::sharder::Sharder;
use generated_types::{ use generated_types::{
google::FieldViolation, google::FieldViolation,
influxdata::{ influxdata::{
iox::{catalog::v1::*, object_store::v1::*, schema::v1::*}, iox::{catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*},
pbdata::v1::*, pbdata::v1::*,
}, },
}; };
@ -25,39 +29,45 @@ use tonic::{metadata::AsciiMetadataValue, Request, Response, Status};
use trace::ctx::SpanContext; use trace::ctx::SpanContext;
use write_summary::WriteSummary; use write_summary::WriteSummary;
use self::sharder::ShardService;
// HERE BE DRAGONS: Uppercase characters in this constant cause a panic. Insert them and // HERE BE DRAGONS: Uppercase characters in this constant cause a panic. Insert them and
// investigate the cause if you dare. // investigate the cause if you dare.
const WRITE_TOKEN_GRPC_HEADER: &str = "x-iox-write-token"; const WRITE_TOKEN_GRPC_HEADER: &str = "x-iox-write-token";
/// This type is responsible for managing all gRPC services exposed by `router`. /// This type is responsible for managing all gRPC services exposed by `router`.
#[derive(Debug)] #[derive(Debug)]
pub struct GrpcDelegate<D> { pub struct GrpcDelegate<D, S> {
dml_handler: Arc<D>, dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>, catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>, object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
} }
impl<D> GrpcDelegate<D> { impl<D, S> GrpcDelegate<D, S> {
/// Initialise a new gRPC handler, dispatching DML operations to `dml_handler`. /// Initialise a new gRPC handler, dispatching DML operations to `dml_handler`.
pub fn new( pub fn new(
dml_handler: Arc<D>, dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>, catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>, object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>, metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
) -> Self { ) -> Self {
Self { Self {
dml_handler, dml_handler,
catalog, catalog,
object_store, object_store,
metrics, metrics,
shard_service,
} }
} }
} }
impl<D> GrpcDelegate<D> impl<D, S> GrpcDelegate<D, S>
where where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static, D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
S: Sharder<(), Item = Arc<Sequencer>> + Clone + 'static,
{ {
/// Acquire a [`WriteService`] gRPC service implementation. /// Acquire a [`WriteService`] gRPC service implementation.
/// ///
@ -105,6 +115,15 @@ where
Arc::clone(&self.object_store), Arc::clone(&self.object_store),
)) ))
} }
/// Return a gRPC [`ShardService`] handler.
///
/// [`ShardService`]: generated_types::influxdata::iox::sharder::v1::shard_service_server::ShardService
pub fn shard_service(
&self,
) -> shard_service_server::ShardServiceServer<impl shard_service_server::ShardService> {
shard_service_server::ShardServiceServer::new(self.shard_service.clone())
}
} }
#[derive(Debug)] #[derive(Debug)]

View File

@ -26,7 +26,7 @@ use crate::sequencer::Sequencer;
/// ///
/// [gRPC endpoint]: generated_types::influxdata::iox::sharder::v1::shard_service_server::ShardService /// [gRPC endpoint]: generated_types::influxdata::iox::sharder::v1::shard_service_server::ShardService
/// [`ShardedWriteBuffer`]: crate::dml_handlers::ShardedWriteBuffer /// [`ShardedWriteBuffer`]: crate::dml_handlers::ShardedWriteBuffer
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct ShardService<S> { pub struct ShardService<S> {
sharder: S, sharder: S,