From 90dd9906f6475c4bb770f9bed9aa5f24163ede1f Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 16 Nov 2022 19:11:39 +0100 Subject: [PATCH] feat(ingester): rpc write endpoint Adds a handler implementation of the gRPC WriteService to receive direct RPC writes from a router. This code is currently unused. --- Cargo.lock | 1 + ingester/Cargo.toml | 1 + ingester/src/server/grpc.rs | 1 + ingester/src/server/grpc/rpc_write.rs | 383 ++++++++++++++++++++++++++ 4 files changed, 386 insertions(+) create mode 100644 ingester/src/server/grpc/rpc_write.rs diff --git a/Cargo.lock b/Cargo.lock index eb4c98ac5a..0ab8dc4ff4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2340,6 +2340,7 @@ dependencies = [ "metric", "mutable_batch", "mutable_batch_lp", + "mutable_batch_pb", "object_store", "observability_deps", "once_cell", diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml index fb83a4db4d..e558fa92e8 100644 --- a/ingester/Cargo.toml +++ b/ingester/Cargo.toml @@ -50,6 +50,7 @@ uuid = { version = "1", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} write_buffer = { path = "../write_buffer" } write_summary = { path = "../write_summary" } +mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" } [dev-dependencies] assert_matches = "1.5.0" diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs index 4b47c8683e..cb936f9da1 100644 --- a/ingester/src/server/grpc.rs +++ b/ingester/src/server/grpc.rs @@ -1,6 +1,7 @@ //! gRPC service implementations for `ingester`. mod query; +mod rpc_write; mod write_info; use std::sync::{atomic::AtomicU64, Arc}; diff --git a/ingester/src/server/grpc/rpc_write.rs b/ingester/src/server/grpc/rpc_write.rs new file mode 100644 index 0000000000..cef2dc9862 --- /dev/null +++ b/ingester/src/server/grpc/rpc_write.rs @@ -0,0 +1,383 @@ +use data_types::{NamespaceId, PartitionKey, TableId}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; +use generated_types::influxdata::iox::ingester::v1::{ + self as proto, write_service_server::WriteService, +}; +use mutable_batch::writer; +use mutable_batch_pb::decode::decode_database_batch; +use observability_deps::tracing::*; +use thiserror::Error; +use tonic::{Request, Response}; + +use crate::{data::DmlApplyAction, stream_handler::DmlSink}; + +// A list of error states when handling an RPC write request. +// +// Note that this isn't strictly necessary as the [`WriteService`] trait +// expects a [`tonic::Status`] error value, but by defining the errors here they +// serve as documentation of the potential error states (which are then +// converted into [`tonic::Status`] for the handler). +#[derive(Debug, Error)] +enum RpcError { + #[error("rpc write request does not contain a payload")] + NoPayload, + + #[error("rpc write request does not contain any table data")] + NoTables, + + #[error(transparent)] + Decode(mutable_batch_pb::decode::Error), + + #[error(transparent)] + Apply(crate::data::Error), +} + +impl From for tonic::Status { + fn from(e: RpcError) -> Self { + use crate::data::Error; + + match e { + RpcError::Decode(_) | RpcError::NoPayload | RpcError::NoTables => { + Self::invalid_argument(e.to_string()) + } + RpcError::Apply(Error::BufferWrite { source }) => map_write_error(source), + RpcError::Apply(Error::ShardNotFound { .. }) => { + // This is not a reachable error state in the gRPC write model, + // and is enumerated here purely because of error conflation + // (one big error type instead of small, composable errors). + unreachable!() + } + } + } +} + +/// Map a [`mutable_batch::Error`] to a [`tonic::Status`]. +/// +/// This method takes care to enumerate all possible error states, so that new +/// error additions cause a compilation failure, and therefore require the new +/// error to be explicitly mapped to a gRPC status code. +fn map_write_error(e: mutable_batch::Error) -> tonic::Status { + use tonic::Status; + match e { + mutable_batch::Error::ColumnError { .. } + | mutable_batch::Error::ArrowError { .. } + | mutable_batch::Error::InternalSchema { .. } + | mutable_batch::Error::ColumnNotFound { .. } + | mutable_batch::Error::WriterError { + source: writer::Error::KeyNotFound { .. } | writer::Error::InsufficientValues { .. }, + } => Status::internal(e.to_string()), + mutable_batch::Error::WriterError { + source: writer::Error::TypeMismatch { .. }, + } => { + // While a schema type conflict is ultimately a user error, if it + // reaches the ingester it should have already passed through schema + // validation in the router, and as such it is an internal system + // failure. + Status::internal(e.to_string()) + } + } +} + +/// A gRPC [`WriteService`] handler. +/// +/// This handler accepts writes from an upstream, and applies them to the +/// provided [`DmlSink`]. +#[derive(Debug)] +pub struct RpcWrite { + sink: T, +} + +impl RpcWrite { + /// Instantiate a new [`RpcWrite`] that pushes [`DmlOperation`] instances + /// into `sink`. + #[allow(dead_code)] + pub fn new(sink: T) -> Self { + Self { sink } + } +} + +#[tonic::async_trait] +impl WriteService for RpcWrite +where + T: DmlSink + 'static, +{ + async fn write( + &self, + request: Request, + ) -> Result, tonic::Status> { + let remote_addr = request + .remote_addr() + .map(|v| v.to_string()) + .unwrap_or_else(|| "".to_string()); + + // Extract the write payload + let payload = request.into_inner().payload.ok_or(RpcError::NoPayload)?; + + let batches = decode_database_batch(&payload).map_err(RpcError::Decode)?; + let num_tables = batches.0.len(); + let namespace_id = NamespaceId::new(payload.database_id); + let partition_key = PartitionKey::from(payload.partition_key); + + // Never attempt to create a DmlWrite with no tables - doing so causes a + // panic. + if num_tables == 0 { + return Err(RpcError::NoTables)?; + } + + // Reconstruct the DML operation + let op = DmlWrite::new( + namespace_id, + batches.0, + batches + .1 + .into_iter() + .map(|(k, v)| (k, TableId::new(v))) + .collect(), + partition_key.clone(), + // The tracing context should be propagated over the RPC boundary. + // + // See https://github.com/influxdata/influxdb_iox/issues/6177 + DmlMeta::unsequenced(None), + ); + + trace!( + remote_addr, + num_tables, + %namespace_id, + %partition_key, + "received rpc write" + ); + + // Apply the DML op to the in-memory buffer. + let res = self + .sink + .apply(DmlOperation::Write(op)) + .await + .map_err(RpcError::Apply)?; + + // Assert that the write was not skipped due to having a non-monotonic + // sequence number. In this gRPC write model, there are no sequence + // numbers! + match res { + DmlApplyAction::Applied(_) => { + // Discard the lifecycle manager's "should_pause" hint. + } + DmlApplyAction::Skipped => unreachable!(), + } + + Ok(Response::new(proto::WriteResponse {})) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use generated_types::influxdata::pbdata::v1::{ + column::{SemanticType, Values}, + Column, DatabaseBatch, TableBatch, + }; + + use crate::stream_handler::mock_sink::MockDmlSink; + + use super::*; + + const NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + const PARTITION_KEY: &str = "bananas"; + + macro_rules! test_rpc_write { + ( + $name:ident, + request = $request:expr, // Proto WriteRequest request the server receives + sink_ret = $sink_ret:expr, // The mock return value from the DmlSink, if called + want_err = $want_err:literal, // The expectation of an error from the handler + want_calls = $($want_calls:tt)+ // + ) => { + paste::paste! { + #[tokio::test] + async fn []() { + let mock = Arc::new( + MockDmlSink::default().with_apply_return(vec![$sink_ret]), + ); + let handler = RpcWrite::new(Arc::clone(&mock)); + + let ret = handler + .write(Request::new($request)) + .await; + + assert_eq!(ret.is_err(), $want_err, "wanted handler error {} got {:?}", $want_err, ret); + assert_matches!(mock.get_calls().as_slice(), $($want_calls)+); + } + } + }; + } + + test_rpc_write!( + apply_ok_pause_true, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![TableBatch { + table_name: "".to_string(), + table_id: 42, + columns: vec![Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }], + }), + }, + sink_ret = Ok(DmlApplyAction::Applied(true)), + want_err = false, + want_calls = [DmlOperation::Write(w)] => { + // Assert the various DmlWrite properties match the expected values + assert_eq!(w.namespace_id(), NAMESPACE_ID); + assert_eq!(w.table_count(), 1); + assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY)); + } + ); + + test_rpc_write!( + apply_ok_pause_false, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![TableBatch { + table_name: "".to_string(), + table_id: 42, + columns: vec![Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }], + }), + }, + sink_ret = Ok(DmlApplyAction::Applied(false)), + want_err = false, + want_calls = [DmlOperation::Write(w)] => { + // Assert the various DmlWrite properties match the expected values + assert_eq!(w.namespace_id(), NAMESPACE_ID); + assert_eq!(w.table_count(), 1); + assert_eq!(*w.partition_key(), PartitionKey::from(PARTITION_KEY)); + } + ); + + test_rpc_write!( + no_payload, + request = proto::WriteRequest { payload: None }, + sink_ret = Ok(DmlApplyAction::Applied(false)), + want_err = true, + want_calls = [] + ); + + test_rpc_write!( + no_tables, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![], + }), + }, + sink_ret = Ok(DmlApplyAction::Applied(false)), + want_err = true, + want_calls = [] + ); + + test_rpc_write!( + batch_error, + request = proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![TableBatch { + table_name: "".to_string(), + table_id: 42, + columns: vec![Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![4242], // Two types for one column + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }], + }), + }, + sink_ret = Ok(DmlApplyAction::Applied(false)), + want_err = true, + want_calls = [] + ); + + #[tokio::test] + #[should_panic(expected = "unreachable")] + async fn test_rpc_write_apply_skipped() { + let mock = + Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(DmlApplyAction::Skipped)])); + let handler = RpcWrite::new(Arc::clone(&mock)); + + let _ = handler + .write(Request::new(proto::WriteRequest { + payload: Some(DatabaseBatch { + database_id: NAMESPACE_ID.get(), + partition_key: PARTITION_KEY.to_string(), + table_batches: vec![TableBatch { + table_name: "".to_string(), + table_id: 42, + columns: vec![Column { + column_name: "time".to_string(), + semantic_type: SemanticType::Time.into(), + values: Some(Values { + i64_values: vec![4242], + f64_values: vec![], + u64_values: vec![], + string_values: vec![], + bool_values: vec![], + bytes_values: vec![], + packed_string_values: None, + interned_string_values: None, + }), + null_mask: vec![0], + }], + row_count: 1, + }], + }), + })) + .await; + } +}