Merge pull request #2059 from influxdata/crepererum/writer_buffer_seek

feat: implement `seek` for write buffer

commit 7d9e1f9704
@@ -656,6 +656,9 @@ impl Db {
         // streaming from the write buffer loop
         async {
             if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
+                let mut write_buffer = write_buffer
+                    .try_lock()
+                    .expect("no streams should exist at this point");
                 let mut futures = vec![];
                 for (_sequencer_id, stream) in write_buffer.streams() {
                     let fut = self.stream_in_sequenced_entries(stream);
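A note on the `try_lock` above: at this point `Db` is only just starting its consumer loop, so nothing else can be holding the new `tokio::sync::Mutex` yet, and the non-blocking acquisition must succeed. A minimal sketch of the pattern, with a toy `Vec` payload standing in for the boxed `WriteBufferReading`:

use std::sync::Arc;

#[tokio::main]
async fn main() {
    // stands in for Arc<tokio::sync::Mutex<Box<dyn WriteBufferReading>>>
    let write_buffer = Arc::new(tokio::sync::Mutex::new(vec![1_u64, 2, 3]));

    // non-blocking acquisition; panics only if someone already holds the lock
    let mut guard = write_buffer
        .try_lock()
        .expect("no streams should exist at this point");
    guard.push(4);
    assert_eq!(guard.len(), 4);
}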
@@ -1212,10 +1215,12 @@ mod tests {
         let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
         write_buffer_state
             .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry).unwrap());
-        let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
+        let write_buffer = MockBufferForReading::new(write_buffer_state);
 
         let db = TestDb::builder()
-            .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
+            .write_buffer(WriteBufferConfig::Reading(Arc::new(
+                tokio::sync::Mutex::new(Box::new(write_buffer) as _),
+            )))
             .build()
             .await
             .db;
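The construction pattern above recurs in every test that follows: the mock reader is no longer shared directly as an `Arc<dyn WriteBufferReading>` but boxed into a trait object and wrapped in an async mutex first. A hedged sketch of the shape, with a stand-in `Reading` trait rather than the real one:

use std::sync::Arc;

trait Reading: Send {}

struct MockReader;
impl Reading for MockReader {}

fn main() {
    let write_buffer = MockReader;
    // the shape WriteBufferConfig::Reading now stores:
    let shared: Arc<tokio::sync::Mutex<Box<dyn Reading>>> =
        Arc::new(tokio::sync::Mutex::new(Box::new(write_buffer) as _));
    assert_eq!(Arc::strong_count(&shared), 1);
}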
@@ -1271,10 +1276,12 @@ mod tests {
             String::from("Something bad happened on the way to creating a SequencedEntry").into(),
             0,
         );
-        let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
+        let write_buffer = MockBufferForReading::new(write_buffer_state);
 
         let test_db = TestDb::builder()
-            .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
+            .write_buffer(WriteBufferConfig::Reading(Arc::new(
+                tokio::sync::Mutex::new(Box::new(write_buffer) as _),
+            )))
             .build()
             .await;
 
@@ -2259,10 +2266,12 @@ mod tests {
         );
         write_buffer_state
             .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 1), entry).unwrap());
-        let write_buffer = Arc::new(MockBufferForReading::new(write_buffer_state));
+        let write_buffer = MockBufferForReading::new(write_buffer_state);
 
         let db = TestDb::builder()
-            .write_buffer(WriteBufferConfig::Reading(Arc::clone(&write_buffer) as _))
+            .write_buffer(WriteBufferConfig::Reading(Arc::new(
+                tokio::sync::Mutex::new(Box::new(write_buffer) as _),
+            )))
             .build()
             .await
             .db;
@@ -13,7 +13,7 @@ use crate::{
 #[derive(Debug)]
 pub enum WriteBufferConfig {
     Writing(Arc<dyn WriteBufferWriting>),
-    Reading(Arc<dyn WriteBufferReading>),
+    Reading(Arc<tokio::sync::Mutex<Box<dyn WriteBufferReading>>>),
 }
 
 impl WriteBufferConfig {
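The reason for the heavier type is the trait change further down: `streams` now takes `&mut self`, and a bare `Arc<dyn …>` cannot hand out mutable access. Putting the boxed trait object behind a `tokio::sync::Mutex` restores exclusive access through the shared pointer. A small sketch with a hypothetical `Reader`:

use std::sync::Arc;

struct Reader {
    cursor: usize,
}

impl Reader {
    // mirrors the new `streams(&mut self)` requirement
    fn streams(&mut self) -> usize {
        self.cursor += 1;
        self.cursor
    }
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(tokio::sync::Mutex::new(Reader { cursor: 0 }));
    let mut guard = shared.lock().await; // exclusive access through the Arc
    assert_eq!(guard.streams(), 1);
}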
@@ -36,7 +36,9 @@ impl WriteBufferConfig {
             Some(WriteBufferConnection::Reading(conn)) => {
                 let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name).await?;
 
-                Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
+                Ok(Some(Self::Reading(Arc::new(tokio::sync::Mutex::new(
+                    Box::new(kafka_buffer) as _,
+                )))))
             }
             None => Ok(None),
         }
@@ -25,12 +25,18 @@ pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
 pub type EntryStream<'a> = BoxStream<'a, Result<SequencedEntry, WriteBufferError>>;
 
 /// Produce streams (one per sequencer) of [`SequencedEntry`]s.
+#[async_trait]
 pub trait WriteBufferReading: Sync + Send + std::fmt::Debug + 'static {
     /// Returns a stream per sequencer.
-    fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
-    where
-        'life0: 'async_trait,
-        Self: 'async_trait;
+    fn streams(&mut self) -> Vec<(u32, EntryStream<'_>)>;
+
+    /// Seek given sequencer to given sequence number. The next output of related streams will be an entry with at least
+    /// the given sequence number (the actual sequence number might be skipped due to "holes" in the stream).
+    async fn seek(
+        &mut self,
+        sequencer_id: u32,
+        sequence_number: u64,
+    ) -> Result<(), WriteBufferError>;
 }
 
 pub mod test_utils {
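The trait now mixes a synchronous `streams` with an async `seek`; the added `#[async_trait]` attribute is what lets the trait declare the async method (it desugars it to a boxed future). A simplified, self-contained model of the contract, including the "holes" semantics of `seek` (stand-in types: `u64` entries and `String` errors, not the real signatures):

use async_trait::async_trait; // crate: async-trait

#[async_trait]
trait Reading: Send {
    fn next_entry(&mut self) -> Option<u64>;
    async fn seek(&mut self, sequencer_id: u32, sequence_number: u64) -> Result<(), String>;
}

struct InMemory {
    entries: Vec<u64>, // sequence numbers, possibly with holes
    offset: u64,
}

#[async_trait]
impl Reading for InMemory {
    fn next_entry(&mut self) -> Option<u64> {
        // emit only entries at or past the seek offset
        self.entries.iter().copied().find(|seq| *seq >= self.offset)
    }

    async fn seek(&mut self, _sequencer_id: u32, sequence_number: u64) -> Result<(), String> {
        self.offset = sequence_number;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut r = InMemory { entries: vec![0, 3, 7], offset: 0 };
    r.seek(0, 1).await.unwrap();
    // sequence numbers 1 and 2 are "holes": the next entry is the first one >= 1
    assert_eq!(r.next_entry(), Some(3));
}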
@@ -62,9 +68,9 @@ pub mod test_utils {
         T: TestAdapter,
     {
         test_single_stream_io(&adapter).await;
-        test_multi_stream_io(&adapter).await;
         test_multi_sequencer_io(&adapter).await;
         test_multi_writer_multi_reader(&adapter).await;
+        test_seek(&adapter).await;
     }
 
     async fn test_single_stream_io<T>(adapter: &T)
@@ -78,7 +84,7 @@ pub mod test_utils {
         let entry_3 = lp_to_entry("upc user=3 300");
 
         let writer = context.writing();
-        let reader = context.reading().await;
+        let mut reader = context.reading().await;
 
         let mut streams = reader.streams();
         assert_eq!(streams.len(), 1);
@@ -118,7 +124,7 @@ pub mod test_utils {
         let entry_3 = lp_to_entry("upc user=3 300");
 
         let writer = context.writing();
-        let reader = context.reading().await;
+        let mut reader = context.reading().await;
 
         let mut streams = reader.streams();
         assert_eq!(streams.len(), 2);
@@ -151,47 +157,6 @@ pub mod test_utils {
         assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
     }
 
-    async fn test_multi_stream_io<T>(adapter: &T)
-    where
-        T: TestAdapter,
-    {
-        let context = adapter.new_context(1).await;
-
-        let entry_1 = lp_to_entry("upc user=1 100");
-        let entry_2 = lp_to_entry("upc user=2 200");
-        let entry_3 = lp_to_entry("upc user=3 300");
-
-        let writer = context.writing();
-        let reader = context.reading().await;
-
-        let mut streams_1 = reader.streams();
-        let mut streams_2 = reader.streams();
-        assert_eq!(streams_1.len(), 1);
-        assert_eq!(streams_2.len(), 1);
-        let (sequencer_id_1, mut stream_1) = streams_1.pop().unwrap();
-        let (sequencer_id_2, mut stream_2) = streams_2.pop().unwrap();
-        assert_eq!(sequencer_id_1, sequencer_id_2);
-
-        let waker = futures::task::noop_waker();
-        let mut cx = futures::task::Context::from_waker(&waker);
-
-        // empty streams is pending
-        assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
-        assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
-
-        // streams poll from same source
-        writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
-        writer.store_entry(&entry_2, sequencer_id_1).await.unwrap();
-        writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
-        assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_1);
-        assert_eq!(stream_2.next().await.unwrap().unwrap().entry(), &entry_2);
-        assert_eq!(stream_1.next().await.unwrap().unwrap().entry(), &entry_3);
-
-        // both streams are pending again
-        assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
-        assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
-    }
-
     async fn test_multi_writer_multi_reader<T>(adapter: &T)
     where
         T: TestAdapter,
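`test_multi_stream_io` had to go because it pulled two concurrent stream sets out of one reader, which `fn streams(&mut self)` now forbids at compile time: two live stream sets would be two simultaneous `&mut` borrows. Sequential exclusive borrows remain fine, which is all the surviving tests need. In miniature:

fn main() {
    let mut reader = String::from("reader"); // stand-in for a reader
    let first = &mut reader; // first exclusive borrow, like a first streams() call
    first.push('1');
    // `first` is no longer used, so a second exclusive borrow may begin:
    let second = &mut reader;
    second.push('2');
    assert_eq!(reader, "reader12");
}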
@@ -204,8 +169,8 @@ pub mod test_utils {
 
         let writer_1 = context.writing();
         let writer_2 = context.writing();
-        let reader_1 = context.reading().await;
-        let reader_2 = context.reading().await;
+        let mut reader_1 = context.reading().await;
+        let mut reader_2 = context.reading().await;
 
         // TODO: do not hard-code sequencer IDs here but provide a proper interface
         writer_1.store_entry(&entry_east_1, 0).await.unwrap();
@@ -213,18 +178,78 @@ pub mod test_utils {
         writer_2.store_entry(&entry_east_2, 0).await.unwrap();
 
         assert_reader_content(
-            reader_1,
+            &mut reader_1,
             &[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
         )
         .await;
         assert_reader_content(
-            reader_2,
+            &mut reader_2,
             &[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
         )
         .await;
     }
 
-    async fn assert_reader_content<R>(reader: R, expected: &[(u32, &[&Entry])])
+    async fn test_seek<T>(adapter: &T)
+    where
+        T: TestAdapter,
+    {
+        let context = adapter.new_context(2).await;
+
+        let waker = futures::task::noop_waker();
+        let mut cx = futures::task::Context::from_waker(&waker);
+
+        let entry_east_1 = lp_to_entry("upc,region=east user=1 100");
+        let entry_east_2 = lp_to_entry("upc,region=east user=2 200");
+        let entry_east_3 = lp_to_entry("upc,region=east user=3 300");
+        let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
+
+        let writer = context.writing();
+        let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().number;
+        let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().number;
+        let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().number;
+
+        let mut reader_1 = context.reading().await;
+        let mut reader_2 = context.reading().await;
+
+        // forward seek
+        reader_1.seek(0, sequence_number_east_2).await.unwrap();
+        assert_reader_content(
+            &mut reader_1,
+            &[(0, &[&entry_east_2]), (1, &[&entry_west_1])],
+        )
+        .await;
+        assert_reader_content(
+            &mut reader_2,
+            &[(0, &[&entry_east_1, &entry_east_2]), (1, &[&entry_west_1])],
+        )
+        .await;
+
+        // backward seek
+        reader_1.seek(0, 0).await.unwrap();
+        assert_reader_content(
+            &mut reader_1,
+            &[(0, &[&entry_east_1, &entry_east_2]), (1, &[])],
+        )
+        .await;
+
+        // seek to far end and then add data
+        reader_1.seek(0, 1_000_000).await.unwrap();
+        let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().number;
+        let mut streams = reader_1.streams();
+        assert_eq!(streams.len(), 2);
+        let (_sequencer_id, mut stream_1) = streams.pop().unwrap();
+        let (_sequencer_id, mut stream_2) = streams.pop().unwrap();
+        assert!(stream_1.poll_next_unpin(&mut cx).is_pending());
+        assert!(stream_2.poll_next_unpin(&mut cx).is_pending());
+        drop(stream_1);
+        drop(stream_2);
+        drop(streams);
+
+        // seeking an unknown sequencer is NOT an error
+        reader_1.seek(42, 0).await.unwrap();
+    }
+
+    async fn assert_reader_content<R>(reader: &mut R, expected: &[(u32, &[&Entry])])
     where
         R: WriteBufferReading,
     {
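The "seek to far end" part of `test_seek` exercises one more consequence of the offset model: after seeking past everything recorded, existing entries are filtered out and the streams stay pending until a write with a high enough sequence number arrives. A toy model of that check (plain `(sequence, payload)` data, not the real types):

fn visible(entries: &[u64], offset: u64) -> Vec<u64> {
    entries.iter().copied().filter(|seq| *seq >= offset).collect()
}

fn main() {
    let mut recorded = vec![0_u64, 1, 2];
    let offset = 1_000_000;
    assert!(visible(&recorded, offset).is_empty()); // nothing to yield => pending
    recorded.push(1_000_001); // a later write at/after the offset
    assert_eq!(visible(&recorded, offset), vec![1_000_001]);
}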
@@ -1,5 +1,7 @@
 use std::{
+    collections::BTreeMap,
     convert::{TryFrom, TryInto},
     sync::Arc,
+    time::Duration,
 };
 
@@ -13,7 +15,7 @@ use rdkafka::{
     error::KafkaError,
     producer::{FutureProducer, FutureRecord},
     util::Timeout,
-    ClientConfig, Message, TopicPartitionList,
+    ClientConfig, Message, Offset, TopicPartitionList,
 };
 
 use crate::core::{EntryStream, WriteBufferError, WriteBufferReading, WriteBufferWriting};
@@ -94,7 +96,7 @@ impl KafkaBufferProducer {
 pub struct KafkaBufferConsumer {
     conn: String,
     database_name: String,
-    consumers: Vec<(u32, StreamConsumer)>,
+    consumers: BTreeMap<u32, Arc<StreamConsumer>>,
 }
 
 // Needed because rdkafka's StreamConsumer doesn't impl Debug
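The container swap from `Vec<(u32, StreamConsumer)>` to `BTreeMap<u32, Arc<StreamConsumer>>` serves the new `seek`: it needs keyed lookup by sequencer id, and an `Arc` handle that can be moved into a background task while the map keeps its own. A sketch with a stand-in `Consumer` type:

use std::{collections::BTreeMap, sync::Arc};

#[derive(Debug)]
struct Consumer; // stand-in for rdkafka's StreamConsumer

fn main() {
    // keyed lookup is what `seek(sequencer_id, …)` needs; Arc lets one consumer
    // be moved into a blocking task while the map keeps its own handle
    let consumers: BTreeMap<u32, Arc<Consumer>> =
        (0..2_u32).map(|id| (id, Arc::new(Consumer))).collect();

    if let Some(consumer) = consumers.get(&0) {
        let consumer = Arc::clone(consumer);
        std::thread::spawn(move || println!("{:?}", consumer))
            .join()
            .unwrap();
    }
}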
@@ -107,18 +109,15 @@ impl std::fmt::Debug for KafkaBufferConsumer {
     }
 }
 
+#[async_trait]
 impl WriteBufferReading for KafkaBufferConsumer {
-    fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
-    where
-        'life0: 'async_trait,
-        Self: 'async_trait,
-    {
+    fn streams(&mut self) -> Vec<(u32, EntryStream<'_>)> {
         self.consumers
             .iter()
             .map(|(sequencer_id, consumer)| {
                 let stream = consumer
                     .stream()
-                    .map(|message| {
+                    .map(move |message| {
                         let message = message?;
                         let entry = Entry::try_from(message.payload().unwrap().to_vec())?;
                         let sequence = Sequence {
@@ -133,6 +132,35 @@ impl WriteBufferReading for KafkaBufferConsumer {
             })
             .collect()
     }
+
+    async fn seek(
+        &mut self,
+        sequencer_id: u32,
+        sequence_number: u64,
+    ) -> Result<(), WriteBufferError> {
+        if let Some(consumer) = self.consumers.get(&sequencer_id) {
+            let consumer = Arc::clone(consumer);
+            let database_name = self.database_name.clone();
+            let offset = if sequence_number > 0 {
+                Offset::Offset(sequence_number as i64)
+            } else {
+                Offset::Beginning
+            };
+
+            tokio::task::spawn_blocking(move || {
+                consumer.seek(
+                    &database_name,
+                    sequencer_id as i32,
+                    offset,
+                    Duration::from_secs(60),
+                )
+            })
+            .await
+            .expect("subtask failed")?;
+        }
+
+        Ok(())
+    }
 }
 
 impl KafkaBufferConsumer {
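rdkafka's `seek` is a blocking call, so the implementation above hops onto tokio's blocking thread pool and then handles both failure layers separately: a panicked task via `expect`, a Kafka error via `?`. The same two-layer pattern in isolation, with a hypothetical `blocking_seek` standing in for the librdkafka call:

fn blocking_seek(topic: &str, partition: i32) -> Result<(), String> {
    // pretend this talks to Kafka synchronously
    println!("seek {}/{}", topic, partition);
    Ok(())
}

#[tokio::main]
async fn main() {
    let database_name = String::from("my_db"); // hypothetical topic name
    tokio::task::spawn_blocking(move || blocking_seek(&database_name, 0))
        .await
        .expect("subtask failed") // the blocking task itself did not panic
        .expect("seek failed"); // the seek result, propagated with `?` in the PR
}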
@@ -169,11 +197,21 @@ impl KafkaBufferConsumer {
 
                 let mut assignment = TopicPartitionList::new();
                 assignment.add_partition(&database_name, partition as i32);
-                consumer.assign(&assignment)?;
 
-                Ok((partition, consumer))
+                // We must set the offset to `Beginning` here to avoid the following error during seek:
+                // KafkaError (Seek error: Local: Erroneous state)
+                //
+                // Also see:
+                // - https://github.com/Blizzard/node-rdkafka/issues/237
+                // - https://github.com/confluentinc/confluent-kafka-go/issues/121#issuecomment-362308376
+                assignment
+                    .set_partition_offset(&database_name, partition as i32, Offset::Beginning)
+                    .expect("partition was set just before");
+
+                consumer.assign(&assignment)?;
+                Ok((partition, Arc::new(consumer)))
             })
-            .collect::<Result<Vec<(u32, StreamConsumer)>, KafkaError>>()?;
+            .collect::<Result<BTreeMap<u32, Arc<StreamConsumer>>, KafkaError>>()?;
 
         Ok(Self {
             conn,
@@ -153,21 +153,38 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
     }
 }
 
+/// Sequencer-specific playback state
+struct PlaybackState {
+    /// Index within the entry vector.
+    vector_index: usize,
+
+    /// Offset within the sequencer IDs.
+    offset: u64,
+}
+
 pub struct MockBufferForReading {
-    state: MockBufferSharedState,
-    positions: Arc<Mutex<BTreeMap<u32, usize>>>,
+    shared_state: MockBufferSharedState,
+    playback_states: Arc<Mutex<BTreeMap<u32, PlaybackState>>>,
 }
 
 impl MockBufferForReading {
     pub fn new(state: MockBufferSharedState) -> Self {
         let n_sequencers = state.entries.lock().len() as u32;
-        let positions: BTreeMap<_, _> = (0..n_sequencers)
-            .map(|sequencer_id| (sequencer_id, 0))
+        let playback_states: BTreeMap<_, _> = (0..n_sequencers)
+            .map(|sequencer_id| {
+                (
+                    sequencer_id,
+                    PlaybackState {
+                        vector_index: 0,
+                        offset: 0,
+                    },
+                )
+            })
             .collect();
 
         Self {
-            state,
-            positions: Arc::new(Mutex::new(positions)),
+            shared_state: state,
+            playback_states: Arc::new(Mutex::new(playback_states)),
         }
     }
 }
@@ -178,38 +195,52 @@ impl std::fmt::Debug for MockBufferForReading {
     }
 }
 
+#[async_trait]
 impl WriteBufferReading for MockBufferForReading {
-    fn streams<'life0, 'async_trait>(&'life0 self) -> Vec<(u32, EntryStream<'async_trait>)>
-    where
-        'life0: 'async_trait,
-        Self: 'async_trait,
-    {
+    fn streams(&mut self) -> Vec<(u32, EntryStream<'_>)> {
         let sequencer_ids: Vec<_> = {
-            let positions = self.positions.lock();
-            positions.keys().copied().collect()
+            let playback_states = self.playback_states.lock();
+            playback_states.keys().copied().collect()
         };
 
         let mut streams = vec![];
         for sequencer_id in sequencer_ids {
-            let state = self.state.clone();
-            let positions = Arc::clone(&self.positions);
+            let shared_state = self.shared_state.clone();
+            let playback_states = Arc::clone(&self.playback_states);
 
             let stream = stream::poll_fn(move |_ctx| {
-                let entries = state.entries.lock();
-                let mut positions = positions.lock();
+                let entries = shared_state.entries.lock();
+                let mut playback_states = playback_states.lock();
 
                 let entry_vec = entries.get(&sequencer_id).unwrap();
-                let position = positions.get_mut(&sequencer_id).unwrap();
+                let playback_state = playback_states.get_mut(&sequencer_id).unwrap();
 
-                if entry_vec.len() > *position {
-                    let entry = match &entry_vec[*position] {
-                        Ok(entry) => Ok(entry.clone()),
-                        Err(e) => Err(e.to_string().into()),
-                    };
-                    *position += 1;
-                    return Poll::Ready(Some(entry));
+                while entry_vec.len() > playback_state.vector_index {
+                    let entry_result = &entry_vec[playback_state.vector_index];
+
+                    // consume entry
+                    playback_state.vector_index += 1;
+
+                    match entry_result {
+                        Ok(entry) => {
+                            // found an entry => need to check if it is within the offset
+                            let sequence = entry.sequence().unwrap();
+                            if sequence.number >= playback_state.offset {
+                                // within offset => return entry to caller
+                                return Poll::Ready(Some(Ok(entry.clone())));
+                            } else {
+                                // offset is larger than the current entry => ignore entry and try next
+                                continue;
+                            }
+                        }
+                        Err(e) => {
+                            // found an error => return it to the caller
+                            return Poll::Ready(Some(Err(e.to_string().into())));
+                        }
+                    }
                 }
 
+                // we are at the end of the recorded entries => report pending
                 Poll::Pending
            })
            .boxed();
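The replay loop above is the heart of the mock: every poll scans forward from `vector_index`, always consuming, but only yielding entries whose sequence number clears the `offset` floor set by `seek`. A compact, runnable model of just that loop, using toy `(sequence, payload)` tuples instead of `SequencedEntry`:

struct PlaybackState {
    vector_index: usize, // how far replay has progressed
    offset: u64,         // sequence-number floor set by `seek`
}

fn main() {
    // recorded entries: (sequence number, payload)
    let entry_vec = [(0_u64, "a"), (1, "b"), (2, "c")];
    let mut playback_state = PlaybackState { vector_index: 0, offset: 1 };

    let mut emitted = vec![];
    while entry_vec.len() > playback_state.vector_index {
        let (seq, payload) = entry_vec[playback_state.vector_index];
        playback_state.vector_index += 1; // always consume
        if seq >= playback_state.offset {
            emitted.push(payload); // at/after the offset => yield
        } // below the offset => skip and keep scanning
    }
    assert_eq!(emitted, vec!["b", "c"]);
}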
@@ -218,6 +249,23 @@ impl WriteBufferReading for MockBufferForReading {
 
         streams
     }
+
+    async fn seek(
+        &mut self,
+        sequencer_id: u32,
+        sequence_number: u64,
+    ) -> Result<(), WriteBufferError> {
+        let mut playback_states = self.playback_states.lock();
+
+        if let Some(playback_state) = playback_states.get_mut(&sequencer_id) {
+            playback_state.offset = sequence_number;
+
+            // reset position to start since seeking might go backwards
+            playback_state.vector_index = 0;
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
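Complementing the replay loop, the mock's `seek` only records the new offset and rewinds `vector_index`, which is why backward seeks work by re-reading from the start, and why an unknown sequencer id is silently fine. The same contract in miniature (toy types, not the PR's):

use std::collections::BTreeMap;

struct PlaybackState {
    vector_index: usize,
    offset: u64,
}

fn seek(states: &mut BTreeMap<u32, PlaybackState>, sequencer_id: u32, sequence_number: u64) {
    if let Some(state) = states.get_mut(&sequencer_id) {
        state.offset = sequence_number;
        state.vector_index = 0; // rewind; the offset filter re-skips as needed
    }
    // unknown sequencer: silently Ok, matching the trait's contract
}

fn main() {
    let mut states = BTreeMap::new();
    states.insert(0_u32, PlaybackState { vector_index: 5, offset: 7 });
    seek(&mut states, 0, 2); // backward seek
    assert_eq!(states[&0].vector_index, 0);
    assert_eq!(states[&0].offset, 2);
    seek(&mut states, 42, 0); // unknown sequencer is NOT an error
}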