feat: add ability to fetch watermarks from write buffer

pull/24376/head
Marco Neumann 2021-07-21 10:34:08 +02:00
parent 58108b79ec
commit 5df88c70aa
3 changed files with 264 additions and 44 deletions

View File

@ -1,6 +1,8 @@
use std::fmt::Debug;
use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry};
use futures::stream::BoxStream;
use futures::{future::BoxFuture, stream::BoxStream};
/// Generic boxed error type that is used in this crate.
///
@ -10,7 +12,7 @@ pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
/// Writing to a Write Buffer takes an [`Entry`] and returns [`Sequence`] data that facilitates reading
/// entries from the Write Buffer at a later time.
#[async_trait]
pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
/// Send an `Entry` to the write buffer using the specified sequencer ID.
///
/// Returns information that can be used to restore entries at a later time.
@ -21,17 +23,42 @@ pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
) -> Result<Sequence, WriteBufferError>;
}
/// Boxed future resolving to a high watermark (the next sequence number expected to be added)
/// for a single sequencer, or a [`WriteBufferError`].
pub type FetchHighWatermarkFut<'a> = BoxFuture<'a, Result<u64, WriteBufferError>>;
/// Factory closure that produces a fresh [`FetchHighWatermarkFut`] on every call.
///
/// Boxed (`Send + Sync`) so it can be stored inside `EntryStream` and invoked repeatedly to
/// observe the watermark moving over time.
pub type FetchHighWatermark<'a> = Box<dyn (Fn() -> FetchHighWatermarkFut<'a>) + Send + Sync>;
/// Output stream of [`WriteBufferReading`].
pub type EntryStream<'a> = BoxStream<'a, Result<SequencedEntry, WriteBufferError>>;
/// Output of `WriteBufferReading::streams` for a single sequencer: the entry stream itself plus
/// a handle to query that sequencer's high watermark.
pub struct EntryStream<'a> {
    /// Stream that produces entries.
    pub stream: BoxStream<'a, Result<SequencedEntry, WriteBufferError>>,

    /// Get high watermark (= what we believe is the next sequence number to be added).
    ///
    /// Can be used to calculate lag. Note that since the watermark is "next sequence ID number to be added", it starts
    /// at 0 and after the entry with sequence number 0 is added to the buffer, it is 1.
    pub fetch_high_watermark: FetchHighWatermark<'a>,
}
impl<'a> Debug for EntryStream<'a> {
    /// Manual `Debug` impl: neither the boxed stream nor the watermark closure implements
    /// `Debug`, so we only print the type name with the standard non-exhaustive marker
    /// (`EntryStream { .. }`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("EntryStream");
        builder.finish_non_exhaustive()
    }
}
/// Produce streams (one per sequencer) of [`SequencedEntry`]s.
#[async_trait]
pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
pub trait WriteBufferReading: Sync + Send + Debug + 'static {
/// Returns a stream per sequencer.
///
/// Note that due to the mutable borrow, it is not possible to have multiple streams from the same
/// [`WriteBufferReading`] instance at the same time. If all streams are dropped and requested again, the last
/// offsets of the old streams will be the start offsets for the new streams. If you want to prevent that either
/// create a new [`WriteBufferReading`] or use [`seek`](Self::seek).
fn streams(&mut self) -> Vec<(u32, EntryStream<'_>)>;
/// Seek given sequencer to given sequence number. The next output of related streams will be an entry with at least
/// the given sequence number (the actual sequence number might be skipped due to "holes" in the stream).
///
/// Note that due to the mutable borrow, it is not possible to seek while streams exists.
async fn seek(
&mut self,
sequencer_id: u32,
@ -68,9 +95,11 @@ pub mod test_utils {
T: TestAdapter,
{
test_single_stream_io(&adapter).await;
test_multi_stream_io(&adapter).await;
test_multi_sequencer_io(&adapter).await;
test_multi_writer_multi_reader(&adapter).await;
test_seek(&adapter).await;
test_watermark(&adapter).await;
}
async fn test_single_stream_io<T>(adapter: &T)
@ -94,23 +123,90 @@ pub mod test_utils {
let mut cx = futures::task::Context::from_waker(&waker);
// empty stream is pending
assert!(stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
// adding content allows us to get results
writer.store_entry(&entry_1, sequencer_id).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_1);
assert_eq!(
stream.stream.next().await.unwrap().unwrap().entry(),
&entry_1
);
// stream is pending again
assert!(stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
// adding more data unblocks the stream
writer.store_entry(&entry_2, sequencer_id).await.unwrap();
writer.store_entry(&entry_3, sequencer_id).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_2);
assert_eq!(stream.next().await.unwrap().unwrap().entry(), &entry_3);
assert_eq!(
stream.stream.next().await.unwrap().unwrap().entry(),
&entry_2
);
assert_eq!(
stream.stream.next().await.unwrap().unwrap().entry(),
&entry_3
);
// stream is pending again
assert!(stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
}
/// Check stream re-creation semantics: dropping all streams and calling
/// `streams()` again resumes from the last consumed offset (or the start, if
/// nothing was consumed yet).
async fn test_multi_stream_io<T>(adapter: &T)
where
    T: TestAdapter,
{
    let ctx = adapter.new_context(1).await;

    let entry_1 = lp_to_entry("upc user=1 100");
    let entry_2 = lp_to_entry("upc user=2 200");
    let entry_3 = lp_to_entry("upc user=3 300");

    let writer = ctx.writing();
    let mut reader = ctx.reading().await;

    let waker = futures::task::noop_waker();
    let mut cx = futures::task::Context::from_waker(&waker);

    // Pre-populate sequencer 0 with three entries before any stream exists.
    writer.store_entry(&entry_1, 0).await.unwrap();
    writer.store_entry(&entry_2, 0).await.unwrap();
    writer.store_entry(&entry_3, 0).await.unwrap();

    // Create a stream, drop it unread, re-create it => still starts at the first entry.
    let mut streams = reader.streams();
    assert_eq!(streams.len(), 1);
    let (_sequencer_id, stream) = streams.pop().unwrap();
    drop(stream);
    drop(streams);
    let mut streams = reader.streams();
    assert_eq!(streams.len(), 1);
    let (_sequencer_id, mut stream) = streams.pop().unwrap();
    assert_eq!(
        stream.stream.next().await.unwrap().unwrap().entry(),
        &entry_1
    );

    // Re-creating the stream after consuming one entry remembers the offset.
    drop(stream);
    drop(streams);
    let mut streams = reader.streams();
    assert_eq!(streams.len(), 1);
    let (_sequencer_id, mut stream) = streams.pop().unwrap();
    assert_eq!(
        stream.stream.next().await.unwrap().unwrap().entry(),
        &entry_2
    );
    assert_eq!(
        stream.stream.next().await.unwrap().unwrap().entry(),
        &entry_3
    );

    // Re-creating the stream after draining everything yields a pending stream.
    drop(stream);
    drop(streams);
    let mut streams = reader.streams();
    assert_eq!(streams.len(), 1);
    let (_sequencer_id, mut stream) = streams.pop().unwrap();
    assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_sequencer_io<T>(adapter: &T)
@ -136,25 +232,34 @@ pub mod test_utils {
let mut cx = futures::task::Context::from_waker(&waker);
// empty streams are pending
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
// entries arrive at the right target stream
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert_eq!(
stream_1.stream.next().await.unwrap().unwrap().entry(),
&entry_1
);
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
writer.store_entry(&entry_2, sequencer_id_2).await.unwrap();
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert_eq!(
stream_2.stream.next().await.unwrap().unwrap().entry(),
&entry_2
);
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
assert_eq!(
stream_1.stream.next().await.unwrap().unwrap().entry(),
&entry_3
);
// streams are pending again
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
}
async fn test_multi_writer_multi_reader<T>(adapter: &T)
@ -239,8 +344,8 @@ pub mod test_utils {
assert_eq!(streams.len(), 2);
let (_sequencer_id, mut stream_1) = streams.pop().unwrap();
let (_sequencer_id, mut stream_2) = streams.pop().unwrap();
assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
drop(stream_1);
drop(stream_2);
drop(streams);
@ -249,6 +354,47 @@ pub mod test_utils {
reader_1.seek(0, 42).await.unwrap();
}
/// Check that `fetch_high_watermark` starts at 0 for an empty sequencer and
/// advances to `last stored sequence number + 1` as entries are written.
async fn test_watermark<T>(adapter: &T)
where
    T: TestAdapter,
{
    let ctx = adapter.new_context(2).await;

    let entry_east_1 = lp_to_entry("upc,region=east user=1 100");
    let entry_east_2 = lp_to_entry("upc,region=east user=2 200");
    let entry_west_1 = lp_to_entry("upc,region=west user=1 200");

    let writer = ctx.writing();
    let mut reader = ctx.reading().await;

    let mut streams = reader.streams();
    assert_eq!(streams.len(), 2);
    let (sequencer_id_1, stream_1) = streams.pop().unwrap();
    let (sequencer_id_2, stream_2) = streams.pop().unwrap();

    // Empty sequencers report a watermark of 0.
    assert_eq!((stream_1.fetch_high_watermark)().await.unwrap(), 0);
    assert_eq!((stream_2.fetch_high_watermark)().await.unwrap(), 0);

    // Storing entries moves the watermark: it ends up one past the last
    // sequence number returned by the writer for each sequencer.
    writer
        .store_entry(&entry_east_1, sequencer_id_1)
        .await
        .unwrap();
    let last_seq_1 = writer
        .store_entry(&entry_east_2, sequencer_id_1)
        .await
        .unwrap()
        .number;
    let last_seq_2 = writer
        .store_entry(&entry_west_1, sequencer_id_2)
        .await
        .unwrap()
        .number;

    assert_eq!(
        (stream_1.fetch_high_watermark)().await.unwrap(),
        last_seq_1 + 1
    );
    assert_eq!(
        (stream_2.fetch_high_watermark)().await.unwrap(),
        last_seq_2 + 1
    );
}
async fn assert_reader_content<R>(reader: &mut R, expected: &[(u32, &[&Entry])])
where
R: WriteBufferReading,
@ -264,6 +410,7 @@ pub mod test_utils {
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
let mut results: Vec<_> = actual_stream
.stream
.take(expected_entries.len())
.try_collect()
.await

View File

@ -8,7 +8,7 @@ use std::{
use async_trait::async_trait;
use data_types::server_id::ServerId;
use entry::{Entry, Sequence, SequencedEntry};
use futures::StreamExt;
use futures::{FutureExt, StreamExt};
use observability_deps::tracing::{debug, info};
use rdkafka::{
consumer::{BaseConsumer, Consumer, StreamConsumer},
@ -18,7 +18,10 @@ use rdkafka::{
ClientConfig, Message, Offset, TopicPartitionList,
};
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
use crate::core::{
EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting,
};
pub struct KafkaBufferProducer {
conn: String,
@ -112,25 +115,59 @@ impl std::fmt::Debug for KafkaBufferConsumer {
#[async_trait]
impl WriteBufferReading for KafkaBufferConsumer {
fn streams(&mut self) -> Vec<(u32, EntryStream<'_>)> {
self.consumers
.iter()
.map(|(sequencer_id, consumer)| {
let stream = consumer
.stream()
.map(move |message| {
let message = message?;
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
let sequence = Sequence {
id: message.partition().try_into()?,
number: message.offset().try_into()?,
};
let mut streams = vec![];
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
for (sequencer_id, consumer) in &self.consumers {
let sequencer_id = *sequencer_id;
let consumer_cloned = Arc::clone(consumer);
let database_name = self.database_name.clone();
let stream = consumer
.stream()
.map(move |message| {
let message = message?;
let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
let sequence = Sequence {
id: message.partition().try_into()?,
number: message.offset().try_into()?,
};
Ok(SequencedEntry::new_from_sequence(sequence, entry)?)
})
.boxed();
let fetch_high_watermark = move || {
let consumer_cloned = Arc::clone(&consumer_cloned);
let database_name = database_name.clone();
let fut = async move {
let (_low, high) = tokio::task::spawn_blocking(move || {
consumer_cloned.fetch_watermarks(
&database_name,
sequencer_id as i32,
Duration::from_secs(60),
)
})
.boxed();
(*sequencer_id, stream)
})
.collect()
.await
.expect("subtask failed")?;
Ok(high as u64)
};
fut.boxed() as FetchHighWatermarkFut<'_>
};
let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>;
streams.push((
sequencer_id,
EntryStream {
stream,
fetch_high_watermark,
},
));
}
streams
}
async fn seek(

View File

@ -2,10 +2,13 @@ use std::{collections::BTreeMap, sync::Arc, task::Poll};
use async_trait::async_trait;
use entry::{Entry, Sequence, SequencedEntry};
use futures::{stream, StreamExt};
use futures::{stream, FutureExt, StreamExt};
use parking_lot::Mutex;
use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
use crate::core::{
EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting,
};
type EntryResVec = Vec<Result<SequencedEntry, WriteBufferError>>;
@ -244,7 +247,40 @@ impl WriteBufferReading for MockBufferForReading {
Poll::Pending
})
.boxed();
streams.push((sequencer_id, stream));
let shared_state = self.shared_state.clone();
let fetch_high_watermark = move || {
let shared_state = shared_state.clone();
let fut = async move {
let entries = shared_state.entries.lock();
let entry_vec = entries.get(&sequencer_id).unwrap();
let watermark = entry_vec
.iter()
.filter_map(|entry_res| {
entry_res
.as_ref()
.ok()
.map(|entry| entry.sequence().unwrap().number)
})
.max()
.map(|n| n + 1)
.unwrap_or(0);
Ok(watermark)
};
fut.boxed() as FetchHighWatermarkFut<'_>
};
let fetch_high_watermark = Box::new(fetch_high_watermark) as FetchHighWatermark<'_>;
streams.push((
sequencer_id,
EntryStream {
stream,
fetch_high_watermark,
},
));
}
streams