Merge pull request #4907 from influxdata/dom/propagate-partition-key

feat: propagate partition key through kafka
Dom 2022-06-20 14:07:50 +01:00 committed by GitHub
commit 4df710a205
6 changed files with 44 additions and 3 deletions


@@ -381,6 +381,7 @@ pub mod test_util {
     pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) {
         assert_eq!(a.namespace, b.namespace);
         assert_eq!(a.meta(), b.meta());
+        assert_eq!(a.partition_key(), b.partition_key());
 
         assert_eq!(a.table_count(), b.table_count());


@@ -5,8 +5,16 @@ package influxdata.pbdata.v1;
 option go_package = "github.com/influxdata/influxdb-pb-data-protocol/golang;influxdbpbdataprotocol";
 
 message DatabaseBatch {
   // The destination database name / namespace for this write.
   string database_name = 1;
+
+  // An optional partition key for this batch.
+  //
+  // If specified, all batches in this write MUST map to this partition key.
+  // Violating this invariant MAY cause data to become unqueryable, degrade
+  // query performance, or other bad things.
+  string partition_key = 3;
+
   // Table data. Data for a given table may appear in multiple table batches.
   repeated TableBatch table_batches = 2;
 }
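
Aside (not part of the diff): proto3 scalar fields carry no presence information, so an absent partition key travels as the field's default value, the empty string; field number 3 is used because 2 is already taken by table_batches. A minimal construction sketch with the prost-generated Rust type (the module path is an assumption based on the package name above; the field names come straight from the message):

    use generated_types::influxdata::pbdata::v1::DatabaseBatch;

    // An absent key is the proto3 default: the empty string.
    let batch = DatabaseBatch {
        database_name: "bananas".to_owned(),
        table_batches: vec![],
        partition_key: String::new(),
    };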


@@ -19,6 +19,10 @@ pub fn encode_write(db_name: &str, write: &DmlWrite) -> DatabaseBatch {
             .tables()
             .map(|(table_name, batch)| encode_batch(table_name, batch))
             .collect(),
+        partition_key: write
+            .partition_key()
+            .map(ToString::to_string)
+            .unwrap_or_default(),
     }
 }
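
The encoder collapses Option<PartitionKey> into that empty-string sentinel. As a standalone sketch (the helper name is hypothetical; PartitionKey's Display impl is implied by the ToString call in the hunk above):

    use data_types::PartitionKey;

    // Hypothetical helper mirroring the mapping in encode_write above:
    // Some(key) -> key's string form, None -> "" (the proto3 default).
    fn encode_partition_key(key: Option<&PartitionKey>) -> String {
        key.map(ToString::to_string).unwrap_or_default()
    }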


@@ -265,6 +265,7 @@ mod tests {
             database_batch: Some(DatabaseBatch {
                 database_name: "".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
             }),
         };
@@ -287,6 +288,26 @@
             database_batch: Some(DatabaseBatch {
                 database_name: "bananas".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
             }),
         };
 
         grpc.write(Request::new(req))
             .await
             .expect("rpc request should succeed");
     }
 
+    #[tokio::test]
+    async fn test_write_ok_with_partition_key() {
+        let metrics = Arc::new(metric::Registry::default());
+        let handler = Arc::new(MockDmlHandler::default().with_write_return([Ok(summary())]));
+        let grpc = super::WriteService::new(Arc::clone(&handler), &metrics);
+
+        let req = WriteRequest {
+            database_batch: Some(DatabaseBatch {
+                database_name: "bananas".to_owned(),
+                table_batches: vec![],
+                partition_key: "platanos".to_owned(),
+            }),
+        };
@@ -308,6 +329,7 @@
             database_batch: Some(DatabaseBatch {
                 database_name: "bananas".to_owned(),
                 table_batches: vec![],
+                partition_key: Default::default(),
             }),
         };


@@ -50,6 +50,7 @@ pub async fn write_to_router_grpc(
         database_batch: Some(DatabaseBatch {
             database_name: namespace.into(),
             table_batches,
+            partition_key: Default::default(),
         }),
     };


@@ -1,7 +1,7 @@
 //! Encode/Decode for messages
 
 use crate::core::WriteBufferError;
-use data_types::{NonEmptyString, Sequence};
+use data_types::{NonEmptyString, PartitionKey, Sequence};
 use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite};
 use generated_types::{
     google::FromOptionalField,
@@ -193,11 +193,16 @@
                 ))
             })?;
 
+            let partition_key = if write.partition_key.is_empty() {
+                None
+            } else {
+                Some(PartitionKey::from(write.partition_key))
+            };
+
             Ok(DmlOperation::Write(DmlWrite::new(
                 headers.namespace,
                 tables,
-                // TODO(3603): propagate partition key through kafka
-                None,
+                partition_key,
                 meta,
             )))
         }
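
Together with the encoder, this gives a simple round-trip through kafka: None maps to the empty string and back. A hedged sketch (helper and test names are hypothetical; From<String> and Display on PartitionKey are used by the hunks above, while Debug and PartialEq are assumed):

    use data_types::PartitionKey;

    // Hypothetical inverse of the encoding: "" (the proto3 default)
    // decodes to None, anything else to Some(key).
    fn decode_partition_key(raw: String) -> Option<PartitionKey> {
        if raw.is_empty() {
            None
        } else {
            Some(PartitionKey::from(raw))
        }
    }

    #[test]
    fn partition_key_round_trip() {
        // Assumes PartitionKey: Debug + PartialEq. Note an empty
        // PartitionKey is not representable on the wire: it would
        // decode as None.
        let key = PartitionKey::from("platanos".to_owned());
        assert_eq!(decode_partition_key(key.to_string()), Some(key));
        assert_eq!(decode_partition_key(String::new()), None);
    }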