Merge pull request #6005 from influxdata/cn/add-catalog-service-everywhere

feat: Add the catalog service to ingester, querier, and compactor
pull/24376/head
kodiakhq[bot] 2022-10-28 16:30:46 +00:00 committed by GitHub
commit 71dd3b5fa5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 108 additions and 34 deletions

3
Cargo.lock generated
View File

@ -780,6 +780,7 @@ dependencies = [
"parquet_file",
"predicate",
"schema",
"service_grpc_catalog",
"snafu",
"test_helpers",
"thiserror",
@ -2307,6 +2308,7 @@ dependencies = [
"prost 0.11.0",
"rand",
"schema",
"service_grpc_catalog",
"snafu",
"test_helpers",
"thiserror",
@ -3928,6 +3930,7 @@ dependencies = [
"rand",
"schema",
"service_common",
"service_grpc_catalog",
"service_grpc_schema",
"sharder",
"snafu",

View File

@ -15,16 +15,17 @@ datafusion = { workspace = true }
futures = "0.3"
generated_types = { path = "../generated_types" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = "0.5.1"
observability_deps = { path = "../observability_deps" }
parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }
iox_query = { path = "../iox_query" }
schema = { path = "../schema" }
service_grpc_catalog = { path = "../service_grpc_catalog"}
snafu = "0.7"
thiserror = "1.0"
iox_time = { path = "../iox_time" }
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.4" }
tonic = { version = "0.8" }

View File

@ -9,7 +9,7 @@ pub mod grpc;
/// The [`CompactorServer`] manages the lifecycle and contains all state for a
/// `compactor` server instance.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct CompactorServer<C: CompactorHandler> {
metrics: Arc<metric::Registry>,

View File

@ -4,24 +4,33 @@ use crate::handler::{
CompactorHandler, DeleteSkippedCompactionsError, ListSkippedCompactionsError,
};
use data_types::PartitionId;
use generated_types::influxdata::iox::compactor::v1::{
self as proto,
compaction_service_server::{CompactionService, CompactionServiceServer},
use generated_types::influxdata::iox::{
catalog::v1::*,
compactor::v1::{
self as proto,
compaction_service_server::{CompactionService, CompactionServiceServer},
},
};
use iox_catalog::interface::Catalog;
use service_grpc_catalog::CatalogService;
use std::sync::Arc;
use tonic::{Request, Response};
/// This type is responsible for managing all gRPC services exposed by `compactor`.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct GrpcDelegate<I: CompactorHandler> {
catalog: Arc<dyn Catalog>,
compactor_handler: Arc<I>,
}
impl<I: CompactorHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the specified
/// `compactor_handler`.
pub fn new(compactor_handler: Arc<I>) -> Self {
Self { compactor_handler }
pub fn new(catalog: Arc<dyn Catalog>, compactor_handler: Arc<I>) -> Self {
Self {
catalog,
compactor_handler,
}
}
/// Acquire a Compaction gRPC service implementation.
@ -30,6 +39,18 @@ impl<I: CompactorHandler + Send + Sync + 'static> GrpcDelegate<I> {
Arc::clone(&self.compactor_handler) as _,
))
}
/// Acquire a [`CatalogService`] gRPC service implementation.
///
/// [`CatalogService`]: generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService.
pub fn catalog_service(
&self,
) -> catalog_service_server::CatalogServiceServer<impl catalog_service_server::CatalogService>
{
catalog_service_server::CatalogServiceServer::new(CatalogService::new(Arc::clone(
&self.catalog,
)))
}
}
/// Implementation of skipped compaction

View File

@ -12,42 +12,43 @@ arrow_util = { path = "../arrow_util" }
async-trait = "0.1.58"
backoff = { path = "../backoff" }
bytes = "1.2"
chrono = { version = "0.4", default-features = false }
data_types = { path = "../data_types" }
datafusion = { workspace = true }
datafusion_util = { path = "../datafusion_util" }
data_types = { path = "../data_types" }
dml = { path = "../dml" }
flatbuffers = "2.1.2"
futures = "0.3"
generated_types = { path = "../generated_types" }
chrono = { version = "0.4", default-features = false }
dml = { path = "../dml" }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch"}
mutable_batch_lp = { path = "../mutable_batch_lp" }
object_store = "0.5.1"
observability_deps = { path = "../observability_deps" }
once_cell = "1"
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
pin-project = "1.0"
predicate = { path = "../predicate" }
prost = "0.11"
iox_query = { path = "../iox_query" }
rand = "0.8.5"
schema = { path = "../schema" }
service_grpc_catalog = { path = "../service_grpc_catalog"}
snafu = "0.7"
thiserror = "1.0"
iox_time = { path = "../iox_time" }
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.4" }
tonic = { version = "0.8" }
trace = { path = "../trace" }
tracker = { path = "../tracker" }
uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack"}
write_buffer = { path = "../write_buffer" }
write_summary = { path = "../write_summary" }
tokio-util = { version = "0.7.4" }
trace = { path = "../trace" }
rand = "0.8.5"
once_cell = "1"
[dev-dependencies]
assert_matches = "1.5.0"

