diff --git a/dml/src/lib.rs b/dml/src/lib.rs index 29091c23d1..42b6fc88ca 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -381,6 +381,7 @@ pub mod test_util { pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) { assert_eq!(a.namespace, b.namespace); assert_eq!(a.meta(), b.meta()); + assert_eq!(a.partition_key(), b.partition_key()); assert_eq!(a.table_count(), b.table_count()); diff --git a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto index 96df307b00..1e1412ef80 100644 --- a/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto +++ b/generated_types/protos/influxdata/pbdata/v1/influxdb_pb_data_protocol.proto @@ -5,8 +5,16 @@ package influxdata.pbdata.v1; option go_package = "github.com/influxdata/influxdb-pb-data-protocol/golang;influxdbpbdataprotocol"; message DatabaseBatch { + // The destination database name / namespace for this write. string database_name = 1; + // An optional partition key for this batch. + // + // If specified, all batches in this write MUST map to this partition key. + // Violating this invariant MAY cause data to become unqueryable, degrade + // query performance, or other bad things. + string partition_key = 3; + // Table data. Data for a given table may appear in multiple table batches. repeated TableBatch table_batches = 2; } diff --git a/mutable_batch_pb/src/encode.rs b/mutable_batch_pb/src/encode.rs index f913801ffc..e296631332 100644 --- a/mutable_batch_pb/src/encode.rs +++ b/mutable_batch_pb/src/encode.rs @@ -19,6 +19,10 @@ pub fn encode_write(db_name: &str, write: &DmlWrite) -> DatabaseBatch { .tables() .map(|(table_name, batch)| encode_batch(table_name, batch)) .collect(), + partition_key: write + .partition_key() + .map(ToString::to_string) + .unwrap_or_default(), } } diff --git a/router/src/server/grpc.rs b/router/src/server/grpc.rs index aefe655806..4fd9ef5c33 100644 --- a/router/src/server/grpc.rs +++ b/router/src/server/grpc.rs @@ -265,6 +265,7 @@ mod tests { database_batch: Some(DatabaseBatch { database_name: "".to_owned(), table_batches: vec![], + partition_key: Default::default(), }), }; @@ -287,6 +288,26 @@ mod tests { database_batch: Some(DatabaseBatch { database_name: "bananas".to_owned(), table_batches: vec![], + partition_key: Default::default(), + }), + }; + + grpc.write(Request::new(req)) + .await + .expect("rpc request should succeed"); + } + + #[tokio::test] + async fn test_write_ok_with_partition_key() { + let metrics = Arc::new(metric::Registry::default()); + let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())])); + let grpc = super::WriteService::new(Arc::clone(&handler), &metrics); + + let req = WriteRequest { + database_batch: Some(DatabaseBatch { + database_name: "bananas".to_owned(), + table_batches: vec![], + partition_key: "platanos".to_owned(), }), }; @@ -308,6 +329,7 @@ mod tests { database_batch: Some(DatabaseBatch { database_name: "bananas".to_owned(), table_batches: vec![], + partition_key: Default::default(), }), }; diff --git a/test_helpers_end_to_end/src/client.rs b/test_helpers_end_to_end/src/client.rs index 0d3b6fbdad..07df0f9222 100644 --- a/test_helpers_end_to_end/src/client.rs +++ b/test_helpers_end_to_end/src/client.rs @@ -50,6 +50,7 @@ pub async fn write_to_router_grpc( database_batch: Some(DatabaseBatch { database_name: namespace.into(), table_batches, + partition_key: Default::default(), }), }; diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index 390b817f60..06209cdb8b 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -1,7 +1,7 @@ //! Encode/Decode for messages use crate::core::WriteBufferError; -use data_types::{NonEmptyString, Sequence}; +use data_types::{NonEmptyString, PartitionKey, Sequence}; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use generated_types::{ google::FromOptionalField, @@ -193,11 +193,16 @@ pub fn decode( )) })?; + let partition_key = if write.partition_key.is_empty() { + None + } else { + Some(PartitionKey::from(write.partition_key)) + }; + Ok(DmlOperation::Write(DmlWrite::new( headers.namespace, tables, - // TODO(3603): propagate partition key through kafka - None, + partition_key, meta, ))) }