refactor: Remove protobuf based write service (#5750)

* refactor: Remove grpc WriteService

* fix: update end to end test

* fix: Update generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2022-09-30 06:55:03 -04:00 committed by GitHub
parent 9feb27b3b0
commit 04ae0aee80
5 changed files with 39 additions and 307 deletions


@@ -118,9 +118,14 @@ message Column {
bytes null_mask = 4;
}
service WriteService {
rpc Write (WriteRequest) returns (WriteResponse);
}
// Note there used to be a service that would load this internal protobuf format.
// See https://github.com/influxdata/influxdb_iox/pull/5750 and
// https://github.com/influxdata/influxdb_iox/issues/4866
// for rationale of why it was removed
// service WriteService {
// rpc Write (WriteRequest) returns (WriteResponse);
// }
message WriteRequest {
DatabaseBatch database_batch = 1;

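For context, before this change a producer could push this internal protobuf format straight over gRPC. A rough sketch of what such a caller looked like, assuming tonic's default generated client for the service definition above (the endpoint address and namespace name are hypothetical):

```rust
// Sketch of a pre-removal caller. `WriteServiceClient` is the tonic-generated
// client for the (now removed) service definition above; the address and
// namespace name are hypothetical.
use generated_types::influxdata::pbdata::v1::{
    write_service_client::WriteServiceClient, DatabaseBatch, WriteRequest,
};

async fn write_via_grpc() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = WriteServiceClient::connect("http://localhost:8081").await?;

    let request = WriteRequest {
        database_batch: Some(DatabaseBatch {
            database_name: "example_ns".to_owned(),
            table_batches: vec![], // encoded columnar batches would go here
            partition_key: Default::default(),
        }),
    };

    // The response body was empty; the write token came back in the
    // `x-iox-write-token` response header (see the removed server code below).
    let _response = client.write(request).await?;
    Ok(())
}
```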

@@ -124,7 +124,6 @@ where
/// [`GrpcDelegate`]: router::server::grpc::GrpcDelegate
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
add_service!(builder, self.server.grpc().write_service());
add_service!(builder, self.server.grpc().schema_service());
add_service!(builder, self.server.grpc().catalog_service());
add_service!(builder, self.server.grpc().object_store_service());
@@ -284,7 +283,7 @@ pub async fn create_router_server_type(
// Initialise the shard-mapping gRPC service.
let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?;
// Initialise the API delegates, sharing the handler stack between them.
// Initialise the API delegates
let handler_stack = Arc::new(handler_stack);
let http = HttpDelegate::new(
common_state.run_config().max_http_request_size,
@@ -292,13 +291,7 @@ pub async fn create_router_server_type(
Arc::clone(&handler_stack),
&metrics,
);
let grpc = GrpcDelegate::new(
handler_stack,
schema_catalog,
object_store,
Arc::clone(&metrics),
shard_service,
);
let grpc = GrpcDelegate::new(schema_catalog, object_store, shard_service);
let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector());
let server_type = Arc::new(RouterServerType::new(router_server, common_state));

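Note that after this change the DML handler stack is wired only into the `HttpDelegate`; the gRPC delegate no longer sees writes at all. A rough sketch of a line-protocol write against that HTTP path (the `/api/v2/write` route, port, and org/bucket values are assumptions, not taken from this diff):

```rust
// Sketch only: the endpoint path, port, and org/bucket values are assumptions.
async fn write_via_http() -> Result<(), reqwest::Error> {
    let line_protocol = "cpu,host=a usage=0.5 1664539200000000000";

    let response = reqwest::Client::new()
        .post("http://localhost:8080/api/v2/write")
        .query(&[("org", "example_org"), ("bucket", "example_bucket")])
        .body(line_protocol)
        .send()
        .await?;

    // The router answers with a 2xx status once the handler stack built
    // above has accepted the write.
    assert!(response.status().is_success());
    Ok(())
}
```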

@@ -18,7 +18,7 @@ pub struct RouterServer<D, S> {
trace_collector: Option<Arc<dyn TraceCollector>>,
http: HttpDelegate<D>,
grpc: GrpcDelegate<D, S>,
grpc: GrpcDelegate<S>,
}
impl<D, S> RouterServer<D, S> {
@@ -26,7 +26,7 @@ impl<D, S> RouterServer<D, S> {
/// handlers.
pub fn new(
http: HttpDelegate<D>,
grpc: GrpcDelegate<D, S>,
grpc: GrpcDelegate<S>,
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
@@ -59,7 +59,7 @@ where
}
/// Get a reference to the router grpc delegate.
pub fn grpc(&self) -> &GrpcDelegate<D, S> {
pub fn grpc(&self) -> &GrpcDelegate<S> {
&self.grpc
}
}


@@ -3,83 +3,45 @@
pub mod sharder;
use self::sharder::ShardService;
use crate::{
dml_handlers::{DmlError, DmlHandler, PartitionError},
shard::Shard,
};
use crate::shard::Shard;
use ::sharder::Sharder;
use generated_types::{
google::FieldViolation,
influxdata::{
iox::{catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*},
pbdata::v1::*,
},
use generated_types::influxdata::iox::{
catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*,
};
use hashbrown::HashMap;
use iox_catalog::interface::Catalog;
use metric::U64Counter;
use mutable_batch::MutableBatch;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use schema::selection::Selection;
use service_grpc_catalog::CatalogService;
use service_grpc_object_store::ObjectStoreService;
use service_grpc_schema::SchemaService;
use std::sync::Arc;
use tonic::{metadata::AsciiMetadataValue, Request, Response, Status};
use trace::ctx::SpanContext;
use write_summary::WriteSummary;
// HERE BE DRAGONS: Uppercase characters in this constant cause a panic. Insert them and
// investigate the cause if you dare.
const WRITE_TOKEN_GRPC_HEADER: &str = "x-iox-write-token";
/// This type is responsible for managing all gRPC services exposed by `router`.
#[derive(Debug)]
pub struct GrpcDelegate<D, S> {
dml_handler: Arc<D>,
pub struct GrpcDelegate<S> {
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
}
impl<D, S> GrpcDelegate<D, S> {
impl<S> GrpcDelegate<S> {
/// Initialise a new gRPC handler, dispatching DML operations to `dml_handler`.
pub fn new(
dml_handler: Arc<D>,
catalog: Arc<dyn Catalog>,
object_store: Arc<DynObjectStore>,
metrics: Arc<metric::Registry>,
shard_service: ShardService<S>,
) -> Self {
Self {
dml_handler,
catalog,
object_store,
metrics,
shard_service,
}
}
}
impl<D, S> GrpcDelegate<D, S>
impl<S> GrpcDelegate<S>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
S: Sharder<(), Item = Arc<Shard>> + Clone + 'static,
{
/// Acquire a [`WriteService`] gRPC service implementation.
///
/// [`WriteService`]: generated_types::influxdata::pbdata::v1::write_service_server::WriteService.
pub fn write_service(
&self,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService> {
write_service_server::WriteServiceServer::new(WriteService::new(
Arc::clone(&self.dml_handler),
&*self.metrics,
))
}
/// Acquire a [`SchemaService`] gRPC service implementation.
///
/// [`SchemaService`]: generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService.
@@ -124,241 +86,3 @@ where
shard_service_server::ShardServiceServer::new(self.shard_service.clone())
}
}
#[derive(Debug)]
struct WriteService<D> {
dml_handler: Arc<D>,
write_metric_rows: U64Counter,
write_metric_columns: U64Counter,
write_metric_tables: U64Counter,
}
impl<D> WriteService<D> {
fn new(dml_handler: Arc<D>, metrics: &metric::Registry) -> Self {
let write_metric_rows = metrics
.register_metric::<U64Counter>(
"grpc_write_rows_total",
"cumulative number of rows successfully routed",
)
.recorder(&[]);
let write_metric_columns = metrics
.register_metric::<U64Counter>(
"grpc_write_fields_total",
"cumulative number of fields successfully routed",
)
.recorder(&[]);
let write_metric_tables = metrics
.register_metric::<U64Counter>(
"grpc_write_tables_total",
"cumulative number of tables in each write request",
)
.recorder(&[]);
Self {
dml_handler,
write_metric_rows,
write_metric_columns,
write_metric_tables,
}
}
}
#[tonic::async_trait]
impl<D> write_service_server::WriteService for WriteService<D>
where
D: DmlHandler<WriteInput = HashMap<String, MutableBatch>, WriteOutput = WriteSummary> + 'static,
{
/// Receive a gRPC [`WriteRequest`] and dispatch it to the DML handler.
async fn write(
&self,
request: Request<WriteRequest>,
) -> Result<Response<WriteResponse>, Status> {
let span_ctx: Option<SpanContext> = request.extensions().get().cloned();
let database_batch = request
.into_inner()
.database_batch
.ok_or_else(|| FieldViolation::required("database_batch"))?;
let tables =
mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| {
FieldViolation {
field: "database_batch".into(),
description: format!("Invalid DatabaseBatch: {}", e),
}
})?;
let (row_count, column_count) =
tables.values().fold((0, 0), |(acc_rows, acc_cols), batch| {
let cols = batch
.schema(Selection::All)
.expect("failed to get schema")
.len();
let rows = batch.rows();
(acc_rows + rows, acc_cols + cols)
});
let namespace = database_batch
.database_name
.try_into()
.map_err(|e| FieldViolation {
field: "database_name".into(),
description: format!("Invalid namespace: {}", e),
})?;
let num_tables = tables.len();
debug!(
num_tables,
%namespace,
"routing grpc write",
);
let summary = self
.dml_handler
.write(&namespace, tables, span_ctx)
.await
.map_err(|e| match e.into() {
e @ DmlError::DatabaseNotFound(_) => Status::not_found(e.to_string()),
e @ DmlError::Schema(_) => Status::aborted(e.to_string()),
e @ (DmlError::Internal(_)
| DmlError::WriteBuffer(_)
| DmlError::NamespaceCreation(_)
| DmlError::Partition(PartitionError::BatchWrite(_))) => {
Status::internal(e.to_string())
}
})?;
self.write_metric_rows.inc(row_count as _);
self.write_metric_columns.inc(column_count as _);
self.write_metric_tables.inc(num_tables as _);
let mut response = Response::new(WriteResponse {});
let metadata = response.metadata_mut();
metadata.insert(
WRITE_TOKEN_GRPC_HEADER,
AsciiMetadataValue::try_from(&summary.to_token()).map_err(|e| {
Status::internal(format!(
"Could not convert WriteSummary token to AsciiMetadataValue: {e}"
))
})?,
);
Ok(response)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dml_handlers::{mock::MockDmlHandler, DmlError};
use generated_types::influxdata::pbdata::v1::write_service_server::WriteService;
use std::sync::Arc;
fn summary() -> WriteSummary {
WriteSummary::default()
}
#[tokio::test]
async fn test_write_no_batch() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest::default();
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("database_batch"));
}
#[tokio::test]
async fn test_write_no_namespace() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::InvalidArgument);
assert!(err.message().contains("database_name"));
}
#[tokio::test]
async fn test_write_ok() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
grpc.write(Request::new(req))
.await
.expect("rpc request should succeed");
}
#[tokio::test]
async fn test_write_ok_with_partition_key() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: "platanos".to_owned(),
}),
};
grpc.write(Request::new(req))
.await
.expect("rpc request should succeed");
}
#[tokio::test]
async fn test_write_dml_handler_error() {
let metrics = Arc::new(metric::Registry::default());
let handler = Arc::new(
MockDmlHandler::default()
.with_write_return([Err(DmlError::DatabaseNotFound("nope".to_string()))]),
);
let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
let req = WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: "bananas".to_owned(),
table_batches: vec![],
partition_key: Default::default(),
}),
};
let err = grpc
.write(Request::new(req))
.await
.expect_err("rpc request should fail");
assert_eq!(err.code(), tonic::Code::NotFound);
assert!(err.message().contains("nope"));
}
}


@@ -492,8 +492,11 @@ impl TestServer {
);
}
ServerType::Router => {
if check_write_service_health(server_type, connections.router_grpc_connection())
.await
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
)
.await
{
return;
}
@@ -521,8 +524,11 @@
ServerType::AllInOne => {
// ensure we can write and query
// TODO also check ingester
if check_write_service_health(server_type, connections.router_grpc_connection())
.await
if check_catalog_service_health(
server_type,
connections.router_grpc_connection(),
)
.await
&& check_arrow_service_health(
server_type,
connections.ingester_grpc_connection(),
@@ -543,21 +549,25 @@
}
}
/// checks write service health, returning false if the service should be checked again
async fn check_write_service_health(server_type: ServerType, connection: Connection) -> bool {
/// checks catalog service health, as a proxy for all gRPC
/// services. Returns false if the service should be checked again
async fn check_catalog_service_health(server_type: ServerType, connection: Connection) -> bool {
let mut health = influxdb_iox_client::health::Client::new(connection);
match health.check("influxdata.pbdata.v1.WriteService").await {
match health
.check("influxdata.iox.catalog.v1.CatalogService")
.await
{
Ok(true) => {
info!("Write service {:?} is running", server_type);
info!("CatalogService service {:?} is running", server_type);
true
}
Ok(false) => {
info!("Write service {:?} is not running", server_type);
info!("CatalogService {:?} is not running", server_type);
true
}
Err(e) => {
info!("Write service {:?} not yet healthy: {:?}", server_type, e);
info!("CatalogService {:?} not yet healthy: {:?}", server_type, e);
false
}
}
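The check above uses the standard gRPC health protocol: the server registers a status per fully-qualified service name, and clients probe it by that name, so the catalog service works as a readiness proxy for the whole gRPC surface. A minimal sketch of that mechanism with `tonic-health` (illustrative wiring, not the IOx RPC builder code):

```rust
// Minimal sketch of the health-check mechanism the test relies on, using
// tonic-health. The wiring is illustrative and not the IOx builder code.
use tonic_health::ServingStatus;

async fn serve_health() -> Result<(), Box<dyn std::error::Error>> {
    let (mut reporter, health_service) = tonic_health::server::health_reporter();

    // Mark the catalog service as serving so a Check for its fully-qualified
    // name (as issued by the test above) returns SERVING.
    reporter
        .set_service_status(
            "influxdata.iox.catalog.v1.CatalogService",
            ServingStatus::Serving,
        )
        .await;

    tonic::transport::Server::builder()
        .add_service(health_service)
        .serve("127.0.0.1:8082".parse()?)
        .await?;

    Ok(())
}
```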