View File

@ -10,7 +10,7 @@ pub mod http;
/// The [`IngesterServer`] manages the lifecycle and contains all state for a
/// `ingester` server instance.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct IngesterServer<I: IngestHandler> {
metrics: Arc<metric::Registry>,

View File

@ -12,13 +12,18 @@ use arrow_flight::{
};
use flatbuffers::FlatBufferBuilder;
use futures::Stream;
use generated_types::influxdata::iox::ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
use generated_types::influxdata::iox::{
catalog::v1::*,
ingester::v1::{
self as proto,
write_info_service_server::{WriteInfoService, WriteInfoServiceServer},
},
};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::{debug, info, warn};
use pin_project::pin_project;
use prost::Message;
use service_grpc_catalog::CatalogService;
use snafu::{ResultExt, Snafu};
use std::{
pin::Pin,
@ -33,8 +38,9 @@ use trace::{ctx::SpanContext, span::SpanExt};
use write_summary::WriteSummary;
/// This type is responsible for managing all gRPC services exposed by `ingester`.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct GrpcDelegate<I: IngestHandler> {
catalog: Arc<dyn Catalog>,
ingest_handler: Arc<I>,
/// How many `do_get` flight requests should panic for testing purposes.
@ -45,8 +51,13 @@ pub struct GrpcDelegate<I: IngestHandler> {
impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
/// Initialise a new [`GrpcDelegate`] passing valid requests to the specified `ingest_handler`.
pub fn new(ingest_handler: Arc<I>, test_flight_do_get_panic: Arc<AtomicU64>) -> Self {
pub fn new(
catalog: Arc<dyn Catalog>,
ingest_handler: Arc<I>,
test_flight_do_get_panic: Arc<AtomicU64>,
) -> Self {
Self {
catalog,
ingest_handler,
test_flight_do_get_panic,
}
@ -66,6 +77,18 @@ impl<I: IngestHandler + Send + Sync + 'static> GrpcDelegate<I> {
Arc::clone(&self.ingest_handler) as _
))
}
/// Acquire a [`CatalogService`] gRPC service implementation.
///
/// [`CatalogService`]: generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService.
pub fn catalog_service(
&self,
) -> catalog_service_server::CatalogServiceServer<impl catalog_service_server::CatalogService>
{
catalog_service_server::CatalogServiceServer::new(CatalogService::new(Arc::clone(
&self.catalog,
)))
}
}
/// Implementation of write info

View File

@ -83,10 +83,13 @@ impl<C: CompactorHandler + std::fmt::Debug + 'static> ServerType for CompactorSe
Err(Box::new(IoxHttpError::NotFound))
}
/// Provide a placeholder gRPC service.
/// Configure the gRPC services.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().compaction_service());
add_service!(builder, self.server.grpc().catalog_service());
serve_builder!(builder);
Ok(())
@ -139,6 +142,7 @@ pub async fn create_compactor_server_type(
time_provider: Arc<dyn TimeProvider>,
compactor_config: CompactorConfig,
) -> Result<Arc<dyn ServerType>> {
let grpc_catalog = Arc::clone(&catalog);
let compactor = build_compactor_from_config(
compactor_config,
catalog,
@ -151,7 +155,7 @@ pub async fn create_compactor_server_type(
let compactor_handler = Arc::new(CompactorHandlerImpl::new(Arc::new(compactor)));
let grpc = GrpcDelegate::new(Arc::clone(&compactor_handler));
let grpc = GrpcDelegate::new(grpc_catalog, Arc::clone(&compactor_handler));
let compactor = CompactorServer::new(metric_registry, grpc, compactor_handler);
Ok(Arc::new(CompactorServerType::new(compactor, common_state)))

View File

@ -88,11 +88,14 @@ impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterSe
Err(Box::new(IoxHttpError::NotFound))
}
/// Provide a placeholder gRPC service.
/// Configure the gRPC services.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().flight_service());
add_service!(builder, self.server.grpc().write_info_service());
add_service!(builder, self.server.grpc().catalog_service());
serve_builder!(builder);
Ok(())
@ -185,6 +188,7 @@ pub async fn create_ingester_server_type(
Duration::from_secs(ingester_config.persist_partition_cold_threshold_seconds),
ingester_config.persist_partition_rows_max,
);
let grpc_catalog = Arc::clone(&catalog);
let ingest_handler = Arc::new(
IngestHandlerImpl::new(
lifecycle_config,
@ -202,6 +206,7 @@ pub async fn create_ingester_server_type(
);
let http = HttpDelegate::new(Arc::clone(&ingest_handler));
let grpc = GrpcDelegate::new(
grpc_catalog,
Arc::clone(&ingest_handler),
Arc::new(AtomicU64::new(ingester_config.test_flight_do_get_panic)),
);

View File

@ -74,7 +74,7 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
Err(Box::new(IoxHttpError::NotFound))
}
/// Provide a placeholder gRPC service.
/// Configure the gRPC services.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(
@ -94,6 +94,8 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
rpc::write_info::write_info_service(Arc::clone(&self.database))
);
add_service!(builder, self.server.handler().schema_service());
add_service!(builder, self.server.handler().catalog_service());
serve_builder!(builder);
Ok(())

View File

@ -19,6 +19,8 @@ futures = "0.3"
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client" }
iox_catalog = { path = "../iox_catalog" }
iox_query = { path = "../iox_query" }
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
object_store = "0.5.1"
observability_deps = { path = "../observability_deps" }
@ -26,15 +28,14 @@ parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
pin-project = "1.0"
predicate = { path = "../predicate" }
iox_query = { path = "../iox_query" }
rand = "0.8.3"
service_common = { path = "../service_common" }
service_grpc_catalog = { path = "../service_grpc_catalog"}
service_grpc_schema = { path = "../service_grpc_schema" }
schema = { path = "../schema" }
sharder = { path = "../sharder" }
snafu = "0.7"
thiserror = "1.0"
iox_time = { path = "../iox_time" }
tokio = { version = "1.21", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
tokio-util = { version = "0.7.4" }
tonic = { version = "0.8" }

View File

@ -6,9 +6,13 @@ use futures::{
stream::FuturesUnordered,
FutureExt, StreamExt, TryFutureExt,
};
use influxdb_iox_client::schema::generated_types::schema_service_server::SchemaServiceServer;
use influxdb_iox_client::{
catalog::generated_types::catalog_service_server::CatalogServiceServer,
schema::generated_types::schema_service_server::SchemaServiceServer,
};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::warn;
use service_grpc_catalog::CatalogService;
use service_grpc_schema::SchemaService;
use std::sync::Arc;
use thiserror::Error;
@ -29,6 +33,11 @@ pub trait QuerierHandler: Send + Sync {
/// [`SchemaService`]: generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService.
fn schema_service(&self) -> SchemaServiceServer<SchemaService>;
/// Acquire a [`CatalogService`] gRPC service implementation.
///
/// [`CatalogService`]: generated_types::influxdata::iox::catalog::v1::catalog_service_server::CatalogService.
fn catalog_service(&self) -> CatalogServiceServer<CatalogService>;
/// Wait until the handler finished to shutdown.
///
/// Use [`shutdown`](Self::shutdown) to trigger a shutdown.
@ -90,6 +99,10 @@ impl QuerierHandler for QuerierHandlerImpl {
SchemaServiceServer::new(SchemaService::new(Arc::clone(&self.catalog)))
}
fn catalog_service(&self) -> CatalogServiceServer<CatalogService> {
CatalogServiceServer::new(CatalogService::new(Arc::clone(&self.catalog)))
}
async fn join(&self) {
// Need to poll handlers unordered to detect early exists of any worker in the list.
let mut unordered: FuturesUnordered<_> = self

View File

@ -16,7 +16,6 @@ generated_types = { path = "../generated_types" }
hashbrown = "0.12"
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
service_grpc_catalog = { path = "../service_grpc_catalog"}
iox_time = { path = "../iox_time" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
@ -30,10 +29,11 @@ schema = { version = "0.1.0", path = "../schema" }
serde = "1.0"
serde_json = "1.0.87"
serde_urlencoded = "0.7"
service_grpc_catalog = { path = "../service_grpc_catalog"}
service_grpc_schema = { path = "../service_grpc_schema" }
service_grpc_object_store = { path = "../service_grpc_object_store" }
snafu = "0.7"
sharder = { path = "../sharder" }
snafu = "0.7"
thiserror = "1.0"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tonic = "0.8"

View File

@ -1,4 +1,4 @@
//! gRPC service for the Catalog. Used in router, but can be included in any gRPC server.
//! gRPC service for the Catalog.
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(