diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 2e6b1568d7..ec6cc385dc 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -372,14 +372,14 @@ pub mod test_utils { writer: &impl WriteBufferWriting, lp: &str, sequencer_id: u32, - partition_key: Option, + partition_key: PartitionKey, span_context: Option<&SpanContext>, ) -> DmlWrite { let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap(); let write = DmlWrite::new( namespace, tables, - partition_key, + Some(partition_key), DmlMeta::unsequenced(span_context.cloned()), ); let operation = DmlOperation::Write(write); @@ -431,7 +431,7 @@ pub mod test_utils { &writer, entry_1, sequencer_id, - Some(PartitionKey::from("bananas")), + PartitionKey::from("bananas"), None, ) .await; @@ -441,13 +441,21 @@ pub mod test_utils { assert_stream_pending(&mut stream).await; // adding more data unblocks the stream - let w2 = write("namespace", &writer, entry_2, sequencer_id, None, None).await; + let w2 = write( + "namespace", + &writer, + entry_2, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; let w3 = write( "namespace", &writer, entry_3, sequencer_id, - Some(PartitionKey::from("bananas")), + PartitionKey::from("bananas"), None, ) .await; @@ -478,9 +486,33 @@ pub mod test_utils { let writer = context.writing(true).await.unwrap(); let reader = context.reading(true).await.unwrap(); - let w1 = write("namespace", &writer, entry_1, 0, None, None).await; - let w2 = write("namespace", &writer, entry_2, 0, None, None).await; - let w3 = write("namespace", &writer, entry_3, 0, None, None).await; + let w1 = write( + "namespace", + &writer, + entry_1, + 0, + PartitionKey::from("bananas"), + None, + ) + .await; + let w2 = write( + "namespace", + &writer, + entry_2, + 0, + PartitionKey::from("bananas"), + None, + ) + .await; + let w3 = write( + "namespace", + &writer, + entry_3, + 0, + PartitionKey::from("bananas"), + None, + ) + .await; // creating stream, drop stream, re-create it => still starts at first entry let sequencer_id = set_pop_first(&mut reader.sequencer_ids()).unwrap(); @@ -552,7 +584,7 @@ pub mod test_utils { &writer, entry_1, sequencer_id_1, - Some(PartitionKey::from("bananas")), + PartitionKey::from("bananas"), None, ) .await; @@ -564,7 +596,7 @@ pub mod test_utils { &writer, entry_2, sequencer_id_2, - Some(PartitionKey::from("bananas")), + PartitionKey::from("bananas"), None, ) .await; @@ -576,7 +608,7 @@ pub mod test_utils { &writer, entry_3, sequencer_id_1, - Some(PartitionKey::from("bananas")), + PartitionKey::from("bananas"), None, ) .await; @@ -623,7 +655,7 @@ pub mod test_utils { &writer_1, entry_east_1, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -632,7 +664,7 @@ pub mod test_utils { &writer_1, entry_west_1, sequencer_id_2, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -641,7 +673,7 @@ pub mod test_utils { &writer_2, entry_east_2, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -687,7 +719,7 @@ pub mod test_utils { &writer, entry_east_1, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -696,7 +728,7 @@ pub mod test_utils { &writer, entry_east_2, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -705,7 +737,7 @@ pub mod test_utils { &writer, entry_west_1, sequencer_id_2, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -744,7 +776,15 @@ pub mod test_utils { .seek(SequenceNumber::new(1_000_000)) .await .unwrap(); - let w_east_3 = write("namespace", &writer, entry_east_3, 0, None, None).await; + let w_east_3 = write( + "namespace", + &writer, + entry_east_3, + 0, + PartitionKey::from("bananas"), + None, + ) + .await; let err = handler_1_1_a .stream() @@ -794,7 +834,7 @@ pub mod test_utils { &writer, entry_east_1, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -803,7 +843,7 @@ pub mod test_utils { &writer, entry_east_2, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -867,7 +907,7 @@ pub mod test_utils { &writer, entry_east_1, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -876,7 +916,7 @@ pub mod test_utils { &writer, entry_east_2, sequencer_id_1, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -885,7 +925,7 @@ pub mod test_utils { &writer, entry_west_1, sequencer_id_2, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -924,7 +964,15 @@ pub mod test_utils { assert_eq!(sequencer_ids.len(), 1); let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap(); - let write = write("namespace", &writer, entry, sequencer_id, None, None).await; + let write = write( + "namespace", + &writer, + entry, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; let reported_ts = write.meta().producer_ts().unwrap(); // advance time @@ -1014,7 +1062,15 @@ pub mod test_utils { let mut stream = handler.stream().await; // 1: no context - write("namespace", &writer, entry, sequencer_id, None, None).await; + write( + "namespace", + &writer, + entry, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; // check write 1 let write_1 = stream.next().await.unwrap().unwrap(); @@ -1031,7 +1087,7 @@ pub mod test_utils { &writer, entry, sequencer_id, - None, + PartitionKey::from("bananas"), Some(&span_context_1), ) .await; @@ -1044,7 +1100,7 @@ pub mod test_utils { &writer, entry, sequencer_id, - None, + PartitionKey::from("bananas"), Some(&span_context_2), ) .await; @@ -1106,8 +1162,24 @@ pub mod test_utils { assert_eq!(sequencer_ids.len(), 1); let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap(); - let w1 = write("namespace_1", &writer, entry_2, sequencer_id, None, None).await; - let w2 = write("namespace_2", &writer, entry_1, sequencer_id, None, None).await; + let w1 = write( + "namespace_1", + &writer, + entry_2, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; + let w2 = write( + "namespace_2", + &writer, + entry_1, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; let mut handler = reader.stream_handler(sequencer_id).await.unwrap(); assert_reader_content(&mut handler, &[&w1, &w2]).await; @@ -1133,7 +1205,15 @@ pub mod test_utils { async move { let entry = format!("upc,region=east user={} {}", i, i); - write("ns", writer.as_ref(), &entry, sequencer_id, None, None).await; + write( + "ns", + writer.as_ref(), + &entry, + sequencer_id, + PartitionKey::from("bananas"), + None, + ) + .await; } }) .collect(); diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs index 0b28b61d46..a9d2538a9a 100644 --- a/write_buffer/src/file.rs +++ b/write_buffer/src/file.rs @@ -732,6 +732,7 @@ pub mod test_utils { mod tests { use std::{num::NonZeroU32, time::Duration}; + use data_types::PartitionKey; use dml::test_util::assert_write_op_eq; use tempfile::TempDir; use trace::RingBufferTraceCollector; @@ -841,7 +842,7 @@ mod tests { &writer, entry_1, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -850,7 +851,7 @@ mod tests { &writer, entry_2, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -859,7 +860,7 @@ mod tests { &writer, entry_3, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -868,7 +869,7 @@ mod tests { &writer, entry_4, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -911,7 +912,7 @@ mod tests { &writer, entry_1, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; @@ -920,7 +921,7 @@ mod tests { &writer, entry_2, sequencer_id, - None, + PartitionKey::from("bananas"), None, ) .await; diff --git a/write_buffer/src/kafka/aggregator.rs b/write_buffer/src/kafka/aggregator.rs index 6903b2dd77..f2de52f1de 100644 --- a/write_buffer/src/kafka/aggregator.rs +++ b/write_buffer/src/kafka/aggregator.rs @@ -1,5 +1,5 @@ use crate::codec::{ContentType, IoxHeaders}; -use data_types::{Sequence, SequenceNumber}; +use data_types::{PartitionKey, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlOperation, DmlWrite}; use hashbrown::{hash_map::Entry, HashMap}; use iox_time::{Time, TimeProvider}; @@ -34,6 +34,9 @@ struct WriteAggregator { /// Data for every table. tables: HashMap, + /// The partition key derived for these batches. + partition_key: PartitionKey, + /// Span recorder to link spans from incoming writes to aggregated write. span_recorder: Option, @@ -49,9 +52,15 @@ impl WriteAggregator { let mut span_recorder = None; Self::record_span(&mut span_recorder, write.meta().span_context(), &collector); + let partition_key = write + .partition_key() + .cloned() + .expect("enqueuing unpartitioned write into kafka"); + Self { namespace: write.namespace().to_owned(), tables: write.into_tables().collect(), + partition_key, span_recorder, tag, collector, @@ -91,6 +100,11 @@ impl WriteAggregator { fn can_push(&self, write: &DmlWrite) -> bool { assert_eq!(write.namespace(), self.namespace); + // Only batch together writes for the same partition. + if write.partition_key() != Some(&self.partition_key) { + return false; + } + for (table, batch) in write.tables() { if let Some(existing) = self.tables.get(table) { match ( @@ -151,7 +165,12 @@ impl WriteAggregator { }; let meta = DmlMeta::unsequenced(ctx); - DmlWrite::new(self.namespace.clone(), self.tables.clone(), None, meta) + DmlWrite::new( + self.namespace.clone(), + self.tables.clone(), + Some(self.partition_key.clone()), + meta, + ) } } diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 4ffda8edad..0896d9a58a 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -377,7 +377,7 @@ mod tests { }, maybe_skip_kafka_integration, }; - use data_types::{DeletePredicate, TimestampRange}; + use data_types::{DeletePredicate, PartitionKey, TimestampRange}; use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite}; use futures::{stream::FuturesUnordered, TryStreamExt}; use rskafka::{client::partition::Compression, record::Record}; @@ -534,7 +534,7 @@ mod tests { &producer, "table foo=1 1", sequencer_id, - None, + "bananas".into(), None, ) .await; @@ -567,19 +567,19 @@ mod tests { let (w1_1, w1_2, w2_1, d1_1, d1_2, w1_3, w1_4, w2_2) = tokio::join!( // ns1: batch 1 - write("ns1", &producer, &trace_collector, sequencer_id), - write("ns1", &producer, &trace_collector, sequencer_id), + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), // ns2: batch 1, part A - write("ns2", &producer, &trace_collector, sequencer_id), + write("ns2", &producer, &trace_collector, sequencer_id, "bananas"), // ns1: batch 2 delete("ns1", &producer, &trace_collector, sequencer_id), // ns1: batch 3 delete("ns1", &producer, &trace_collector, sequencer_id), // ns1: batch 4 - write("ns1", &producer, &trace_collector, sequencer_id), - write("ns1", &producer, &trace_collector, sequencer_id), + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), // ns2: batch 1, part B - write("ns2", &producer, &trace_collector, sequencer_id), + write("ns2", &producer, &trace_collector, sequencer_id, "bananas"), ); // ensure that write operations were fused @@ -675,18 +675,69 @@ mod tests { ); } + // Coverage of https://github.com/influxdata/influxdb_iox/issues/4787 + #[tokio::test] + async fn test_batching_respects_partitioning() { + let conn = maybe_skip_kafka_integration!(); + let adapter = RSKafkaTestAdapter::new(conn); + let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await; + let trace_collector = ctx.trace_collector(); + + let producer = ctx.writing(true).await.unwrap(); + + let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap(); + + let (w1, w2, w3) = tokio::join!( + // These two ops have the same partition key, and therefore can be + // merged together. + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), + write("ns1", &producer, &trace_collector, sequencer_id, "bananas"), + // However this op has a different partition_key and cannot be + // merged with the others. + write("ns1", &producer, &trace_collector, sequencer_id, "platanos"), + ); + + // Assert ops 1 and 2 were merged, by asserting they have the same + // sequence number. + assert_eq!(w1.sequence().unwrap(), w2.sequence().unwrap()); + + // And assert the third op was not merged because of the differing + // partition key. + assert_ne!(w1.sequence().unwrap(), w3.sequence().unwrap()); + } + + #[tokio::test] + #[should_panic = "enqueuing unpartitioned write into kafka"] + async fn test_enqueue_no_partition_key() { + let conn = maybe_skip_kafka_integration!(); + let adapter = RSKafkaTestAdapter::new(conn); + let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await; + + let producer = ctx.writing(true).await.unwrap(); + + let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap(); + let write = DmlWrite::new("bananas", tables, None, DmlMeta::unsequenced(None)); + + let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap(); + producer + .store_operation(sequencer_id, &DmlOperation::Write(write)) + .await + .unwrap(); + } + async fn write( namespace: &str, producer: &RSKafkaProducer, trace_collector: &Arc, sequencer_id: u32, + partition_key: impl Into + Send, ) -> DmlMeta { let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>); let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap(); let write = DmlWrite::new( namespace, tables, - None, + Some(partition_key.into()), DmlMeta::unsequenced(Some(span_ctx)), ); let op = DmlOperation::Write(write);