fix: respect partition key when batching dml ops

This commit changes the kafka write aggregator to only merge DML ops
destined for the same partition.

Prior to this commit, the aggregator was merging DML ops that had
different partition keys, causing data to be persisted in incorrect
partitions:

    https://github.com/influxdata/influxdb_iox/issues/4787
pull/24376/head
Dom Dwyer 2022-06-15 15:10:28 +01:00
parent 743c1692ea
commit 43b3f22411
4 changed files with 198 additions and 47 deletions

View File

@ -372,14 +372,14 @@ pub mod test_utils {
writer: &impl WriteBufferWriting,
lp: &str,
sequencer_id: u32,
partition_key: Option<PartitionKey>,
partition_key: PartitionKey,
span_context: Option<&SpanContext>,
) -> DmlWrite {
let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
let write = DmlWrite::new(
namespace,
tables,
partition_key,
Some(partition_key),
DmlMeta::unsequenced(span_context.cloned()),
);
let operation = DmlOperation::Write(write);
@ -431,7 +431,7 @@ pub mod test_utils {
&writer,
entry_1,
sequencer_id,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -441,13 +441,21 @@ pub mod test_utils {
assert_stream_pending(&mut stream).await;
// adding more data unblocks the stream
let w2 = write("namespace", &writer, entry_2, sequencer_id, None, None).await;
let w2 = write(
"namespace",
&writer,
entry_2,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let w3 = write(
"namespace",
&writer,
entry_3,
sequencer_id,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -478,9 +486,33 @@ pub mod test_utils {
let writer = context.writing(true).await.unwrap();
let reader = context.reading(true).await.unwrap();
let w1 = write("namespace", &writer, entry_1, 0, None, None).await;
let w2 = write("namespace", &writer, entry_2, 0, None, None).await;
let w3 = write("namespace", &writer, entry_3, 0, None, None).await;
let w1 = write(
"namespace",
&writer,
entry_1,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let w2 = write(
"namespace",
&writer,
entry_2,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let w3 = write(
"namespace",
&writer,
entry_3,
0,
PartitionKey::from("bananas"),
None,
)
.await;
// creating stream, drop stream, re-create it => still starts at first entry
let sequencer_id = set_pop_first(&mut reader.sequencer_ids()).unwrap();
@ -552,7 +584,7 @@ pub mod test_utils {
&writer,
entry_1,
sequencer_id_1,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -564,7 +596,7 @@ pub mod test_utils {
&writer,
entry_2,
sequencer_id_2,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -576,7 +608,7 @@ pub mod test_utils {
&writer,
entry_3,
sequencer_id_1,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -623,7 +655,7 @@ pub mod test_utils {
&writer_1,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -632,7 +664,7 @@ pub mod test_utils {
&writer_1,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -641,7 +673,7 @@ pub mod test_utils {
&writer_2,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -687,7 +719,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -696,7 +728,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -705,7 +737,7 @@ pub mod test_utils {
&writer,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -744,7 +776,15 @@ pub mod test_utils {
.seek(SequenceNumber::new(1_000_000))
.await
.unwrap();
let w_east_3 = write("namespace", &writer, entry_east_3, 0, None, None).await;
let w_east_3 = write(
"namespace",
&writer,
entry_east_3,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let err = handler_1_1_a
.stream()
@ -794,7 +834,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -803,7 +843,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -867,7 +907,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -876,7 +916,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -885,7 +925,7 @@ pub mod test_utils {
&writer,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -924,7 +964,15 @@ pub mod test_utils {
assert_eq!(sequencer_ids.len(), 1);
let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap();
let write = write("namespace", &writer, entry, sequencer_id, None, None).await;
let write = write(
"namespace",
&writer,
entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let reported_ts = write.meta().producer_ts().unwrap();
// advance time
@ -1014,7 +1062,15 @@ pub mod test_utils {
let mut stream = handler.stream().await;
// 1: no context
write("namespace", &writer, entry, sequencer_id, None, None).await;
write(
"namespace",
&writer,
entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
// check write 1
let write_1 = stream.next().await.unwrap().unwrap();
@ -1031,7 +1087,7 @@ pub mod test_utils {
&writer,
entry,
sequencer_id,
None,
PartitionKey::from("bananas"),
Some(&span_context_1),
)
.await;
@ -1044,7 +1100,7 @@ pub mod test_utils {
&writer,
entry,
sequencer_id,
None,
PartitionKey::from("bananas"),
Some(&span_context_2),
)
.await;
@ -1106,8 +1162,24 @@ pub mod test_utils {
assert_eq!(sequencer_ids.len(), 1);
let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap();
let w1 = write("namespace_1", &writer, entry_2, sequencer_id, None, None).await;
let w2 = write("namespace_2", &writer, entry_1, sequencer_id, None, None).await;
let w1 = write(
"namespace_1",
&writer,
entry_2,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let w2 = write(
"namespace_2",
&writer,
entry_1,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let mut handler = reader.stream_handler(sequencer_id).await.unwrap();
assert_reader_content(&mut handler, &[&w1, &w2]).await;
@ -1133,7 +1205,15 @@ pub mod test_utils {
async move {
let entry = format!("upc,region=east user={} {}", i, i);
write("ns", writer.as_ref(), &entry, sequencer_id, None, None).await;
write(
"ns",
writer.as_ref(),
&entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
}
})
.collect();

View File

@ -732,6 +732,7 @@ pub mod test_utils {
mod tests {
use std::{num::NonZeroU32, time::Duration};
use data_types::PartitionKey;
use dml::test_util::assert_write_op_eq;
use tempfile::TempDir;
use trace::RingBufferTraceCollector;
@ -841,7 +842,7 @@ mod tests {
&writer,
entry_1,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -850,7 +851,7 @@ mod tests {
&writer,
entry_2,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -859,7 +860,7 @@ mod tests {
&writer,
entry_3,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -868,7 +869,7 @@ mod tests {
&writer,
entry_4,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -911,7 +912,7 @@ mod tests {
&writer,
entry_1,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -920,7 +921,7 @@ mod tests {
&writer,
entry_2,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;

View File

@ -1,5 +1,5 @@
use crate::codec::{ContentType, IoxHeaders};
use data_types::{Sequence, SequenceNumber};
use data_types::{PartitionKey, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use iox_time::{Time, TimeProvider};
@ -34,6 +34,9 @@ struct WriteAggregator {
/// Data for every table.
tables: HashMap<String, MutableBatch>,
/// The partition key derived for these batches.
partition_key: PartitionKey,
/// Span recorder to link spans from incoming writes to aggregated write.
span_recorder: Option<SpanRecorder>,
@ -49,9 +52,15 @@ impl WriteAggregator {
let mut span_recorder = None;
Self::record_span(&mut span_recorder, write.meta().span_context(), &collector);
let partition_key = write
.partition_key()
.cloned()
.expect("enqueuing unpartitioned write into kafka");
Self {
namespace: write.namespace().to_owned(),
tables: write.into_tables().collect(),
partition_key,
span_recorder,
tag,
collector,
@ -91,6 +100,11 @@ impl WriteAggregator {
fn can_push(&self, write: &DmlWrite) -> bool {
assert_eq!(write.namespace(), self.namespace);
// Only batch together writes for the same partition.
if write.partition_key() != Some(&self.partition_key) {
return false;
}
for (table, batch) in write.tables() {
if let Some(existing) = self.tables.get(table) {
match (
@ -151,7 +165,12 @@ impl WriteAggregator {
};
let meta = DmlMeta::unsequenced(ctx);
DmlWrite::new(self.namespace.clone(), self.tables.clone(), None, meta)
DmlWrite::new(
self.namespace.clone(),
self.tables.clone(),
Some(self.partition_key.clone()),
meta,
)
}
}

View File

@ -377,7 +377,7 @@ mod tests {
},
maybe_skip_kafka_integration,
};
use data_types::{DeletePredicate, TimestampRange};
use data_types::{DeletePredicate, PartitionKey, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use rskafka::{client::partition::Compression, record::Record};
@ -534,7 +534,7 @@ mod tests {
&producer,
"table foo=1 1",
sequencer_id,
None,
"bananas".into(),
None,
)
.await;
@ -567,19 +567,19 @@ mod tests {
let (w1_1, w1_2, w2_1, d1_1, d1_2, w1_3, w1_4, w2_2) = tokio::join!(
// ns1: batch 1
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// ns2: batch 1, part A
write("ns2", &producer, &trace_collector, sequencer_id),
write("ns2", &producer, &trace_collector, sequencer_id, "bananas"),
// ns1: batch 2
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 3
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 4
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// ns2: batch 1, part B
write("ns2", &producer, &trace_collector, sequencer_id),
write("ns2", &producer, &trace_collector, sequencer_id, "bananas"),
);
// ensure that write operations were fused
@ -675,18 +675,69 @@ mod tests {
);
}
// Coverage of https://github.com/influxdata/influxdb_iox/issues/4787
//
// Verifies the Kafka write aggregator merges only those DML writes that
// share a partition key: two "bananas" writes batch together, while a
// concurrent "platanos" write must land in a separate operation.
//
// NOTE(review): integration test — requires a reachable Kafka broker;
// maybe_skip_kafka_integration! skips it otherwise.
#[tokio::test]
async fn test_batching_respects_partitioning() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let trace_collector = ctx.trace_collector();
let producer = ctx.writing(true).await.unwrap();
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
// Issue all three writes concurrently so they are candidates for
// aggregation into the same Kafka record.
let (w1, w2, w3) = tokio::join!(
// These two ops have the same partition key, and therefore can be
// merged together.
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// However this op has a different partition_key and cannot be
// merged with the others.
write("ns1", &producer, &trace_collector, sequencer_id, "platanos"),
);
// Assert ops 1 and 2 were merged, by asserting they have the same
// sequence number.
assert_eq!(w1.sequence().unwrap(), w2.sequence().unwrap());
// And assert the third op was not merged because of the differing
// partition key.
assert_ne!(w1.sequence().unwrap(), w3.sequence().unwrap());
}
// A DmlWrite enqueued without a partition key must panic rather than be
// silently accepted: WriteAggregator::new() calls
// .expect("enqueuing unpartitioned write into kafka") on the key.
//
// NOTE(review): integration test — requires a reachable Kafka broker;
// maybe_skip_kafka_integration! skips it otherwise.
#[tokio::test]
#[should_panic = "enqueuing unpartitioned write into kafka"]
async fn test_enqueue_no_partition_key() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let producer = ctx.writing(true).await.unwrap();
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
// Explicitly pass None for the partition key to trigger the panic.
let write = DmlWrite::new("bananas", tables, None, DmlMeta::unsequenced(None));
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
producer
.store_operation(sequencer_id, &DmlOperation::Write(write))
.await
.unwrap();
}
async fn write(
namespace: &str,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
sequencer_id: u32,
partition_key: impl Into<PartitionKey> + Send,
) -> DmlMeta {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
let write = DmlWrite::new(
namespace,
tables,
None,
Some(partition_key.into()),
DmlMeta::unsequenced(Some(span_ctx)),
);
let op = DmlOperation::Write(write);