test: update batching tests for new aggregator
Previously aggregated writes were merged into a single Kafka Record - this meant that all merged ops would be placed into the same Record, and therefore receive the same sequence number once published to Kafka. The new aggregator batches at the Record level, therefore aggregated writes now get their own distinct sequence number. This commit updates the batching tests to reflect this new sequence number assignment behaviour.pull/24376/head
parent
6d6fc9a08b
commit
8b054c14a8
|
@ -504,7 +504,6 @@ mod tests {
|
|||
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
|
||||
use futures::{stream::FuturesUnordered, TryStreamExt};
|
||||
use iox_time::TimeProvider;
|
||||
use metric::{Metric, U64Histogram};
|
||||
use rskafka::{client::partition::Compression, record::Record};
|
||||
use std::num::NonZeroU32;
|
||||
use test_helpers::assert_contains;
|
||||
|
@ -557,6 +556,7 @@ mod tests {
|
|||
})
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn metrics(&self) -> &metric::Registry {
|
||||
&self.metrics
|
||||
}
|
||||
|
@ -715,97 +715,50 @@ mod tests {
|
|||
write("ns2", &producer, &trace_collector, sequencer_id, "bananas"),
|
||||
);
|
||||
|
||||
// ensure that write operations were fused
|
||||
assert_eq!(w1_1.sequence().unwrap(), w1_2.sequence().unwrap());
|
||||
// ensure that write operations were NOT fused
|
||||
assert_ne!(w1_1.sequence().unwrap(), w1_2.sequence().unwrap());
|
||||
assert_ne!(w1_2.sequence().unwrap(), d1_1.sequence().unwrap());
|
||||
assert_ne!(d1_1.sequence().unwrap(), d1_2.sequence().unwrap());
|
||||
assert_ne!(d1_2.sequence().unwrap(), w1_3.sequence().unwrap());
|
||||
assert_eq!(w1_3.sequence().unwrap(), w1_4.sequence().unwrap());
|
||||
assert_ne!(w1_3.sequence().unwrap(), w1_4.sequence().unwrap());
|
||||
assert_ne!(w1_4.sequence().unwrap(), w1_1.sequence().unwrap());
|
||||
|
||||
assert_ne!(w2_1.sequence().unwrap(), w1_1.sequence().unwrap());
|
||||
assert_eq!(w2_1.sequence().unwrap(), w2_2.sequence().unwrap());
|
||||
|
||||
// Assert the batch sizes were captured in the metrics
|
||||
let histogram = ctx
|
||||
.metrics()
|
||||
.get_instrument::<Metric<U64Histogram>>("write_buffer_batch_coalesced_write_ops")
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&[].into())
|
||||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
assert_eq!(
|
||||
histogram.sample_count(),
|
||||
3,
|
||||
"metric did not record expected batch count"
|
||||
);
|
||||
// Validate the expected bucket values.
|
||||
[
|
||||
(1, 0),
|
||||
(2, 3),
|
||||
(4, 0),
|
||||
(8, 0),
|
||||
(16, 0),
|
||||
(32, 0),
|
||||
(64, 0),
|
||||
(128, 0),
|
||||
]
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.for_each(|(i, (le, count))| {
|
||||
let bucket = &histogram.buckets[i];
|
||||
assert_eq!(bucket.le, le);
|
||||
assert_eq!(bucket.count, count, "bucket le={}", le);
|
||||
});
|
||||
assert_ne!(w2_1.sequence().unwrap(), w2_2.sequence().unwrap());
|
||||
|
||||
let consumer = ctx.reading(true).await.unwrap();
|
||||
let mut handler = consumer.stream_handler(sequencer_id).await.unwrap();
|
||||
let mut stream = handler.stream().await;
|
||||
|
||||
// get output, note that the write operations were fused
|
||||
let op_w1_12 = stream.next().await.unwrap().unwrap();
|
||||
// get output, note that the write operations were NOT fused
|
||||
let op_w1_1 = stream.next().await.unwrap().unwrap();
|
||||
let op_w1_2 = stream.next().await.unwrap().unwrap();
|
||||
let op_w2_1 = stream.next().await.unwrap().unwrap();
|
||||
let op_d1_1 = stream.next().await.unwrap().unwrap();
|
||||
let op_d1_2 = stream.next().await.unwrap().unwrap();
|
||||
let op_w1_34 = stream.next().await.unwrap().unwrap();
|
||||
let op_w2_12 = stream.next().await.unwrap().unwrap();
|
||||
let op_w1_3 = stream.next().await.unwrap().unwrap();
|
||||
let op_w1_4 = stream.next().await.unwrap().unwrap();
|
||||
let op_w2_2 = stream.next().await.unwrap().unwrap();
|
||||
|
||||
// ensure that sequence numbers map as expected
|
||||
assert_eq!(
|
||||
op_w1_12.meta().sequence().unwrap(),
|
||||
w1_1.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(
|
||||
op_w1_12.meta().sequence().unwrap(),
|
||||
w1_2.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(op_w1_1.meta().sequence().unwrap(), w1_1.sequence().unwrap(),);
|
||||
assert_eq!(op_w1_2.meta().sequence().unwrap(), w1_2.sequence().unwrap(),);
|
||||
assert_eq!(op_d1_1.meta().sequence().unwrap(), d1_1.sequence().unwrap(),);
|
||||
assert_eq!(op_d1_2.meta().sequence().unwrap(), d1_2.sequence().unwrap(),);
|
||||
assert_eq!(
|
||||
op_w1_34.meta().sequence().unwrap(),
|
||||
w1_3.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(
|
||||
op_w1_34.meta().sequence().unwrap(),
|
||||
w1_4.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(
|
||||
op_w2_12.meta().sequence().unwrap(),
|
||||
w2_1.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(
|
||||
op_w2_12.meta().sequence().unwrap(),
|
||||
w2_2.sequence().unwrap(),
|
||||
);
|
||||
assert_eq!(op_w1_3.meta().sequence().unwrap(), w1_3.sequence().unwrap(),);
|
||||
assert_eq!(op_w1_4.meta().sequence().unwrap(), w1_4.sequence().unwrap(),);
|
||||
assert_eq!(op_w2_1.meta().sequence().unwrap(), w2_1.sequence().unwrap(),);
|
||||
assert_eq!(op_w2_2.meta().sequence().unwrap(), w2_2.sequence().unwrap(),);
|
||||
|
||||
// check tracing span links
|
||||
assert_span_context_eq_or_linked(
|
||||
w1_1.span_context().unwrap(),
|
||||
op_w1_12.meta().span_context().unwrap(),
|
||||
op_w1_1.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
w1_2.span_context().unwrap(),
|
||||
op_w1_12.meta().span_context().unwrap(),
|
||||
op_w1_2.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
|
@ -820,60 +773,26 @@ mod tests {
|
|||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
w1_3.span_context().unwrap(),
|
||||
op_w1_34.meta().span_context().unwrap(),
|
||||
op_w1_3.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
w1_4.span_context().unwrap(),
|
||||
op_w1_34.meta().span_context().unwrap(),
|
||||
op_w1_4.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
w2_1.span_context().unwrap(),
|
||||
op_w2_12.meta().span_context().unwrap(),
|
||||
op_w2_1.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
assert_span_context_eq_or_linked(
|
||||
w2_2.span_context().unwrap(),
|
||||
op_w2_12.meta().span_context().unwrap(),
|
||||
op_w2_2.meta().span_context().unwrap(),
|
||||
trace_collector.spans(),
|
||||
);
|
||||
}
|
||||
|
||||
// Coverage of https://github.com/influxdata/influxdb_iox/issues/4787
|
||||
#[tokio::test]
|
||||
async fn test_batching_respects_partitioning() {
|
||||
let conn = maybe_skip_kafka_integration!();
|
||||
let adapter = RSKafkaTestAdapter::new(conn);
|
||||
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
|
||||
let trace_collector = ctx.trace_collector();
|
||||
|
||||
let producer = ctx.writing(true).await.unwrap();
|
||||
|
||||
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
|
||||
|
||||
let (w1, w2, w3, w4) = tokio::join!(
|
||||
// These two ops have the same partition key, and therefore can be
|
||||
// merged together.
|
||||
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
|
||||
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
|
||||
// However this op has a different partition_key and cannot be
|
||||
// merged with the others.
|
||||
write("ns1", &producer, &trace_collector, sequencer_id, "platanos"),
|
||||
// this operation can still go into the first write, no need to start yet another one
|
||||
write("ns1", &producer, &trace_collector, sequencer_id, "bananas"),
|
||||
);
|
||||
|
||||
// Assert ops 1+2+4 were merged, by asserting they have the same
|
||||
// sequence number.
|
||||
assert_eq!(w1.sequence().unwrap(), w2.sequence().unwrap());
|
||||
assert_eq!(w1.sequence().unwrap(), w4.sequence().unwrap());
|
||||
|
||||
// And assert the third op was not merged because of the differing
|
||||
// partition key.
|
||||
assert_ne!(w1.sequence().unwrap(), w3.sequence().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic = "enqueuing unpartitioned write into kafka"]
|
||||
async fn test_enqueue_no_partition_key() {
|
||||
|
|
Loading…
Reference in New Issue