From 04ae0aee80a1bbf888e7c009ce52dafcf67c68ea Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 30 Sep 2022 06:55:03 -0400 Subject: [PATCH] refactor: Remove protobuf based write service (#5750) * refactor: Remove grpc WriteService * fix: update end to end test * fix: Update generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../pbdata/v1/influxdb_pb_data_protocol.proto | 11 +- ioxd_router/src/lib.rs | 11 +- router/src/server.rs | 6 +- router/src/server/grpc.rs | 288 +----------------- test_helpers_end_to_end/src/server_fixture.rs | 30 +- 5 files changed, 39 insertions(+), 307 deletions(-) diff --git a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto index 1e1412ef80..cb3d043141 100644 --- a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto +++ b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto @@ -118,9 +118,14 @@ message Column { bytes null_mask = 4; } -service WriteService { - rpc Write (WriteRequest) returns (WriteResponse); -} +// Note there used to be a service that would load this internal protobuf format. +// See https://github.com/influxdata/influxdb_iox/pull/5750 and +// https://github.com/influxdata/influxdb_iox/issues/4866 +// for rationale of why it was removed + +// service WriteService { +// rpc Write (WriteRequest) returns (WriteResponse); +// } message WriteRequest { DatabaseBatch database_batch = 1; diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 7856254ea4..ba4e3777cb 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -124,7 +124,6 @@ where /// [`GrpcDelegate`]: router::server::grpc::GrpcDelegate async fn server_grpc(self: Arc, builder_input: RpcBuilderInput) -> Result<(), RpcError> { let builder = setup_builder!(builder_input, self); - add_service!(builder, self.server.grpc().write_service()); add_service!(builder, self.server.grpc().schema_service()); add_service!(builder, self.server.grpc().catalog_service()); add_service!(builder, self.server.grpc().object_store_service()); @@ -284,7 +283,7 @@ pub async fn create_router_server_type( // Initialise the shard-mapping gRPC service. let shard_service = init_shard_service(sharder, write_buffer_config, catalog).await?; - // Initialise the API delegates, sharing the handler stack between them. + // Initialise the API delegates let handler_stack = Arc::new(handler_stack); let http = HttpDelegate::new( common_state.run_config().max_http_request_size, @@ -292,13 +291,7 @@ pub async fn create_router_server_type( Arc::clone(&handler_stack), &metrics, ); - let grpc = GrpcDelegate::new( - handler_stack, - schema_catalog, - object_store, - Arc::clone(&metrics), - shard_service, - ); + let grpc = GrpcDelegate::new(schema_catalog, object_store, shard_service); let router_server = RouterServer::new(http, grpc, metrics, common_state.trace_collector()); let server_type = Arc::new(RouterServerType::new(router_server, common_state)); diff --git a/router/src/server.rs b/router/src/server.rs index edb535555d..a82b414894 100644 --- a/router/src/server.rs +++ b/router/src/server.rs @@ -18,7 +18,7 @@ pub struct RouterServer { trace_collector: Option>, http: HttpDelegate, - grpc: GrpcDelegate, + grpc: GrpcDelegate, } impl RouterServer { @@ -26,7 +26,7 @@ impl RouterServer { /// handlers. pub fn new( http: HttpDelegate, - grpc: GrpcDelegate, + grpc: GrpcDelegate, metrics: Arc, trace_collector: Option>, ) -> Self { @@ -59,7 +59,7 @@ where } /// Get a reference to the router grpc delegate. - pub fn grpc(&self) -> &GrpcDelegate { + pub fn grpc(&self) -> &GrpcDelegate { &self.grpc } } diff --git a/router/src/server/grpc.rs b/router/src/server/grpc.rs index 669aad7e18..7d34227366 100644 --- a/router/src/server/grpc.rs +++ b/router/src/server/grpc.rs @@ -3,83 +3,45 @@ pub mod sharder; use self::sharder::ShardService; -use crate::{ - dml_handlers::{DmlError, DmlHandler, PartitionError}, - shard::Shard, -}; +use crate::shard::Shard; use ::sharder::Sharder; -use generated_types::{ - google::FieldViolation, - influxdata::{ - iox::{catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*}, - pbdata::v1::*, - }, +use generated_types::influxdata::iox::{ + catalog::v1::*, object_store::v1::*, schema::v1::*, sharder::v1::*, }; -use hashbrown::HashMap; use iox_catalog::interface::Catalog; -use metric::U64Counter; -use mutable_batch::MutableBatch; use object_store::DynObjectStore; -use observability_deps::tracing::*; -use schema::selection::Selection; use service_grpc_catalog::CatalogService; use service_grpc_object_store::ObjectStoreService; use service_grpc_schema::SchemaService; use std::sync::Arc; -use tonic::{metadata::AsciiMetadataValue, Request, Response, Status}; -use trace::ctx::SpanContext; -use write_summary::WriteSummary; - -// HERE BE DRAGONS: Uppercase characters in this constant cause a panic. Insert them and -// investigate the cause if you dare. -const WRITE_TOKEN_GRPC_HEADER: &str = "x-iox-write-token"; /// This type is responsible for managing all gRPC services exposed by `router`. #[derive(Debug)] -pub struct GrpcDelegate { - dml_handler: Arc, +pub struct GrpcDelegate { catalog: Arc, object_store: Arc, - metrics: Arc, shard_service: ShardService, } -impl GrpcDelegate { +impl GrpcDelegate { /// Initialise a new gRPC handler, dispatching DML operations to `dml_handler`. pub fn new( - dml_handler: Arc, catalog: Arc, object_store: Arc, - metrics: Arc, shard_service: ShardService, ) -> Self { Self { - dml_handler, catalog, object_store, - metrics, shard_service, } } } -impl GrpcDelegate +impl GrpcDelegate where - D: DmlHandler, WriteOutput = WriteSummary> + 'static, S: Sharder<(), Item = Arc> + Clone + 'static, { - /// Acquire a [`WriteService`] gRPC service implementation. - /// - /// [`WriteService`]: generated_types::influxdata::pbdata::v1::write_service_server::WriteService. - pub fn write_service( - &self, - ) -> write_service_server::WriteServiceServer { - write_service_server::WriteServiceServer::new(WriteService::new( - Arc::clone(&self.dml_handler), - &*self.metrics, - )) - } - /// Acquire a [`SchemaService`] gRPC service implementation. /// /// [`SchemaService`]: generated_types::influxdata::iox::schema::v1::schema_service_server::SchemaService. @@ -124,241 +86,3 @@ where shard_service_server::ShardServiceServer::new(self.shard_service.clone()) } } - -#[derive(Debug)] -struct WriteService { - dml_handler: Arc, - - write_metric_rows: U64Counter, - write_metric_columns: U64Counter, - write_metric_tables: U64Counter, -} - -impl WriteService { - fn new(dml_handler: Arc, metrics: &metric::Registry) -> Self { - let write_metric_rows = metrics - .register_metric::( - "grpc_write_rows_total", - "cumulative number of rows successfully routed", - ) - .recorder(&[]); - let write_metric_columns = metrics - .register_metric::( - "grpc_write_fields_total", - "cumulative number of fields successfully routed", - ) - .recorder(&[]); - let write_metric_tables = metrics - .register_metric::( - "grpc_write_tables_total", - "cumulative number of tables in each write request", - ) - .recorder(&[]); - - Self { - dml_handler, - write_metric_rows, - write_metric_columns, - write_metric_tables, - } - } -} - -#[tonic::async_trait] -impl write_service_server::WriteService for WriteService -where - D: DmlHandler, WriteOutput = WriteSummary> + 'static, -{ - /// Receive a gRPC [`WriteRequest`] and dispatch it to the DML handler. - async fn write( - &self, - request: Request, - ) -> Result, Status> { - let span_ctx: Option = request.extensions().get().cloned(); - let database_batch = request - .into_inner() - .database_batch - .ok_or_else(|| FieldViolation::required("database_batch"))?; - - let tables = - mutable_batch_pb::decode::decode_database_batch(&database_batch).map_err(|e| { - FieldViolation { - field: "database_batch".into(), - description: format!("Invalid DatabaseBatch: {}", e), - } - })?; - - let (row_count, column_count) = - tables.values().fold((0, 0), |(acc_rows, acc_cols), batch| { - let cols = batch - .schema(Selection::All) - .expect("failed to get schema") - .len(); - let rows = batch.rows(); - (acc_rows + rows, acc_cols + cols) - }); - - let namespace = database_batch - .database_name - .try_into() - .map_err(|e| FieldViolation { - field: "database_name".into(), - description: format!("Invalid namespace: {}", e), - })?; - - let num_tables = tables.len(); - debug!( - num_tables, - %namespace, - "routing grpc write", - ); - - let summary = self - .dml_handler - .write(&namespace, tables, span_ctx) - .await - .map_err(|e| match e.into() { - e @ DmlError::DatabaseNotFound(_) => Status::not_found(e.to_string()), - e @ DmlError::Schema(_) => Status::aborted(e.to_string()), - - e @ (DmlError::Internal(_) - | DmlError::WriteBuffer(_) - | DmlError::NamespaceCreation(_) - | DmlError::Partition(PartitionError::BatchWrite(_))) => { - Status::internal(e.to_string()) - } - })?; - - self.write_metric_rows.inc(row_count as _); - self.write_metric_columns.inc(column_count as _); - self.write_metric_tables.inc(num_tables as _); - - let mut response = Response::new(WriteResponse {}); - let metadata = response.metadata_mut(); - metadata.insert( - WRITE_TOKEN_GRPC_HEADER, - AsciiMetadataValue::try_from(&summary.to_token()).map_err(|e| { - Status::internal(format!( - "Could not convert WriteSummary token to AsciiMetadataValue: {e}" - )) - })?, - ); - - Ok(response) - } -} -#[cfg(test)] -mod tests { - use super::*; - use crate::dml_handlers::{mock::MockDmlHandler, DmlError}; - use generated_types::influxdata::pbdata::v1::write_service_server::WriteService; - use std::sync::Arc; - - fn summary() -> WriteSummary { - WriteSummary::default() - } - - #[tokio::test] - async fn test_write_no_batch() { - let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); - let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); - - let req = WriteRequest::default(); - - let err = grpc - .write(Request::new(req)) - .await - .expect_err("rpc request should fail"); - - assert_eq!(err.code(), tonic::Code::InvalidArgument); - assert!(err.message().contains("database_batch")); - } - - #[tokio::test] - async fn test_write_no_namespace() { - let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); - let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); - - let req = WriteRequest { - database_batch: Some(DatabaseBatch { - database_name: "".to_owned(), - table_batches: vec![], - partition_key: Default::default(), - }), - }; - - let err = grpc - .write(Request::new(req)) - .await - .expect_err("rpc request should fail"); - - assert_eq!(err.code(), tonic::Code::InvalidArgument); - assert!(err.message().contains("database_name")); - } - - #[tokio::test] - async fn test_write_ok() { - let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); - let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); - - let req = WriteRequest { - database_batch: Some(DatabaseBatch { - database_name: "bananas".to_owned(), - table_batches: vec![], - partition_key: Default::default(), - }), - }; - - grpc.write(Request::new(req)) - .await - .expect("rpc request should succeed"); - } - - #[tokio::test] - async fn test_write_ok_with_partition_key() { - let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); - let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); - - let req = WriteRequest { - database_batch: Some(DatabaseBatch { - database_name: "bananas".to_owned(), - table_batches: vec![], - partition_key: "platanos".to_owned(), - }), - }; - - grpc.write(Request::new(req)) - .await - .expect("rpc request should succeed"); - } - - #[tokio::test] - async fn test_write_dml_handler_error() { - let metrics = Arc::new(metric::Registry::default()); - let handler = Arc::new( - MockDmlHandler::default() - .with_write_return([Err(DmlError::DatabaseNotFound("nope".to_string()))]), - ); - let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); - - let req = WriteRequest { - database_batch: Some(DatabaseBatch { - database_name: "bananas".to_owned(), - table_batches: vec![], - partition_key: Default::default(), - }), - }; - - let err = grpc - .write(Request::new(req)) - .await - .expect_err("rpc request should fail"); - - assert_eq!(err.code(), tonic::Code::NotFound); - assert!(err.message().contains("nope")); - } -} diff --git a/test_helpers_end_to_end/src/server_fixture.rs b/test_helpers_end_to_end/src/server_fixture.rs index 5ba9d8a359..349524f569 100644 --- a/test_helpers_end_to_end/src/server_fixture.rs +++ b/test_helpers_end_to_end/src/server_fixture.rs @@ -492,8 +492,11 @@ impl TestServer { ); } ServerType::Router => { - if check_write_service_health(server_type, connections.router_grpc_connection()) - .await + if check_catalog_service_health( + server_type, + connections.router_grpc_connection(), + ) + .await { return; } @@ -521,8 +524,11 @@ impl TestServer { ServerType::AllInOne => { // ensure we can write and query // TODO also check ingester - if check_write_service_health(server_type, connections.router_grpc_connection()) - .await + if check_catalog_service_health( + server_type, + connections.router_grpc_connection(), + ) + .await && check_arrow_service_health( server_type, connections.ingester_grpc_connection(), @@ -543,21 +549,25 @@ impl TestServer { } } -/// checks write service health, returning false if the service should be checked again -async fn check_write_service_health(server_type: ServerType, connection: Connection) -> bool { +/// checks catalog service health, as a proxy for all gRPC +/// services. Returns false if the service should be checked again +async fn check_catalog_service_health(server_type: ServerType, connection: Connection) -> bool { let mut health = influxdb_iox_client::health::Client::new(connection); - match health.check("influxdata.pbdata.v1.WriteService").await { + match health + .check("influxdata.iox.catalog.v1.CatalogService") + .await + { Ok(true) => { - info!("Write service {:?} is running", server_type); + info!("CatalogService service {:?} is running", server_type); true } Ok(false) => { - info!("Write service {:?} is not running", server_type); + info!("CatalogService {:?} is not running", server_type); true } Err(e) => { - info!("Write service {:?} not yet healthy: {:?}", server_type, e); + info!("CatalogService {:?} not yet healthy: {:?}", server_type, e); false } }