refactor(dml): PartitionKey required for writes

Changes the DmlWrite type to require a PartitionKey be specified,
instead of accepting an Option.

This requirement was already in place - the write buffer upheld an
invariant that all writes contained a partition key value (was not
"None") or it panicked at runtime when attempting to enqueue the write.

It is now possible to encode this invariant in the type system, which is
what this change does.
pull/24376/head
Dom Dwyer 2022-10-25 11:10:42 +02:00
parent 3568564d39
commit 72a358e52f
17 changed files with 51 additions and 94 deletions

View File

@ -180,11 +180,8 @@ pub struct DmlWrite {
meta: DmlMeta,
min_timestamp: i64,
max_timestamp: i64,
/// An optional partition key for this write.
///
/// NOTE: all rows in this batch MUST map to this partition key if
/// specified.
partition_key: Option<PartitionKey>,
/// The partition key derived for this write.
partition_key: PartitionKey,
}
impl DmlWrite {
@ -200,7 +197,7 @@ impl DmlWrite {
pub fn new(
namespace: impl Into<String>,
tables: HashMap<String, MutableBatch>,
partition_key: Option<PartitionKey>,
partition_key: PartitionKey,
meta: DmlMeta,
) -> Self {
assert_ne!(tables.len(), 0);
@ -293,8 +290,8 @@ impl DmlWrite {
}
/// Return the partition key derived for this op.
pub fn partition_key(&self) -> Option<&PartitionKey> {
self.partition_key.as_ref()
pub fn partition_key(&self) -> &PartitionKey {
&self.partition_key
}
}

View File

@ -663,7 +663,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
ignored_ts,
@ -704,7 +704,7 @@ mod tests {
let w2 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
ignored_ts,
@ -756,7 +756,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10\nmem foo=1 11", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
Time::from_timestamp_millis(42),
@ -865,7 +865,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
ignored_ts,
@ -881,7 +881,7 @@ mod tests {
let w2 = DmlWrite::new(
"foo",
lines_to_batches("cpu foo=1 10", 1).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(2), SequenceNumber::new(1)),
ignored_ts,
@ -899,7 +899,7 @@ mod tests {
let w3 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 30", 2).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
ignored_ts,
@ -1140,7 +1140,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
ignored_ts,
@ -1157,7 +1157,7 @@ mod tests {
let w2 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 30\ncpu bar=1 20", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
ignored_ts,
@ -1255,7 +1255,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
ignored_ts,
@ -1266,7 +1266,7 @@ mod tests {
let w2 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(2)),
ignored_ts,
@ -1442,7 +1442,7 @@ mod tests {
let w1 = DmlWrite::new(
"foo",
lines_to_batches("mem foo=1 10", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(1), SequenceNumber::new(1)),
ignored_ts,

View File

@ -197,10 +197,7 @@ impl NamespaceData {
let mut all_skipped = true;
// Extract the partition key derived by the router.
let partition_key = write
.partition_key()
.expect("no partition key in dml write")
.clone();
let partition_key = write.partition_key().clone();
for (t, b) in write.into_tables() {
let t = TableName::from(t);

View File

@ -601,7 +601,7 @@ mod tests {
let write_operations = vec![DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(10)),
ingest_ts1,
@ -627,7 +627,7 @@ mod tests {
let write_operations = vec![DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(2)),
ingest_ts1,
@ -653,7 +653,7 @@ mod tests {
let write_operations = vec![DmlWrite::new(
"foo",
lines_to_batches("cpu bar=2 20", 0).unwrap(),
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::sequenced(
Sequence::new(ShardIndex::new(0), SequenceNumber::new(2)),
ingest_ts1,

View File

@ -547,7 +547,7 @@ mod tests {
None,
42,
);
DmlWrite::new(name, tables, Some("1970-01-01".into()), sequence)
DmlWrite::new(name, tables, "1970-01-01".into(), sequence)
}
// Return a DmlDelete with the given namespace.

View File

@ -272,7 +272,7 @@ mod tests {
/// Return a DmlWrite with the given metadata and a single table.
fn make_write(meta: DmlMeta) -> DmlWrite {
let tables = lines_to_batches("bananas level=42 4242", 0).unwrap();
DmlWrite::new("bananas", tables, None, meta)
DmlWrite::new("bananas", tables, "1970-01-01".into(), meta)
}
/// Extract the metric with the given name from `metrics`.

View File

@ -575,7 +575,7 @@ pub(crate) fn make_write_op(
DmlWrite::new(
namespace.to_string(),
lines_to_batches(lines, 0).unwrap(),
Some(partition_key.clone()),
partition_key.clone(),
DmlMeta::sequenced(
Sequence {
shard_index,

View File

@ -264,7 +264,7 @@ impl TestContext {
self.enqueue_write(DmlWrite::new(
namespace,
lines_to_batches(lp, 0).unwrap(),
Some(partition_key),
partition_key,
DmlMeta::sequenced(
Sequence::new(TEST_SHARD_INDEX, SequenceNumber::new(sequence_number)),
iox_time::SystemProvider::new().now(),

View File

@ -19,10 +19,7 @@ pub fn encode_write(db_name: &str, write: &DmlWrite) -> DatabaseBatch {
.tables()
.map(|(table_name, batch)| encode_batch(table_name, batch))
.collect(),
partition_key: write
.partition_key()
.map(ToString::to_string)
.unwrap_or_default(),
partition_key: write.partition_key().to_string(),
}
}

View File

@ -12,12 +12,7 @@ fn generate_pbdata_bytes() -> Vec<(String, (usize, Bytes))> {
.into_iter()
.map(|(bench, lp)| {
let batches = lines_to_batches(&lp, 0).unwrap();
let write = DmlWrite::new(
"test_db",
batches,
Some("bananas".into()),
Default::default(),
);
let write = DmlWrite::new("test_db", batches, "bananas".into(), Default::default());
let database_batch = mutable_batch_pb::encode::encode_write("db", &write);
let mut bytes = BytesMut::new();

View File

@ -830,7 +830,7 @@ impl MockIngester {
let op = DmlOperation::Write(DmlWrite::new(
self.ns.namespace.name.clone(),
mutable_batches,
Some(PartitionKey::from(partition_key)),
PartitionKey::from(partition_key),
meta,
));
(op, partition_ids)

View File

@ -118,7 +118,7 @@ where
let dml = DmlWrite::new(
namespace,
batch,
Some(partition_key.clone()),
partition_key.clone(),
DmlMeta::unsequenced(span_ctx.clone()),
);

View File

@ -195,9 +195,11 @@ pub fn decode(
})?;
let partition_key = if write.partition_key.is_empty() {
None
return Err(WriteBufferError::invalid_data(
"write contains no partition key",
));
} else {
Some(PartitionKey::from(write.partition_key))
PartitionKey::from(write.partition_key)
};
Ok(DmlOperation::Write(DmlWrite::new(

View File

@ -1,5 +1,5 @@
use async_trait::async_trait;
use data_types::{SequenceNumber, ShardIndex};
use data_types::{PartitionKey, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use futures::stream::BoxStream;
use std::{
@ -178,7 +178,12 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
self.store_operation(
shard_index,
DmlOperation::Write(DmlWrite::new("test_db", tables, None, Default::default())),
DmlOperation::Write(DmlWrite::new(
"test_db",
tables,
PartitionKey::from("platanos"),
Default::default(),
)),
)
.await
}
@ -396,7 +401,7 @@ pub mod test_utils {
let write = DmlWrite::new(
namespace,
tables,
Some(partition_key),
partition_key,
DmlMeta::unsequenced(span_context.cloned()),
);
let operation = DmlOperation::Write(write);
@ -1240,7 +1245,7 @@ pub mod test_utils {
let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
let tables = mutable_batch_lp::lines_to_batches("upc user=1 100", 0).unwrap();
let write = DmlWrite::new("foo", tables, Some("bananas".into()), Default::default());
let write = DmlWrite::new("foo", tables, "bananas".into(), Default::default());
let operation = DmlOperation::Write(write);
let writer = context.writing(true).await.unwrap();
@ -1510,7 +1515,7 @@ pub mod test_utils {
fn partition_key(dml_op: &DmlOperation) -> Option<&PartitionKey> {
match dml_op {
DmlOperation::Write(w) => w.partition_key(),
DmlOperation::Write(w) => Some(w.partition_key()),
DmlOperation::Delete(_) => None,
}
}

View File

@ -119,14 +119,6 @@ impl WriteBufferWriting for RSKafkaProducer {
shard_index: ShardIndex,
operation: DmlOperation,
) -> Result<DmlMeta, WriteBufferError> {
// Sanity check to ensure only partitioned writes are pushed into Kafka.
if let DmlOperation::Write(w) = &operation {
assert!(
w.partition_key().is_some(),
"enqueuing unpartitioned write into kafka"
)
}
let producer = self
.producers
.get(&shard_index)
@ -834,25 +826,6 @@ mod tests {
);
}
#[tokio::test]
#[should_panic = "enqueuing unpartitioned write into kafka"]
async fn test_enqueue_no_partition_key() {
let conn = maybe_skip_kafka_integration!("enqueuing unpartitioned write into kafka");
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let producer = ctx.writing(true).await.unwrap();
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
let write = DmlWrite::new("bananas", tables, None, DmlMeta::unsequenced(None));
let shard_index = set_pop_first(&mut producer.shard_indexes()).unwrap();
producer
.store_operation(shard_index, DmlOperation::Write(write))
.await
.unwrap();
}
async fn write(
namespace: &str,
producer: &RSKafkaProducer,
@ -865,7 +838,7 @@ mod tests {
let write = DmlWrite::new(
namespace,
tables,
Some(partition_key.into()),
partition_key.into(),
DmlMeta::unsequenced(Some(span_ctx)),
);
let op = DmlOperation::Write(write);

View File

@ -236,7 +236,7 @@ mod tests {
DmlOperation::Write(DmlWrite::new(
NAMESPACE.to_string(),
m,
Some("1970-01-01".into()),
"1970-01-01".into(),
DmlMeta::unsequenced(Some(span)),
))
}

View File

@ -174,7 +174,7 @@ impl MockBufferSharedState {
pub fn push_lp(&self, sequence: Sequence, lp: &str) {
let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
let meta = DmlMeta::sequenced(sequence, iox_time::Time::from_timestamp_nanos(0), None, 0);
self.push_write(DmlWrite::new("foo", tables, None, meta))
self.push_write(DmlWrite::new("foo", tables, "test-partition".into(), meta))
}
/// Push error to specified shard.
@ -731,19 +731,6 @@ mod tests {
perform_generic_tests(MockTestAdapter {}).await;
}
#[test]
#[should_panic(expected = "write must be sequenced")]
fn test_state_push_write_panic_unsequenced() {
let state = MockBufferSharedState::empty_with_n_shards(NonZeroU32::try_from(2).unwrap());
let tables = lines_to_batches("upc user=1 100", 0).unwrap();
state.push_write(DmlWrite::new(
"test_db",
tables,
None,
DmlMeta::unsequenced(None),
));
}
#[test]
#[should_panic(expected = "invalid shard index")]
fn test_state_push_write_panic_wrong_shard() {
@ -907,8 +894,12 @@ mod tests {
let writer = MockBufferForWritingThatAlwaysErrors {};
let tables = lines_to_batches("upc user=1 100", 0).unwrap();
let operation =
DmlOperation::Write(DmlWrite::new("test_db", tables, None, Default::default()));
let operation = DmlOperation::Write(DmlWrite::new(
"test_db",
tables,
"bananas".into(),
Default::default(),
));
assert_contains!(
writer