From c1f7154031f8bd5956d4cd82577a2b1c26e738a2 Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Mon, 20 Jun 2022 13:42:51 +0100
Subject: [PATCH] feat: propagate partition key through kafka

Changes the kafka message wire format to include the partition key for
serialised DML writes on the wire.

After this commit, the kafka messages will contain the partition key
for each op, but this information will go unused in the ingester - this
enables us to roll out the producer side, before making the value's
presence necessary on the consumer side.

A follow-up PR will change the ingester to utilise this embedded
partition key.

This has the unfortunate side effect of making the partition key part
of the public gRPC write API:

    https://github.com/influxdata/influxdb_iox/issues/4866
---
 dml/src/lib.rs                                |  1 +
 .../pbdata/v1/influxdb_pb_data_protocol.proto |  8 +++++++
 mutable_batch_pb/src/encode.rs                |  4 ++++
 router/src/server/grpc.rs                     | 22 +++++++++++++++++++
 test_helpers_end_to_end/src/client.rs         |  1 +
 write_buffer/src/codec.rs                     | 11 +++++++---
 6 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/dml/src/lib.rs b/dml/src/lib.rs
index 29091c23d1..42b6fc88ca 100644
--- a/dml/src/lib.rs
+++ b/dml/src/lib.rs
@@ -381,6 +381,7 @@ pub mod test_util {
     pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) {
         assert_eq!(a.namespace, b.namespace);
         assert_eq!(a.meta(), b.meta());
+        assert_eq!(a.partition_key(), b.partition_key());
 
         assert_eq!(a.table_count(), b.table_count());
 
diff --git a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto
index 96df307b00..1e1412ef80 100644
--- a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto
+++ b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto
@@ -5,8 +5,16 @@ package influxdata.pbdata.v1;
 option go_package = "github.com/influxdata/influxdb-pb-data-protocol/golang;influxdbpbdataprotocol";
 
 message DatabaseBatch {
+  // The destination database name / namespace for this write.
   string database_name = 1;
 
+  // An optional partition key for this batch.
+  //
+  // If specified, all batches in this write MUST map to this partition key.
+  // Violating this invariant MAY cause data to become unqueryable, degrade
+  // query performance, or other bad things.
+  string partition_key = 3;
+
   // Table data. Data for a given table may appear in multiple table batches.
   repeated TableBatch table_batches = 2;
 }
diff --git a/mutable_batch_pb/src/encode.rs b/mutable_batch_pb/src/encode.rs
index f913801ffc..e296631332 100644
--- a/mutable_batch_pb/src/encode.rs
+++ b/mutable_batch_pb/src/encode.rs
@@ -19,6 +19,10 @@ pub fn encode_write(db_name: &str, write: &DmlWrite) -> DatabaseBatch {
             .tables()
             .map(|(table_name, batch)| encode_batch(table_name, batch))
             .collect(),
+        partition_key: write
+            .partition_key()
+            .map(ToString::to_string)
+            .unwrap_or_default(),
     }
 }
 
diff --git a/router/src/server/grpc.rs b/router/src/server/grpc.rs
index aefe655806..4fd9ef5c33 100644
--- a/router/src/server/grpc.rs
+++ b/router/src/server/grpc.rs
@@ -265,6 +265,7 @@ mod tests {
             database_batch: Some(DatabaseBatch {
                 database_name: "".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
             }),
         };
 
@@ -287,6 +288,26 @@
             database_batch: Some(DatabaseBatch {
                 database_name: "bananas".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
+            }),
+        };
+
+        grpc.write(Request::new(req))
+            .await
+            .expect("rpc request should succeed");
+    }
+
+    #[tokio::test]
+    async fn test_write_ok_with_partition_key() {
+        let metrics = Arc::new(metric::Registry::default());
+        let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
+        let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
+
+        let req = WriteRequest {
+            database_batch: Some(DatabaseBatch {
+                database_name: "bananas".to_owned(),
+                table_batches: vec![],
+                partition_key: "platanos".to_owned(),
             }),
         };
 
@@ -308,6 +329,7 @@
             database_batch: Some(DatabaseBatch {
                 database_name: "bananas".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
             }),
         };
 
diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs
index 0d3b6fbdad..07df0f9222 100644
--- a/test_helpers_end_to_end/src/client.rs
+++ b/test_helpers_end_to_end/src/client.rs
@@ -50,6 +50,7 @@ pub async fn write_to_router_grpc(
         database_batch: Some(DatabaseBatch {
             database_name: namespace.into(),
             table_batches,
+            partition_key: Default::default(),
         }),
     };
 
diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs
index 390b817f60..06209cdb8b 100644
--- a/write_buffer/src/codec.rs
+++ b/write_buffer/src/codec.rs
@@ -1,7 +1,7 @@
 //! Encode/Decode for messages
 
 use crate::core::WriteBufferError;
-use data_types::{NonEmptyString, Sequence};
+use data_types::{NonEmptyString, PartitionKey, Sequence};
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use generated_types::{
     google::FromOptionalField,
@@ -193,11 +193,16 @@
                 ))
             })?;
 
+            let partition_key = if write.partition_key.is_empty() {
+                None
+            } else {
+                Some(PartitionKey::from(write.partition_key))
+            };
+
             Ok(DmlOperation::Write(DmlWrite::new(
                 headers.namespace,
                 tables,
-                // TODO(3603): propagate partition key through kafka
-                None,
+                partition_key,
                 meta,
             )))
         }
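
Note on the encoding convention used above: proto3 cannot represent
"unset" for scalar string fields, so this patch uses the empty string
as a sentinel for "no partition key" - encode_write() serialises a
missing key via unwrap_or_default(), and decode() maps an empty string
back to None. The sketch below illustrates that round trip in
isolation; it is a standalone approximation, not code from this patch,
and the PartitionKey struct is a hypothetical stand-in for the
data_types::PartitionKey wrapper:

    // Hypothetical stand-in for the data_types::PartitionKey type.
    #[derive(Debug, Clone, PartialEq)]
    struct PartitionKey(String);

    impl From<String> for PartitionKey {
        fn from(s: String) -> Self {
            Self(s)
        }
    }

    // Producer side: an absent key is serialised as the empty string,
    // mirroring encode_write()'s use of unwrap_or_default().
    fn encode_partition_key(key: Option<&PartitionKey>) -> String {
        key.map(|k| k.0.clone()).unwrap_or_default()
    }

    // Consumer side: the empty string decodes back to None, mirroring
    // the is_empty() check added to codec.rs.
    fn decode_partition_key(raw: String) -> Option<PartitionKey> {
        if raw.is_empty() {
            None
        } else {
            Some(PartitionKey::from(raw))
        }
    }

    fn main() {
        let key = Some(PartitionKey::from("platanos".to_string()));
        let wire = encode_partition_key(key.as_ref());
        assert_eq!(decode_partition_key(wire), key);

        // An unset key survives the round trip as None.
        assert_eq!(decode_partition_key(encode_partition_key(None)), None);
    }

One consequence of this convention is that an explicitly empty
partition key is indistinguishable on the wire from an absent one,
which is acceptable here because an empty partition key is never a
valid value.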