refactor(router): use KafkaPartition in sequencer

The Sequencer (which will be renamed shortly) is a type that represents
a single sequencer / shard / Kafka partition in the router.

To minimise confusion with all the various IDs floating around, we
have a KafkaPartition type - this commit changes the Sequencer to
return the Kafka partition index as a typed value, rather than a
usize, to help eliminate any inconsistencies.
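
For reference, KafkaPartition is a newtype wrapper from the data_types
crate. A minimal sketch of the pattern, assuming only what this diff
relies on (new(), get(), Display, Eq and Hash over an inner i32); the
real definition may carry additional derives and methods:

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct KafkaPartition(i32);

impl KafkaPartition {
    /// Wrap a raw Kafka partition index.
    pub fn new(v: i32) -> Self {
        Self(v)
    }

    /// Return the raw index, for interop with untyped APIs.
    pub fn get(&self) -> i32 {
        self.0
    }
}

impl std::fmt::Display for KafkaPartition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}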

As a side effect of these conversion changes, I've tightened up the
casting to ensure we assert on any overflows - we juggle a lot of
numeric types!
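
Concretely, the tightened casts swap silent `as` conversions for
fallible ones that panic loudly on overflow. A minimal sketch of the
pattern (illustrative values only, not the actual router code):

fn main() {
    // A shard index arrives as a usize from an iterator of shards...
    let id: usize = 42;

    // ...but KafkaPartition is backed by an i32. `id as i32` would
    // silently wrap on overflow; try_from + expect asserts instead.
    let kafka_index = i32::try_from(id).expect("shard index overflows i32");

    // Asserting the i32 -> u32 conversion once up front (as
    // Sequencer::new does below) makes a later infallible cast safe:
    // the value is known to be non-negative.
    assert!(u32::try_from(kafka_index).is_ok());
    let write_buffer_partition = kafka_index as u32;

    println!("partition {kafka_index} -> write buffer {write_buffer_partition}");
}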
pull/24376/head
Dom Dwyer 2022-08-23 15:54:06 +02:00
parent 9bd2b9aa12
commit 9b920f1cbb
5 changed files with 81 additions and 42 deletions


@@ -1,6 +1,6 @@
 use async_trait::async_trait;
 use clap_blocks::write_buffer::WriteBufferConfig;
-use data_types::{DatabaseName, PartitionTemplate, TemplatePart};
+use data_types::{DatabaseName, KafkaPartition, PartitionTemplate, TemplatePart};
 use hashbrown::HashMap;
 use hyper::{Body, Request, Response};
 use iox_catalog::interface::Catalog;
@@ -325,7 +325,13 @@ async fn init_write_buffer(
     Ok(ShardedWriteBuffer::new(JumpHash::new(
         shards
             .into_iter()
-            .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
+            .map(|id| {
+                Sequencer::new(
+                    KafkaPartition::new(id.try_into().unwrap()),
+                    Arc::clone(&write_buffer),
+                    &metrics,
+                )
+            })
             .map(Arc::new),
     )?))
 }


@@ -1,5 +1,5 @@
 use criterion::{criterion_group, criterion_main, Criterion, Throughput};
-use data_types::{PartitionTemplate, TemplatePart};
+use data_types::{KafkaPartition, PartitionTemplate, TemplatePart};
 use hyper::{Body, Request};
 use iox_catalog::{interface::Catalog, mem::MemCatalog};
 use router::{
@@ -38,7 +38,13 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer<JumpHash<Arc<Seque
         JumpHash::new(
             shards
                 .into_iter()
-                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
+                .map(|id| {
+                    Sequencer::new(
+                        KafkaPartition::new(id as _),
+                        Arc::clone(&write_buffer),
+                        &Default::default(),
+                    )
+                })
                 .map(Arc::new),
         )
         .expect("failed to init sharder"),


@@ -125,7 +125,7 @@ where
         trace!(
             %partition_key,
-            kafka_partition=%sequencer.id(),
+            kafka_partition=%sequencer.kafka_index(),
             tables=%dml.table_count(),
             %namespace,
             approx_size=%dml.size(),
@@ -157,7 +157,7 @@ where
         );
         let iter = sequencers.into_iter().map(|s| {
-            trace!(sequencer_id=%s.id(), %table_name, %namespace, "routing delete to shard");
+            trace!(sequencer_id=%s.kafka_index(), %table_name, %namespace, "routing delete to shard");
             (s, DmlOperation::from(dml.clone()))
         });
@@ -211,7 +211,7 @@ mod tests {
     use super::*;
     use crate::dml_handlers::DmlHandler;
     use assert_matches::assert_matches;
-    use data_types::TimestampRange;
+    use data_types::{KafkaPartition, TimestampRange};
     use sharder::mock::{MockSharder, MockSharderCall, MockSharderPayload};
     use std::sync::Arc;
     use write_buffer::mock::{MockBufferForWriting, MockBufferSharedState};
@@ -253,7 +253,7 @@ mod tests {
         // Configure the sharder to return shards containing the mock write
         // buffer.
         let shard = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer),
             &Default::default(),
         ));
@@ -279,7 +279,7 @@ mod tests {
         // All writes were dispatched to the same shard, which should observe
         // one op containing all write lines (asserting that all the writes for
         // one shard are collated into one op).
-        let mut got = write_buffer_state.get_messages(shard.id() as _);
+        let mut got = write_buffer_state.get_messages(shard.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -305,7 +305,7 @@ mod tests {
         let write_buffer1 = init_write_buffer(1);
         let write_buffer1_state = write_buffer1.state();
         let shard1 = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer1),
             &Default::default(),
         ));
@@ -315,7 +315,7 @@ mod tests {
         let write_buffer2 = init_write_buffer(2);
         let write_buffer2_state = write_buffer2.state();
         let shard2 = Arc::new(Sequencer::new(
-            1,
+            KafkaPartition::new(1),
             Arc::new(write_buffer2),
             &Default::default(),
         ));
@@ -352,7 +352,7 @@ mod tests {
             .any(|v| v.table_name == "table" && v.payload.mutable_batch().rows() == 1));
         // The write buffer for shard 1 should observe 1 write containing 3 rows.
-        let mut got = write_buffer1_state.get_messages(shard1.id() as _);
+        let mut got = write_buffer1_state.get_messages(shard1.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -363,7 +363,7 @@ mod tests {
         });
         // The second shard should observe 1 write containing 1 row.
-        let mut got = write_buffer2_state.get_messages(shard2.id() as _);
+        let mut got = write_buffer2_state.get_messages(shard2.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -387,7 +387,7 @@ mod tests {
         let write_buffer1 = init_write_buffer(1);
         let write_buffer1_state = write_buffer1.state();
         let shard1 = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer1),
             &Default::default(),
         ));
@@ -396,7 +396,7 @@ mod tests {
         let write_buffer2 = init_write_buffer(1);
         // Non-existent sequencer ID to trigger an error.
         let shard2 = Arc::new(Sequencer::new(
-            13,
+            KafkaPartition::new(13),
             Arc::new(write_buffer2),
             &Default::default(),
         ));
@@ -420,7 +420,7 @@ mod tests {
         // The write buffer for shard 1 should observe 1 write independent of
         // the second, erroring shard.
-        let got = write_buffer1_state.get_messages(shard1.id() as _);
+        let got = write_buffer1_state.get_messages(shard1.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
     }
@@ -439,7 +439,7 @@ mod tests {
         // Configure the sharder to return shards containing the mock write
         // buffer.
         let shard = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer),
             &Default::default(),
         ));
@@ -462,7 +462,7 @@ mod tests {
         // All writes were dispatched to the same shard, which should observe
         // one op containing all write lines (asserting that all the writes for
         // one shard are collated into one op).
-        let mut got = write_buffer_state.get_messages(shard.id() as _);
+        let mut got = write_buffer_state.get_messages(shard.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -487,7 +487,7 @@ mod tests {
         // Configure the sharder to return shards containing the mock write
         // buffer.
         let shard = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer),
             &Default::default(),
         ));
@@ -513,7 +513,7 @@ mod tests {
         // one shard are collated into one op).
         //
         // The table name should be None as it was specified as an empty string.
-        let mut got = write_buffer_state.get_messages(shard.id() as _);
+        let mut got = write_buffer_state.get_messages(shard.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -567,7 +567,7 @@ mod tests {
         let write_buffer1 = init_write_buffer(1);
         let write_buffer1_state = write_buffer1.state();
         let shard1 = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer1),
             &Default::default(),
         ));
@@ -576,7 +576,7 @@ mod tests {
         let write_buffer2 = init_write_buffer(1);
         let write_buffer2_state = write_buffer2.state();
         let shard2 = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer2),
             &Default::default(),
         ));
@@ -592,7 +592,7 @@ mod tests {
             .expect("delete failed");
         // The write buffer for shard 1 should observe the delete
-        let mut got = write_buffer1_state.get_messages(shard1.id() as _);
+        let mut got = write_buffer1_state.get_messages(shard1.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -601,7 +601,7 @@ mod tests {
         assert_matches!(got, DmlOperation::Delete(_));
         // The second shard should observe the delete as well
-        let mut got = write_buffer2_state.get_messages(shard2.id() as _);
+        let mut got = write_buffer2_state.get_messages(shard2.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
         let got = got
             .pop()
@@ -623,7 +623,7 @@ mod tests {
         let write_buffer1 = init_write_buffer(1);
         let write_buffer1_state = write_buffer1.state();
         let shard1 = Arc::new(Sequencer::new(
-            0,
+            KafkaPartition::new(0),
             Arc::new(write_buffer1),
             &Default::default(),
         ));
@@ -632,7 +632,7 @@ mod tests {
         let write_buffer2 = init_write_buffer(1);
         // Non-existent sequencer ID to trigger an error.
         let shard2 = Arc::new(Sequencer::new(
-            13,
+            KafkaPartition::new(13),
             Arc::new(write_buffer2),
             &Default::default(),
        ));
@@ -653,7 +653,7 @@ mod tests {
         });
         // The write buffer for shard 1 will still observe the delete.
-        let got = write_buffer1_state.get_messages(shard1.id() as _);
+        let got = write_buffer1_state.get_messages(shard1.kafka_index().get() as _);
         assert_eq!(got.len(), 1);
     }
 }


@@ -1,15 +1,21 @@
 //! A representation of a single operation sequencer.

+use data_types::KafkaPartition;
 use dml::{DmlMeta, DmlOperation};
 use iox_time::{SystemProvider, TimeProvider};
 use metric::{DurationHistogram, Metric};
 use std::{borrow::Cow, hash::Hash, sync::Arc};
 use write_buffer::core::{WriteBufferError, WriteBufferWriting};

-/// A sequencer tags a write buffer with a sequencer ID.
+/// A sequencer tags a write buffer with a Kafka partition index.
 #[derive(Debug)]
 pub struct Sequencer<P = SystemProvider> {
-    id: usize,
+    /// The 0..N index / identifier for the Kafka partition.
+    ///
+    /// NOTE: this is NOT the ID of the Sequencer row in the catalog this
+    /// Sequencer represents.
+    kafka_index: KafkaPartition,
     inner: Arc<dyn WriteBufferWriting>,
     time_provider: P,
@@ -21,35 +27,44 @@ impl Eq for Sequencer {}

 impl PartialEq for Sequencer {
     fn eq(&self, other: &Self) -> bool {
-        self.id == other.id
+        self.kafka_index == other.kafka_index
     }
 }

 impl Hash for Sequencer {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.id.hash(state);
+        self.kafka_index.hash(state);
     }
 }

 impl Sequencer {
-    /// Tag `inner` with the specified `id`.
-    pub fn new(id: usize, inner: Arc<dyn WriteBufferWriting>, metrics: &metric::Registry) -> Self {
+    /// Tag `inner` with the specified `kafka_index`.
+    pub fn new(
+        kafka_index: KafkaPartition,
+        inner: Arc<dyn WriteBufferWriting>,
+        metrics: &metric::Registry,
+    ) -> Self {
         let write: Metric<DurationHistogram> = metrics.register_metric(
             "sequencer_enqueue_duration",
             "sequencer enqueue call duration",
         );
         let enqueue_success = write.recorder([
-            ("kafka_partition", Cow::from(id.to_string())),
+            ("kafka_partition", Cow::from(kafka_index.to_string())),
             ("result", Cow::from("success")),
         ]);
         let enqueue_error = write.recorder([
-            ("kafka_partition", Cow::from(id.to_string())),
+            ("kafka_partition", Cow::from(kafka_index.to_string())),
             ("result", Cow::from("error")),
         ]);

+        // Validate that the Kafka partition type's inner numerical type (an
+        // i32) can be cast correctly to the write buffer abstraction's
+        // numerical type used to identify the partition (a u32).
+        assert!(u32::try_from(kafka_index.get()).is_ok());
+
         Self {
-            id,
+            kafka_index,
             inner,
             enqueue_success,
             enqueue_error,
@@ -57,9 +72,12 @@ impl Sequencer {
         }
     }

-    /// Return the ID of this sequencer.
-    pub fn id(&self) -> usize {
-        self.id
+    /// Return the 0..N index / identifier for the Kafka partition.
+    ///
+    /// NOTE: this is NOT the ID of the Sequencer row in the catalog this
+    /// Sequencer represents.
+    pub fn kafka_index(&self) -> KafkaPartition {
+        self.kafka_index
     }

     /// Enqueue `op` into this sequencer.
@@ -70,7 +88,10 @@ impl Sequencer {
     pub async fn enqueue<'a>(&self, op: DmlOperation) -> Result<DmlMeta, WriteBufferError> {
         let t = self.time_provider.now();

-        let res = self.inner.store_operation(self.id as u32, op).await;
+        let res = self
+            .inner
+            .store_operation(self.kafka_index.get() as _, op)
+            .await;

         if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
             match &res {


@@ -1,5 +1,5 @@
 use assert_matches::assert_matches;
-use data_types::{KafkaTopicId, PartitionTemplate, QueryPoolId, TemplatePart};
+use data_types::{KafkaPartition, KafkaTopicId, PartitionTemplate, QueryPoolId, TemplatePart};
 use dml::DmlOperation;
 use hashbrown::HashMap;
 use hyper::{Body, Request, StatusCode};
@@ -86,7 +86,13 @@ impl TestContext {
         JumpHash::new(
             shards
                 .into_iter()
-                .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
+                .map(|id| {
+                    Sequencer::new(
+                        KafkaPartition::new(id as _),
+                        Arc::clone(&write_buffer),
+                        &metrics,
+                    )
+                })
                 .map(Arc::new),
         )
         .unwrap(),