Merge branch 'main' into crepererum/issue3154_b

pull/24376/head
kodiakhq[bot] 2021-11-24 13:33:30 +00:00 committed by GitHub
commit fa0185118e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 70 deletions

1
Cargo.lock generated
View File

@ -5130,6 +5130,7 @@ dependencies = [
"tempfile",
"time 0.1.0",
"tokio",
"tokio-util",
"trace",
"trace_http",
"uuid",

View File

@ -22,6 +22,7 @@ prost = "0.8"
rdkafka = "0.27.0"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "fs"] }
tokio-util = "0.6.9"
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
uuid = { version = "0.8", features = ["serde", "v4"] }

View File

@ -285,7 +285,9 @@ pub mod test_utils {
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
// re-creating stream after reading remembers offset
// re-creating stream after reading remembers offset, but wait a bit to provoke the stream to buffer some
// entries
tokio::time::sleep(Duration::from_millis(10)).await;
drop(stream);
drop(streams);
let mut streams = reader.streams();

View File

@ -123,10 +123,10 @@ use crate::codec::{ContentType, IoxHeaders};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlOperation};
use futures::{channel::mpsc::Receiver, FutureExt, SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use futures::{FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use time::{Time, TimeProvider};
use tokio::task::JoinHandle;
use tokio_util::sync::ReusableBoxFuture;
use trace::TraceCollector;
use uuid::Uuid;
@ -343,11 +343,13 @@ impl WriteBufferReading for FileBufferConsumer {
}
}
#[pin_project(PinnedDrop)]
#[pin_project]
struct ConsumerStream {
join_handle: JoinHandle<()>,
#[pin]
rx: Receiver<Result<DmlOperation, WriteBufferError>>,
fut: ReusableBoxFuture<Result<DmlOperation, WriteBufferError>>,
sequencer_id: u32,
path: PathBuf,
next_sequence_number: Arc<AtomicU64>,
trace_collector: Option<Arc<dyn TraceCollector>>,
}
impl ConsumerStream {
@ -357,49 +359,71 @@ impl ConsumerStream {
next_sequence_number: Arc<AtomicU64>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
let (mut tx, rx) = futures::channel::mpsc::channel(1);
Self {
fut: ReusableBoxFuture::new(Self::poll_next_inner(
sequencer_id,
path.clone(),
Arc::clone(&next_sequence_number),
trace_collector.clone(),
)),
sequencer_id,
path,
next_sequence_number,
trace_collector,
}
}
let join_handle = tokio::spawn(async move {
loop {
let sequence_number = next_sequence_number.load(Ordering::SeqCst);
async fn poll_next_inner(
sequencer_id: u32,
path: PathBuf,
next_sequence_number: Arc<AtomicU64>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<DmlOperation, WriteBufferError> {
loop {
let sequence_number = next_sequence_number.load(Ordering::SeqCst);
// read file
let file_path = path.join(sequence_number.to_string());
let msg = match tokio::fs::read(&file_path).await {
Ok(data) => {
// decode file
let sequence = Sequence {
id: sequencer_id,
number: sequence_number,
};
match Self::decode_file(data, sequence, trace_collector.clone()) {
Ok(write) => {
match next_sequence_number.compare_exchange(
sequence_number,
sequence_number + 1,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
// can send to output
Ok(write)
}
Err(_) => {
// interleaving change, retry
continue;
}
// read file
let file_path = path.join(sequence_number.to_string());
let msg = match tokio::fs::read(&file_path).await {
Ok(data) => {
// decode file
let sequence = Sequence {
id: sequencer_id,
number: sequence_number,
};
match Self::decode_file(data, sequence, trace_collector.clone()) {
Ok(write) => {
match next_sequence_number.compare_exchange(
sequence_number,
sequence_number + 1,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
// can send to output
Ok(write)
}
Err(_) => {
// interleaving change, retry
continue;
}
}
Err(e) => Err(e),
}
Err(e) => Err(e),
}
Err(error) => {
match error.kind() {
std::io::ErrorKind::NotFound => {
// figure out watermark and see if there's a gap in the stream
if let Ok(watermark) = watermark(&path).await {
// watermark is "last sequence number + 1", so substract 1 before comparing
if watermark.saturating_sub(1) > sequence_number {
}
Err(error) => {
match error.kind() {
std::io::ErrorKind::NotFound => {
// figure out watermark and see if there's a gap in the stream
if let Ok(watermark) = watermark(&path).await {
// watermark is "last sequence number + 1", so substract 1 before comparing
if watermark.saturating_sub(1) > sequence_number {
// while generating the watermark, a writer might have created the file that we've
// tried to read, so we need to double-check
if let Err(std::io::ErrorKind::NotFound) =
tokio::fs::metadata(&file_path).await.map_err(|e| e.kind())
{
// update position
// failures are OK here since we'll re-read this value next round
next_sequence_number
@ -412,28 +436,23 @@ impl ConsumerStream {
.ok();
continue;
}
};
}
};
// no gap detected, just wait a bit for new data
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
_ => {
// cannot read file => communicate to user
Err(Box::new(error) as WriteBufferError)
}
// no gap detected, just wait a bit for new data
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
_ => {
// cannot read file => communicate to user
Err(Box::new(error) as WriteBufferError)
}
}
};
if tx.send(msg).await.is_err() {
// Receiver is gone
return;
}
}
});
};
Self { join_handle, rx }
return msg;
}
}
fn decode_file(
@ -476,13 +495,6 @@ impl ConsumerStream {
}
}
#[pinned_drop]
impl PinnedDrop for ConsumerStream {
fn drop(self: Pin<&mut Self>) {
self.join_handle.abort();
}
}
impl Stream for ConsumerStream {
type Item = Result<DmlOperation, WriteBufferError>;
@ -491,7 +503,19 @@ impl Stream for ConsumerStream {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
this.rx.poll_next(cx)
match this.fut.poll(cx) {
std::task::Poll::Ready(res) => {
this.fut.set(Self::poll_next_inner(
*this.sequencer_id,
this.path.clone(),
Arc::clone(this.next_sequence_number),
this.trace_collector.clone(),
));
std::task::Poll::Ready(Some(res))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}