refactor: use one stream per sequencer/partition
Advantages are: - for large DBs w/ many partitions we can ingest data in parallel - on top of this change we can implement per-sequencer seeking, which is required for replay (pull/24376/head)
parent
ee6a14b7dc
commit
592424c896
|
@ -640,14 +640,15 @@ impl Db {
|
|||
// streaming from the write buffer loop
|
||||
async {
|
||||
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
|
||||
let wb = Arc::clone(write_buffer);
|
||||
while !shutdown.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = {
|
||||
self.stream_in_sequenced_entries(wb.stream())
|
||||
} => {},
|
||||
_ = shutdown.cancelled() => break,
|
||||
}
|
||||
let mut futures = vec![];
|
||||
for (_sequencer_id, stream) in write_buffer.streams() {
|
||||
let fut = self.stream_in_sequenced_entries(stream);
|
||||
futures.push(fut);
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_ = futures::future::join_all(futures) => {},
|
||||
_ = shutdown.cancelled() => {},
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -530,11 +530,11 @@ impl InitStatus {
|
|||
let rules = handle
|
||||
.rules()
|
||||
.expect("in this state rules should be loaded");
|
||||
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules).context(
|
||||
CreateWriteBuffer {
|
||||
let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules)
|
||||
.await
|
||||
.context(CreateWriteBuffer {
|
||||
config: rules.write_buffer_connection.clone(),
|
||||
},
|
||||
)?;
|
||||
})?;
|
||||
info!(write_buffer_enabled=?write_buffer.is_some(), db_name=rules.db_name(), "write buffer config");
|
||||
|
||||
handle
|
||||
|
|
|
@ -555,8 +555,9 @@ where
|
|||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CannotCreatePreservedCatalog)?;
|
||||
|
||||
let write_buffer =
|
||||
WriteBufferConfig::new(server_id, &rules).map_err(|e| Error::CreatingWriteBuffer {
|
||||
let write_buffer = WriteBufferConfig::new(server_id, &rules)
|
||||
.await
|
||||
.map_err(|e| Error::CreatingWriteBuffer {
|
||||
config: rules.write_buffer_connection.clone(),
|
||||
source: e,
|
||||
})?;
|
||||
|
|
|
@ -99,23 +99,23 @@ async fn reads_come_from_kafka() {
|
|||
let db_name = rand_name();
|
||||
let write_buffer_connection = WriteBufferConnection::Reading(kafka_connection.to_string());
|
||||
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.write_buffer(write_buffer_connection)
|
||||
.build(server.grpc_channel())
|
||||
.await;
|
||||
|
||||
// Common Kafka config
|
||||
let mut cfg = ClientConfig::new();
|
||||
cfg.set("bootstrap.servers", kafka_connection);
|
||||
cfg.set("message.timeout.ms", "5000");
|
||||
|
||||
// Create a partition with 2 topics in Kafka
|
||||
// Create a partition with 2 topics in Kafka BEFORE creating the DB
|
||||
let num_partitions = 2;
|
||||
let admin: AdminClient<DefaultClientContext> = cfg.clone().create().unwrap();
|
||||
let topic = NewTopic::new(&db_name, num_partitions, TopicReplication::Fixed(1));
|
||||
let opts = AdminOptions::default();
|
||||
admin.create_topics(&[topic], &opts).await.unwrap();
|
||||
|
||||
DatabaseBuilder::new(db_name.clone())
|
||||
.write_buffer(write_buffer_connection)
|
||||
.build(server.grpc_channel())
|
||||
.await;
|
||||
|
||||
// put some points in Kafka
|
||||
let producer: FutureProducer = cfg.create().unwrap();
|
||||
|
||||
|
@ -150,10 +150,18 @@ async fn reads_come_from_kafka() {
|
|||
|
||||
if let Ok(mut results) = query_results {
|
||||
let mut batches = Vec::new();
|
||||
let mut num_rows = 0;
|
||||
while let Some(data) = results.next().await.unwrap() {
|
||||
num_rows += data.num_rows();
|
||||
batches.push(data);
|
||||
}
|
||||
|
||||
// Since data is streamed using two partitions, only a subset of the data might be present. If that's
|
||||
// the case, ignore that record batch and try again.
|
||||
if num_rows < 4 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let expected = vec![
|
||||
"+--------+-------------------------------+------+",
|
||||
"| region | time | user |",
|
||||
|
|
|
@ -8,11 +8,11 @@ async-trait = "0.1"
|
|||
data_types = { path = "../data_types" }
|
||||
entry = { path = "../entry" }
|
||||
futures = "0.3"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.11.1"
|
||||
rdkafka = "0.26.0"
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
tokio = { version = "1.0", features = ["macros", "fs"] }
|
||||
|
||||
[dev-dependencies]
|
||||
dotenv = "0.15.0"
|
||||
tokio = { version = "1.0", features = ["macros", "fs"] }
|
||||
uuid = { version = "0.8", features = ["serde", "v4"] }
|
||||
|
|
|
@ -17,7 +17,7 @@ pub enum WriteBufferConfig {
|
|||
}
|
||||
|
||||
impl WriteBufferConfig {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
server_id: ServerId,
|
||||
rules: &DatabaseRules,
|
||||
) -> Result<Option<Self>, WriteBufferError> {
|
||||
|
@ -34,7 +34,7 @@ impl WriteBufferConfig {
|
|||
Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
|
||||
}
|
||||
Some(WriteBufferConnection::Reading(conn)) => {
|
||||
let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;
|
||||
let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name).await?;
|
||||
|
||||
Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use futures::stream::BoxStream;
|
|||
/// The dynamic boxing makes it easier to deal with error from different implementations.
|
||||
pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
|
||||
|
||||
/// Writing to a Write Buffer takes an `Entry` and returns `Sequence` data that facilitates reading
|
||||
/// Writing to a Write Buffer takes an [`Entry`] and returns [`Sequence`] data that facilitates reading
|
||||
/// entries from the Write Buffer at a later time.
|
||||
#[async_trait]
|
||||
pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
|
||||
|
@ -21,12 +21,13 @@ pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
|
|||
) -> Result<Sequence, WriteBufferError>;
|
||||
}
|
||||
|
||||
/// Produce a stream of `SequencedEntry` that a `Db` can add to the mutable buffer by using
|
||||
/// `Db::stream_in_sequenced_entries`.
|
||||
/// Output stream of [`WriteBufferReading`].
|
||||
pub type EntryStream<'a> = BoxStream<'a, Result<SequencedEntry, WriteBufferError>>;
|
||||
|
||||
/// Produce streams (one per sequencer) of [`SequencedEntry`]s.
|
||||
pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
|
||||
fn stream<'life0, 'async_trait>(
|
||||
&'life0 self,
|
||||
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
|
||||
/// Returns a stream per sequencer.
|
||||
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
|
||||
where
|
||||
'life0: 'async_trait,
|
||||
Self: 'async_trait;
|
||||
|
@ -46,13 +47,14 @@ pub mod test_utils {
|
|||
async fn new_context(&self, n_sequencers: u32) -> Self::Context;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait TestContext: Send + Sync {
|
||||
type Writing: WriteBufferWriting;
|
||||
type Reading: WriteBufferReading;
|
||||
|
||||
fn writing(&self) -> Self::Writing;
|
||||
|
||||
fn reading(&self) -> Self::Reading;
|
||||
async fn reading(&self) -> Self::Reading;
|
||||
}
|
||||
|
||||
pub async fn perform_generic_tests<T>(adapter: T)
|
||||
|
@ -61,6 +63,7 @@ pub mod test_utils {
|
|||
{
|
||||
test_single_stream_io(&adapter).await;
|
||||
test_multi_stream_io(&adapter).await;
|
||||
test_multi_sequencer_io(&adapter).await;
|
||||
test_multi_writer_multi_reader(&adapter).await;
|
||||
}
|
||||
|
||||
|
@ -75,9 +78,12 @@ pub mod test_utils {
|
|||
let entry_3 = lp_to_entry("upc user=3 300");
|
||||
|
||||
let writer = context.writing();
|
||||
let reader = context.reading();
|
||||
let reader = context.reading().await;
|
||||
|
||||
let mut streams = reader.streams();
|
||||
assert_eq!(streams.len(), 1);
|
||||
let (sequencer_id, mut stream) = streams.pop().unwrap();
|
||||
|
||||
let mut stream = reader.stream();
|
||||
let waker = futures::task::noop_waker();
|
||||
let mut cx = futures::task::Context::from_waker(&waker);
|
||||
|
||||
|
@ -85,15 +91,15 @@ pub mod test_utils {
|
|||
assert!(stream.poll_next_unpin(&mut cx).is_pending());
|
||||
|
||||
// adding content allows us to get results
|
||||
writer.store_entry(&entry_1, 0).await.unwrap();
|
||||
writer.store_entry(&entry_1, sequencer_id).await.unwrap();
|
||||
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_1);
|
||||
|
||||
// stream is pending again
|
||||
assert!(stream.poll_next_unpin(&mut cx).is_pending());
|
||||
|
||||
// adding more data unblocks the stream
|
||||
writer.store_entry(&entry_2, 0).await.unwrap();
|
||||
writer.store_entry(&entry_3, 0).await.unwrap();
|
||||
writer.store_entry(&entry_2, sequencer_id).await.unwrap();
|
||||
writer.store_entry(&entry_3, sequencer_id).await.unwrap();
|
||||
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_2);
|
||||
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_3);
|
||||
|
||||
|
@ -101,6 +107,50 @@ pub mod test_utils {
|
|||
assert!(stream.poll_next_unpin(&mut cx).is_pending());
|
||||
}
|
||||
|
||||
async fn test_multi_sequencer_io<T>(adapter: &T)
|
||||
where
|
||||
T: TestAdapter,
|
||||
{
|
||||
let context = adapter.new_context(2).await;
|
||||
|
||||
let entry_1 = lp_to_entry("upc user=1 100");
|
||||
let entry_2 = lp_to_entry("upc user=2 200");
|
||||
let entry_3 = lp_to_entry("upc user=3 300");
|
||||
|
||||
let writer = context.writing();
|
||||
let reader = context.reading().await;
|
||||
|
||||
let mut streams = reader.streams();
|
||||
assert_eq!(streams.len(), 2);
|
||||
let (sequencer_id_1, mut stream_1) = streams.pop().unwrap();
|
||||
let (sequencer_id_2, mut stream_2) = streams.pop().unwrap();
|
||||
assert_ne!(sequencer_id_1, sequencer_id_2);
|
||||
|
||||
let waker = futures::task::noop_waker();
|
||||
let mut cx = futures::task::Context::from_waker(&waker);
|
||||
|
||||
// empty streams are pending
|
||||
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
|
||||
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
|
||||
|
||||
// entries arrive at the right target stream
|
||||
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
|
||||
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
|
||||
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
|
||||
|
||||
writer.store_entry(&entry_2, sequencer_id_2).await.unwrap();
|
||||
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
|
||||
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
|
||||
|
||||
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
|
||||
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
|
||||
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
|
||||
|
||||
// streams are pending again
|
||||
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
|
||||
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
|
||||
}
|
||||
|
||||
async fn test_multi_stream_io<T>(adapter: &T)
|
||||
where
|
||||
T: TestAdapter,
|
||||
|
@ -112,10 +162,16 @@ pub mod test_utils {
|
|||
let entry_3 = lp_to_entry("upc user=3 300");
|
||||
|
||||
let writer = context.writing();
|
||||
let reader = context.reading();
|
||||
let reader = context.reading().await;
|
||||
|
||||
let mut streams_1 = reader.streams();
|
||||
let mut streams_2 = reader.streams();
|
||||
assert_eq!(streams_1.len(), 1);
|
||||
assert_eq!(streams_2.len(), 1);
|
||||
let (sequencer_id_1, mut stream_1) = streams_1.pop().unwrap();
|
||||
let (sequencer_id_2, mut stream_2) = streams_2.pop().unwrap();
|
||||
assert_eq!(sequencer_id_1, sequencer_id_2);
|
||||
|
||||
let mut stream_1 = reader.stream();
|
||||
let mut stream_2 = reader.stream();
|
||||
let waker = futures::task::noop_waker();
|
||||
let mut cx = futures::task::Context::from_waker(&waker);
|
||||
|
||||
|
@ -124,9 +180,9 @@ pub mod test_utils {
|
|||
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
|
||||
|
||||
// streams poll from same source
|
||||
writer.store_entry(&entry_1, 0).await.unwrap();
|
||||
writer.store_entry(&entry_2, 0).await.unwrap();
|
||||
writer.store_entry(&entry_3, 0).await.unwrap();
|
||||
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
|
||||
writer.store_entry(&entry_2, sequencer_id_1).await.unwrap();
|
||||
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
|
||||
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
|
||||
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
|
||||
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
|
||||
|
@ -148,34 +204,51 @@ pub mod test_utils {
|
|||
|
||||
let writer_1 = context.writing();
|
||||
let writer_2 = context.writing();
|
||||
let reader_1 = context.reading();
|
||||
let reader_2 = context.reading();
|
||||
let reader_1 = context.reading().await;
|
||||
let reader_2 = context.reading().await;
|
||||
|
||||
// TODO: do not hard-code sequencer IDs here but provide a proper interface
|
||||
writer_1.store_entry(&entry_east_1, 0).await.unwrap();
|
||||
writer_1.store_entry(&entry_west_1, 1).await.unwrap();
|
||||
writer_2.store_entry(&entry_east_2, 0).await.unwrap();
|
||||
|
||||
assert_reader_content(reader_1, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
|
||||
assert_reader_content(reader_2, &[&entry_east_1, &entry_east_2, &entry_west_1]).await;
|
||||
assert_reader_content(
|
||||
reader_1,
|
||||
&[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
|
||||
)
|
||||
.await;
|
||||
assert_reader_content(
|
||||
reader_2,
|
||||
&[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn assert_reader_content<R>(reader: R, expected: &[&Entry])
|
||||
async fn assert_reader_content<R>(reader: R, expected: &[(u32, &[&Entry])])
|
||||
where
|
||||
R: WriteBufferReading,
|
||||
{
|
||||
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
|
||||
let mut results: Vec<_> = reader
|
||||
.stream()
|
||||
.take(expected.len())
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
results.sort_by_key(|entry| {
|
||||
let sequence = entry.sequence().unwrap();
|
||||
(sequence.id, sequence.number)
|
||||
});
|
||||
let actual: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
|
||||
assert_eq!(&actual[..], expected);
|
||||
let mut streams = reader.streams();
|
||||
assert_eq!(streams.len(), expected.len());
|
||||
streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id);
|
||||
|
||||
for ((actual_sequencer_id, actual_stream), (expected_sequencer_id, expected_entries)) in
|
||||
streams.into_iter().zip(expected.iter())
|
||||
{
|
||||
assert_eq!(actual_sequencer_id, *expected_sequencer_id);
|
||||
|
||||
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
|
||||
let mut results: Vec<_> = actual_stream
|
||||
.take(expected_entries.len())
|
||||
.try_collect()
|
||||
.await
|
||||
.unwrap();
|
||||
results.sort_by_key(|entry| {
|
||||
let sequence = entry.sequence().unwrap();
|
||||
(sequence.id, sequence.number)
|
||||
});
|
||||
let actual_entries: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
|
||||
assert_eq!(&&actual_entries[..], expected_entries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,19 +1,22 @@
|
|||
use std::convert::{TryFrom, TryInto};
|
||||
use std::{
|
||||
convert::{TryFrom, TryInto},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::server_id::ServerId;
|
||||
use entry::{Entry, Sequence, SequencedEntry};
|
||||
use futures::{stream::BoxStream, StreamExt};
|
||||
use observability_deps::tracing::debug;
|
||||
use futures::StreamExt;
|
||||
use observability_deps::tracing::{debug, info};
|
||||
use rdkafka::{
|
||||
consumer::{Consumer, StreamConsumer},
|
||||
consumer::{BaseConsumer, Consumer, StreamConsumer},
|
||||
error::KafkaError,
|
||||
producer::{FutureProducer, FutureRecord},
|
||||
util::Timeout,
|
||||
ClientConfig, Message,
|
||||
ClientConfig, Message, TopicPartitionList,
|
||||
};
|
||||
|
||||
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
||||
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
||||
|
||||
pub struct KafkaBufferProducer {
|
||||
conn: String,
|
||||
|
@ -91,7 +94,7 @@ impl KafkaBufferProducer {
|
|||
pub struct KafkaBufferConsumer {
|
||||
conn: String,
|
||||
database_name: String,
|
||||
consumer: StreamConsumer,
|
||||
consumers: Vec<(u32, StreamConsumer)>,
|
||||
}
|
||||
|
||||
// Needed because rdkafka's StreamConsumer doesn't impl Debug
|
||||
|
@ -105,34 +108,38 @@ impl std::fmt::Debug for KafkaBufferConsumer {
|
|||
}
|
||||
|
||||
impl WriteBufferReading for KafkaBufferConsumer {
|
||||
fn stream<'life0, 'async_trait>(
|
||||
&'life0 self,
|
||||
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
|
||||
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
|
||||
where
|
||||
'life0: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
self.consumer
|
||||
.stream()
|
||||
.map(|message| {
|
||||
let message = message?;
|
||||
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
|
||||
let sequence = Sequence {
|
||||
id: message.partition().try_into()?,
|
||||
number: message.offset().try_into()?,
|
||||
};
|
||||
self.consumers
|
||||
.iter()
|
||||
.map(|(sequencer_id, consumer)| {
|
||||
let stream = consumer
|
||||
.stream()
|
||||
.map(|message| {
|
||||
let message = message?;
|
||||
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
|
||||
let sequence = Sequence {
|
||||
id: message.partition().try_into()?,
|
||||
number: message.offset().try_into()?,
|
||||
};
|
||||
|
||||
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
|
||||
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
|
||||
})
|
||||
.boxed();
|
||||
(*sequencer_id, stream)
|
||||
})
|
||||
.boxed()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl KafkaBufferConsumer {
|
||||
pub fn new(
|
||||
conn: impl Into<String>,
|
||||
pub async fn new(
|
||||
conn: impl Into<String> + Send + Sync,
|
||||
server_id: ServerId,
|
||||
database_name: impl Into<String>,
|
||||
database_name: impl Into<String> + Send + Sync,
|
||||
) -> Result<Self, KafkaError> {
|
||||
let conn = conn.into();
|
||||
let database_name = database_name.into();
|
||||
|
@ -149,17 +156,55 @@ impl KafkaBufferConsumer {
|
|||
// When subscribing without a partition offset, start from the smallest offset available.
|
||||
cfg.set("auto.offset.reset", "smallest");
|
||||
|
||||
let consumer: StreamConsumer = cfg.create()?;
|
||||
// figure out which partitions exists
|
||||
let partitions = Self::get_partitions(&database_name, &cfg).await?;
|
||||
info!(%database_name, ?partitions, "found Kafka partitions");
|
||||
|
||||
// Subscribe to all partitions of this database's topic.
|
||||
consumer.subscribe(&[&database_name]).unwrap();
|
||||
// setup a single consumer per partition, at least until https://github.com/fede1024/rust-rdkafka/pull/351 is
|
||||
// merged
|
||||
let consumers = partitions
|
||||
.into_iter()
|
||||
.map(|partition| {
|
||||
let consumer: StreamConsumer = cfg.create()?;
|
||||
|
||||
let mut assignment = TopicPartitionList::new();
|
||||
assignment.add_partition(&database_name, partition as i32);
|
||||
consumer.assign(&assignment)?;
|
||||
|
||||
Ok((partition, consumer))
|
||||
})
|
||||
.collect::<Result<Vec<(u32, StreamConsumer)>, KafkaError>>()?;
|
||||
|
||||
Ok(Self {
|
||||
conn,
|
||||
database_name,
|
||||
consumer,
|
||||
consumers,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_partitions(
|
||||
database_name: &str,
|
||||
cfg: &ClientConfig,
|
||||
) -> Result<Vec<u32>, KafkaError> {
|
||||
let database_name = database_name.to_string();
|
||||
let probe_consumer: BaseConsumer = cfg.create()?;
|
||||
|
||||
let metadata = tokio::task::spawn_blocking(move || {
|
||||
probe_consumer.fetch_metadata(Some(&database_name), Duration::from_secs(60))
|
||||
})
|
||||
.await
|
||||
.expect("subtask failed")?;
|
||||
let topic_metadata = metadata.topics().get(0).expect("requested a single topic");
|
||||
|
||||
let mut partitions: Vec<_> = topic_metadata
|
||||
.partitions()
|
||||
.iter()
|
||||
.map(|partition_metdata| partition_metdata.id().try_into().unwrap())
|
||||
.collect();
|
||||
partitions.sort_unstable();
|
||||
|
||||
Ok(partitions)
|
||||
}
|
||||
}
|
||||
|
||||
pub mod test_utils {
|
||||
|
@ -273,6 +318,7 @@ mod tests {
|
|||
server_id_counter: AtomicU32,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TestContext for KafkaTestContext {
|
||||
type Writing = KafkaBufferProducer;
|
||||
|
||||
|
@ -282,10 +328,12 @@ mod tests {
|
|||
KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
|
||||
}
|
||||
|
||||
fn reading(&self) -> Self::Reading {
|
||||
async fn reading(&self) -> Self::Reading {
|
||||
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
|
||||
let server_id = ServerId::try_from(server_id).unwrap();
|
||||
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name).unwrap()
|
||||
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,13 +2,10 @@ use std::{collections::BTreeMap, sync::Arc, task::Poll};
|
|||
|
||||
use async_trait::async_trait;
|
||||
use entry::{Entry, Sequence, SequencedEntry};
|
||||
use futures::{
|
||||
stream::{self, BoxStream},
|
||||
StreamExt,
|
||||
};
|
||||
use futures::{stream, StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::core::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
||||
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
|
||||
|
||||
type EntryResVec = Vec<Result<SequencedEntry, WriteBufferError>>;
|
||||
|
||||
|
@ -182,22 +179,28 @@ impl std::fmt::Debug for MockBufferForReading {
|
|||
}
|
||||
|
||||
impl WriteBufferReading for MockBufferForReading {
|
||||
fn stream<'life0, 'async_trait>(
|
||||
&'life0 self,
|
||||
) -> BoxStream<'async_trait, Result<SequencedEntry, WriteBufferError>>
|
||||
fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
|
||||
where
|
||||
'life0: 'async_trait,
|
||||
Self: 'async_trait,
|
||||
{
|
||||
let state = self.state.clone();
|
||||
let positions = Arc::clone(&self.positions);
|
||||
let sequencer_ids: Vec<_> = {
|
||||
let positions = self.positions.lock();
|
||||
positions.keys().copied().collect()
|
||||
};
|
||||
|
||||
stream::poll_fn(move |_ctx| {
|
||||
let entries = state.entries.lock();
|
||||
let mut positions = positions.lock();
|
||||
let mut streams = vec![];
|
||||
for sequencer_id in sequencer_ids {
|
||||
let state = self.state.clone();
|
||||
let positions = Arc::clone(&self.positions);
|
||||
|
||||
let stream = stream::poll_fn(move |_ctx| {
|
||||
let entries = state.entries.lock();
|
||||
let mut positions = positions.lock();
|
||||
|
||||
let entry_vec = entries.get(&sequencer_id).unwrap();
|
||||
let position = positions.get_mut(&sequencer_id).unwrap();
|
||||
|
||||
for (sequencer_id, position) in positions.iter_mut() {
|
||||
let entry_vec = entries.get(sequencer_id).unwrap();
|
||||
if entry_vec.len() > *position {
|
||||
let entry = match &entry_vec[*position] {
|
||||
Ok(entry) => Ok(entry.clone()),
|
||||
|
@ -206,11 +209,14 @@ impl WriteBufferReading for MockBufferForReading {
|
|||
*position += 1;
|
||||
return Poll::Ready(Some(entry));
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
})
|
||||
.boxed()
|
||||
Poll::Pending
|
||||
})
|
||||
.boxed();
|
||||
streams.push((sequencer_id, stream));
|
||||
}
|
||||
|
||||
streams
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -239,6 +245,7 @@ mod tests {
|
|||
state: MockBufferSharedState,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TestContext for MockTestContext {
|
||||
type Writing = MockBufferForWriting;
|
||||
|
||||
|
@ -248,7 +255,7 @@ mod tests {
|
|||
MockBufferForWriting::new(self.state.clone())
|
||||
}
|
||||
|
||||
fn reading(&self) -> Self::Reading {
|
||||
async fn reading(&self) -> Self::Reading {
|
||||
MockBufferForReading::new(self.state.clone())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue