Merge pull request #4878 from influxdata/dom/fix-dml-aggregation

fix: respect partition key when batching dml ops
pull/24376/head
Dom 2022-06-16 16:00:09 +01:00 committed by GitHub
commit 9122850fe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 198 additions and 47 deletions

View File

@ -372,14 +372,14 @@ pub mod test_utils {
writer: &impl WriteBufferWriting,
lp: &str,
sequencer_id: u32,
partition_key: Option<PartitionKey>,
partition_key: PartitionKey,
span_context: Option<&SpanContext>,
) -> DmlWrite {
let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
let write = DmlWrite::new(
namespace,
tables,
partition_key,
Some(partition_key),
DmlMeta::unsequenced(span_context.cloned()),
);
let operation = DmlOperation::Write(write);
@ -431,7 +431,7 @@ pub mod test_utils {
&writer,
entry_1,
sequencer_id,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -441,13 +441,21 @@ pub mod test_utils {
assert_stream_pending(&mut stream).await;
// adding more data unblocks the stream
let w2 = write("namespace", &writer, entry_2, sequencer_id, None, None).await;
let w2 = write(
"namespace",
&writer,
entry_2,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let w3 = write(
"namespace",
&writer,
entry_3,
sequencer_id,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -478,9 +486,33 @@ pub mod test_utils {
let writer = context.writing(true).await.unwrap();
let reader = context.reading(true).await.unwrap();
let w1 = write("namespace", &writer, entry_1, 0, None, None).await;
let w2 = write("namespace", &writer, entry_2, 0, None, None).await;
let w3 = write("namespace", &writer, entry_3, 0, None, None).await;
let w1 = write(
"namespace",
&writer,
entry_1,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let w2 = write(
"namespace",
&writer,
entry_2,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let w3 = write(
"namespace",
&writer,
entry_3,
0,
PartitionKey::from("bananas"),
None,
)
.await;
// creating stream, drop stream, re-create it => still starts at first entry
let sequencer_id = set_pop_first(&mut reader.sequencer_ids()).unwrap();
@ -552,7 +584,7 @@ pub mod test_utils {
&writer,
entry_1,
sequencer_id_1,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -564,7 +596,7 @@ pub mod test_utils {
&writer,
entry_2,
sequencer_id_2,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -576,7 +608,7 @@ pub mod test_utils {
&writer,
entry_3,
sequencer_id_1,
Some(PartitionKey::from("bananas")),
PartitionKey::from("bananas"),
None,
)
.await;
@ -623,7 +655,7 @@ pub mod test_utils {
&writer_1,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -632,7 +664,7 @@ pub mod test_utils {
&writer_1,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -641,7 +673,7 @@ pub mod test_utils {
&writer_2,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -687,7 +719,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -696,7 +728,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -705,7 +737,7 @@ pub mod test_utils {
&writer,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -744,7 +776,15 @@ pub mod test_utils {
.seek(SequenceNumber::new(1_000_000))
.await
.unwrap();
let w_east_3 = write("namespace", &writer, entry_east_3, 0, None, None).await;
let w_east_3 = write(
"namespace",
&writer,
entry_east_3,
0,
PartitionKey::from("bananas"),
None,
)
.await;
let err = handler_1_1_a
.stream()
@ -794,7 +834,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -803,7 +843,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -867,7 +907,7 @@ pub mod test_utils {
&writer,
entry_east_1,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -876,7 +916,7 @@ pub mod test_utils {
&writer,
entry_east_2,
sequencer_id_1,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -885,7 +925,7 @@ pub mod test_utils {
&writer,
entry_west_1,
sequencer_id_2,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -924,7 +964,15 @@ pub mod test_utils {
assert_eq!(sequencer_ids.len(), 1);
let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap();
let write = write("namespace", &writer, entry, sequencer_id, None, None).await;
let write = write(
"namespace",
&writer,
entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let reported_ts = write.meta().producer_ts().unwrap();
// advance time
@ -1014,7 +1062,15 @@ pub mod test_utils {
let mut stream = handler.stream().await;
// 1: no context
write("namespace", &writer, entry, sequencer_id, None, None).await;
write(
"namespace",
&writer,
entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
// check write 1
let write_1 = stream.next().await.unwrap().unwrap();
@ -1031,7 +1087,7 @@ pub mod test_utils {
&writer,
entry,
sequencer_id,
None,
PartitionKey::from("bananas"),
Some(&span_context_1),
)
.await;
@ -1044,7 +1100,7 @@ pub mod test_utils {
&writer,
entry,
sequencer_id,
None,
PartitionKey::from("bananas"),
Some(&span_context_2),
)
.await;
@ -1106,8 +1162,24 @@ pub mod test_utils {
assert_eq!(sequencer_ids.len(), 1);
let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap();
let w1 = write("namespace_1", &writer, entry_2, sequencer_id, None, None).await;
let w2 = write("namespace_2", &writer, entry_1, sequencer_id, None, None).await;
let w1 = write(
"namespace_1",
&writer,
entry_2,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let w2 = write(
"namespace_2",
&writer,
entry_1,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
let mut handler = reader.stream_handler(sequencer_id).await.unwrap();
assert_reader_content(&mut handler, &[&w1, &w2]).await;
@ -1133,7 +1205,15 @@ pub mod test_utils {
async move {
let entry = format!("upc,region=east user={} {}", i, i);
write("ns", writer.as_ref(), &entry, sequencer_id, None, None).await;
write(
"ns",
writer.as_ref(),
&entry,
sequencer_id,
PartitionKey::from("bananas"),
None,
)
.await;
}
})
.collect();

View File

@ -732,6 +732,7 @@ pub mod test_utils {
mod tests {
use std::{num::NonZeroU32, time::Duration};
use data_types::PartitionKey;
use dml::test_util::assert_write_op_eq;
use tempfile::TempDir;
use trace::RingBufferTraceCollector;
@ -841,7 +842,7 @@ mod tests {
&writer,
entry_1,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -850,7 +851,7 @@ mod tests {
&writer,
entry_2,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -859,7 +860,7 @@ mod tests {
&writer,
entry_3,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -868,7 +869,7 @@ mod tests {
&writer,
entry_4,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -911,7 +912,7 @@ mod tests {
&writer,
entry_1,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;
@ -920,7 +921,7 @@ mod tests {
&writer,
entry_2,
sequencer_id,
None,
PartitionKey::from("bananas"),
None,
)
.await;

View File

@ -1,5 +1,5 @@
use crate::codec::{ContentType, IoxHeaders};
use data_types::{Sequence, SequenceNumber};
use data_types::{PartitionKey, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use iox_time::{Time, TimeProvider};
@ -34,6 +34,9 @@ struct WriteAggregator {
/// Data for every table.
tables: HashMap<String, MutableBatch>,
/// The partition key derived for these batches.
partition_key: PartitionKey,
/// Span recorder to link spans from incoming writes to aggregated write.
span_recorder: Option<SpanRecorder>,
@ -49,9 +52,15 @@ impl WriteAggregator {
let mut span_recorder = None;
Self::record_span(&mut span_recorder, write.meta().span_context(), &collector);
let partition_key = write
.partition_key()
.cloned()
.expect("enqueuing unpartitioned write into kafka");
Self {
namespace: write.namespace().to_owned(),
tables: write.into_tables().collect(),
partition_key,
span_recorder,
tag,
collector,
@ -91,6 +100,11 @@ impl WriteAggregator {
fn can_push(&self, write: &DmlWrite) -> bool {
assert_eq!(write.namespace(), self.namespace);
// Only batch together writes for the same partition.
if write.partition_key() != Some(&self.partition_key) {
return false;
}
for (table, batch) in write.tables() {
if let Some(existing) = self.tables.get(table) {
match (
@ -151,7 +165,12 @@ impl WriteAggregator {
};
let meta = DmlMeta::unsequenced(ctx);
DmlWrite::new(self.namespace.clone(), self.tables.clone(), None, meta)
DmlWrite::new(
self.namespace.clone(),
self.tables.clone(),
Some(self.partition_key.clone()),
meta,
)
}
}

View File

@ -377,7 +377,7 @@ mod tests {
},
maybe_skip_kafka_integration,
};
use data_types::{DeletePredicate, TimestampRange};
use data_types::{DeletePredicate, PartitionKey, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use rskafka::{client::partition::Compression, record::Record};
@ -534,7 +534,7 @@ mod tests {
&producer,
"table foo=1 1",
sequencer_id,
None,
"bananas".into(),
None,
)
.await;
@ -567,19 +567,19 @@ mod tests {
let (w1_1, w1_2, w2_1, d1_1, d1_2, w1_3, w1_4, w2_2) = tokio::join!(
// ns1: batch 1
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// ns2: batch 1, part A
write("ns2", &producer, &trace_collector, sequencer_id),
write("ns2", &producer, &trace_collector, sequencer_id, "bananas"),
// ns1: batch 2
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 3
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 4
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// ns2: batch 1, part B
write("ns2", &producer, &trace_collector, sequencer_id),
write("ns2", &producer, &trace_collector, sequencer_id, "bananas"),
);
// ensure that write operations were fused
@ -675,18 +675,69 @@ mod tests {
);
}
// Coverage of https://github.com/influxdata/influxdb_iox/issues/4787
#[tokio::test]
async fn test_batching_respects_partitioning() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let trace_collector = ctx.trace_collector();
let producer = ctx.writing(true).await.unwrap();
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
let (w1, w2, w3) = tokio::join!(
// These two ops have the same partition key, and therefore can be
// merged together.
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
// However this op has a different partition_key and cannot be
// merged with the others.
write("ns1", &producer, &trace_collector, sequencer_id, "platanos"),
);
// Assert ops 1 and 2 were merged, by asserting they have the same
// sequence number.
assert_eq!(w1.sequence().unwrap(), w2.sequence().unwrap());
// And assert the third op was not merged because of the differing
// partition key.
assert_ne!(w1.sequence().unwrap(), w3.sequence().unwrap());
}
#[tokio::test]
#[should_panic = "enqueuing unpartitioned write into kafka"]
async fn test_enqueue_no_partition_key() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let producer = ctx.writing(true).await.unwrap();
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
let write = DmlWrite::new("bananas", tables, None, DmlMeta::unsequenced(None));
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
producer
.store_operation(sequencer_id, &DmlOperation::Write(write))
.await
.unwrap();
}
async fn write(
namespace: &str,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
sequencer_id: u32,
partition_key: impl Into<PartitionKey> + Send,
) -> DmlMeta {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
let write = DmlWrite::new(
namespace,
tables,
None,
Some(partition_key.into()),
DmlMeta::unsequenced(Some(span_ctx)),
);
let op = DmlOperation::Write(write);