feat: WriteBufferReader use DmlOperation (#2731) (#3096)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-11-15 10:19:54 +00:00 committed by GitHub
parent 5a237e2bc6
commit a6d83a3026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 103 additions and 65 deletions

View File

@ -90,6 +90,15 @@ pub enum DmlOperation {
Write(DmlWrite),
}
impl DmlOperation {
/// Gets the metadata associated with this operation
pub fn meta(&self) -> &DmlMeta {
match &self {
DmlOperation::Write(w) => w.meta(),
}
}
}
/// A collection of writes to potentially multiple tables within the same database
#[derive(Debug, Clone)]
pub struct DmlWrite {
@ -202,6 +211,13 @@ pub mod test_util {
use super::*;
/// Asserts that the operation `a` is a [`DmlOperation::Write`] whose write equals `b`
pub fn assert_write_op_eq(a: &DmlOperation, b: &DmlWrite) {
    // Irrefutable while `Write` is the only variant; adding another variant turns this
    // into a compile error, so the helper cannot silently ignore new operation kinds
    let DmlOperation::Write(actual) = a;
    assert_writes_eq(actual, b)
}
/// Asserts two writes are equal
pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) {
assert_eq!(a.meta().sequence(), b.meta().sequence());

View File

@ -6,6 +6,7 @@ use crate::{
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
};
use arrow_util::assert_batches_sorted_eq;
use dml::DmlOperation;
use futures::StreamExt;
use generated_types::influxdata::iox::write_buffer::v1::{
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
@ -67,8 +68,9 @@ async fn writes_go_to_write_buffer() {
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
let db_write = stream.stream.next().await.unwrap().unwrap();
assert_eq!(db_write.table_count(), 2);
match stream.stream.next().await.unwrap().unwrap() {
DmlOperation::Write(write) => assert_eq!(write.table_count(), 2),
}
}
#[tokio::test]
@ -117,8 +119,9 @@ async fn writes_go_to_write_buffer_whitelist() {
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
let db_write = stream.stream.next().await.unwrap().unwrap();
assert_eq!(db_write.table_count(), 1);
match stream.stream.next().await.unwrap().unwrap() {
DmlOperation::Write(write) => assert_eq!(write.table_count(), 1),
}
}
#[tokio::test]

View File

@ -1,7 +1,7 @@
use super::scenario::{create_readable_database, create_router_to_write_buffer, rand_name};
use crate::common::server_fixture::{ServerFixture, ServerType};
use arrow_util::assert_batches_sorted_eq;
use dml::{test_util::assert_writes_eq, DmlWrite};
use dml::{test_util::assert_write_op_eq, DmlWrite};
use futures::StreamExt;
use generated_types::influxdata::pbdata::v1 as pb;
use mutable_batch_lp::lines_to_batches;
@ -60,7 +60,7 @@ pub async fn test_write_pb_router() {
// We don't care about the metadata here, timestamps and sequence numbers are hard to guess
write_actual.meta().clone(),
);
assert_writes_eq(&write_actual, &write_expected);
assert_write_op_eq(&write_actual, &write_expected);
}
fn write_request(db_name: &str) -> pb::WriteRequest {

View File

@ -5,6 +5,7 @@ use std::{
};
use data_types::sequence::Sequence;
use dml::DmlOperation;
use futures::TryStreamExt;
use mutable_batch::payload::PartitionWrite;
use observability_deps::tracing::{info, warn};
@ -214,13 +215,16 @@ pub async fn perform_replay(
"replay sequencer",
);
while let Some(db_write) = stream
while let Some(dml_operation) = stream
.stream
.try_next()
.await
.context(EntryError { sequencer_id })?
{
let sequence = *db_write.meta().sequence().expect("entry must be sequenced");
let sequence = *dml_operation
.meta()
.sequence()
.expect("entry must be sequenced");
if sequence.number > min_max.max() {
return Err(Error::EntryLostError {
sequencer_id,
@ -237,7 +241,11 @@ pub async fn perform_replay(
let mut logged_hard_limit = false;
let n_tries = 600; // 600*100ms = 60s
for n_try in 1..=n_tries {
match db.store_filtered_write(&db_write, filter) {
let result = match &dml_operation {
DmlOperation::Write(write) => db.store_filtered_write(write, filter),
};
match result {
Ok(_) => {
break;
}

View File

@ -8,7 +8,7 @@ use futures::{FutureExt, StreamExt, TryFutureExt};
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use dml::DmlWrite;
use dml::DmlOperation;
use observability_deps::tracing::{debug, error, info, warn};
use trace::span::SpanRecorder;
use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading};
@ -101,7 +101,7 @@ impl Drop for WriteBufferConsumer {
async fn stream_in_sequenced_entries<'a>(
db: Arc<Db>,
sequencer_id: u32,
mut stream: BoxStream<'a, Result<DmlWrite, WriteBufferError>>,
mut stream: BoxStream<'a, Result<DmlOperation, WriteBufferError>>,
f_mark: FetchHighWatermark<'a>,
mut metrics: SequencerMetrics,
) {
@ -138,7 +138,7 @@ async fn stream_in_sequenced_entries<'a>(
let ingest_recorder = metrics.recorder(watermark);
// get entry from sequencer
let db_write = match db_write_result {
let dml_operation = match db_write_result {
Ok(db_write) => db_write,
// skip over invalid data in the write buffer so recovery can succeed
Err(e) => {
@ -152,19 +152,23 @@ async fn stream_in_sequenced_entries<'a>(
}
};
let ingest_recorder = ingest_recorder.write(&db_write);
let ingest_recorder = ingest_recorder.operation(&dml_operation);
// store entry
let mut logged_hard_limit = false;
loop {
let mut span_recorder = SpanRecorder::new(
db_write
dml_operation
.meta()
.span_context()
.map(|parent| parent.child("IOx write buffer")),
);
match db.store_write(&db_write) {
let result = match &dml_operation {
DmlOperation::Write(db_write) => db.store_write(db_write),
};
match result {
Ok(_) => {
ingest_recorder.success();
span_recorder.ok("stored write");
@ -221,7 +225,7 @@ mod tests {
use crate::utils::TestDb;
use super::*;
use dml::DmlMeta;
use dml::{DmlMeta, DmlWrite};
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use mutable_batch_lp::lines_to_batches;
use time::Time;

View File

@ -1,4 +1,4 @@
use dml::DmlWrite;
use dml::DmlOperation;
use metric::{Attributes, DurationHistogram, Metric, ResultMetric, U64Counter, U64Gauge};
use std::time::Instant;
@ -122,7 +122,7 @@ impl SequencerMetrics {
/// Get a recorder that automatically records an error on drop
pub fn recorder(&mut self, watermark: u64) -> IngestRecorder<'_> {
IngestRecorder {
write: None,
operation: None,
metrics: Some(self),
watermark,
start_time: Instant::now(),
@ -139,8 +139,8 @@ pub struct IngestRecorder<'a> {
watermark: u64,
start_time: Instant,
/// The `IngestRecorder` is initially created without a write in case of decode error
write: Option<&'a DmlWrite>,
/// The `IngestRecorder` is initially created without an operation in case of decode error
operation: Option<&'a DmlOperation>,
/// The SequencerMetrics are taken out of this on record to both avoid duplicate
/// recording and also work around lifetime shenanigans
@ -148,10 +148,10 @@ pub struct IngestRecorder<'a> {
}
impl<'a> IngestRecorder<'a> {
pub fn write(mut self, write: &'a DmlWrite) -> IngestRecorder<'a> {
assert!(self.write.is_none());
pub fn operation(mut self, operation: &'a DmlOperation) -> IngestRecorder<'a> {
assert!(self.operation.is_none());
Self {
write: Some(write),
operation: Some(operation),
metrics: self.metrics.take(),
watermark: self.watermark,
start_time: self.start_time,
@ -166,8 +166,8 @@ impl<'a> IngestRecorder<'a> {
let duration = self.start_time.elapsed();
let metrics = self.metrics.take().expect("record called twice");
if let Some(db_write) = self.write.as_ref() {
let meta = db_write.meta();
if let Some(operation) = self.operation.as_ref() {
let meta = operation.meta();
let producer_ts = meta
.producer_ts()
.expect("entry from write buffer must have a producer wallclock time");
@ -188,15 +188,19 @@ impl<'a> IngestRecorder<'a> {
.saturating_sub(1),
);
metrics.last_min_ts.set(db_write.min_timestamp() as u64);
metrics.last_max_ts.set(db_write.max_timestamp() as u64);
match operation {
DmlOperation::Write(write) => {
metrics.last_min_ts.set(write.min_timestamp() as u64);
metrics.last_max_ts.set(write.max_timestamp() as u64);
}
}
metrics
.last_ingest_ts
.set(producer_ts.timestamp_nanos() as u64);
}
match (success, self.write.is_some()) {
match (success, self.operation.is_some()) {
(true, true) => {
// Successfully ingested entry
metrics.ingest_duration.ok.record(duration);

View File

@ -7,7 +7,7 @@ use http::{HeaderMap, HeaderValue};
use prost::Message;
use data_types::sequence::Sequence;
use dml::{DmlMeta, DmlWrite};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload;
use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload;
use mutable_batch_pb::decode::decode_database_batch;
@ -132,22 +132,25 @@ pub fn decode(
headers: IoxHeaders,
sequence: Sequence,
producer_ts: Time,
) -> Result<DmlWrite, WriteBufferError> {
) -> Result<DmlOperation, WriteBufferError> {
match headers.content_type {
ContentType::Protobuf => {
let payload: WriteBufferPayload = prost::Message::decode(data)
.map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?;
let payload = payload.payload.ok_or_else(|| "no payload".to_string())?;
let tables = match &payload {
Payload::Write(write) => decode_database_batch(write)
.map_err(|e| format!("failed to decode database batch: {}", e))?,
};
Ok(DmlWrite::new(
tables,
DmlMeta::sequenced(sequence, producer_ts, headers.span_context, data.len()),
))
match &payload {
Payload::Write(write) => {
let tables = decode_database_batch(write)
.map_err(|e| format!("failed to decode database batch: {}", e))?;
Ok(DmlOperation::Write(DmlWrite::new(
tables,
DmlMeta::sequenced(sequence, producer_ts, headers.span_context, data.len()),
)))
}
}
}
}
}

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use dml::{DmlMeta, DmlWrite};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use futures::{future::BoxFuture, stream::BoxStream};
/// Generic boxed error type that is used in this crate.
@ -54,7 +54,7 @@ pub type FetchHighWatermark<'a> = Box<dyn (Fn() -> FetchHighWatermarkFut<'a>) +
/// Output stream of [`WriteBufferReading`].
pub struct WriteStream<'a> {
/// Stream that produces entries.
pub stream: BoxStream<'a, Result<DmlWrite, WriteBufferError>>,
pub stream: BoxStream<'a, Result<DmlOperation, WriteBufferError>>,
/// Get high watermark (= what we believe is the next sequence number to be added).
///
@ -105,7 +105,7 @@ pub mod test_utils {
};
use async_trait::async_trait;
use dml::{test_util::assert_writes_eq, DmlMeta, DmlWrite};
use dml::{test_util::assert_write_op_eq, DmlMeta, DmlWrite};
use futures::{StreamExt, TryStreamExt};
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
@ -221,7 +221,7 @@ pub mod test_utils {
// adding content allows us to get results
let w1 = write(&writer, entry_1, sequencer_id, None).await;
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
// stream is pending again
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
@ -230,8 +230,8 @@ pub mod test_utils {
let w2 = write(&writer, entry_2, sequencer_id, None).await;
let w3 = write(&writer, entry_3, sequencer_id, None).await;
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w3);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w3);
// stream is pending again
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
@ -270,7 +270,7 @@ pub mod test_utils {
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
// re-creating stream after reading remembers offset
drop(stream);
@ -279,8 +279,8 @@ pub mod test_utils {
assert_eq!(streams.len(), 1);
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w3);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w3);
// re-creating stream after reading everything makes it pending
drop(stream);
@ -324,16 +324,16 @@ pub mod test_utils {
// entries arrive at the right target stream
let w1 = write(&writer, entry_1, sequencer_id_1, None).await;
assert_writes_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w1);
assert_write_op_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w1);
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
let w2 = write(&writer, entry_2, sequencer_id_2, None).await;
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert_writes_eq(&stream_2.stream.next().await.unwrap().unwrap(), &w2);
assert_write_op_eq(&stream_2.stream.next().await.unwrap().unwrap(), &w2);
let w3 = write(&writer, entry_3, sequencer_id_1, None).await;
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
assert_writes_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w3);
assert_write_op_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w3);
// streams are pending again
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
@ -679,7 +679,7 @@ pub mod test_utils {
let actual_writes: Vec<_> = results.iter().collect();
assert_eq!(actual_writes.len(), expected_writes.len());
for (actual, expected) in actual_writes.iter().zip(expected_writes.iter()) {
assert_writes_eq(actual, expected);
assert_write_op_eq(actual, expected);
}
}

View File

@ -122,7 +122,7 @@ use std::{
use crate::codec::{ContentType, IoxHeaders};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlWrite};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use futures::{channel::mpsc::Receiver, FutureExt, SinkExt, Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use time::{Time, TimeProvider};
@ -345,7 +345,7 @@ impl WriteBufferReading for FileBufferConsumer {
struct ConsumerStream {
join_handle: JoinHandle<()>,
#[pin]
rx: Receiver<Result<DmlWrite, WriteBufferError>>,
rx: Receiver<Result<DmlOperation, WriteBufferError>>,
}
impl ConsumerStream {
@ -438,7 +438,7 @@ impl ConsumerStream {
mut data: Vec<u8>,
sequence: Sequence,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<DmlWrite, WriteBufferError> {
) -> Result<DmlOperation, WriteBufferError> {
let mut headers = [httparse::EMPTY_HEADER; 16];
match httparse::parse_headers(&data, &mut headers)? {
httparse::Status::Complete((offset, headers)) => {
@ -482,7 +482,7 @@ impl PinnedDrop for ConsumerStream {
}
impl Stream for ConsumerStream {
type Item = Result<DmlWrite, WriteBufferError>;
type Item = Result<DmlOperation, WriteBufferError>;
fn poll_next(
self: Pin<&mut Self>,
@ -630,7 +630,7 @@ pub mod test_utils {
mod tests {
use std::num::NonZeroU32;
use dml::test_util::assert_writes_eq;
use dml::test_util::assert_write_op_eq;
use tempfile::TempDir;
use trace::RingBufferTraceCollector;
@ -754,8 +754,8 @@ mod tests {
let mut reader = ctx.reading(true).await.unwrap();
let mut stream = reader.streams().remove(&sequencer_id).unwrap();
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w4);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w4);
}
#[tokio::test]
@ -782,6 +782,6 @@ mod tests {
let mut reader = ctx.reading(true).await.unwrap();
let mut stream = reader.streams().remove(&sequencer_id).unwrap();
assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2);
}
}

View File

@ -11,7 +11,7 @@ use parking_lot::Mutex;
use data_types::sequence::Sequence;
use data_types::write_buffer::WriteBufferCreationConfig;
use dml::{DmlMeta, DmlWrite};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use time::TimeProvider;
use crate::core::{
@ -25,7 +25,7 @@ struct WriteResVec {
max_seqno: Option<u64>,
/// The writes
writes: Vec<Result<DmlWrite, WriteBufferError>>,
writes: Vec<Result<DmlOperation, WriteBufferError>>,
/// A list of Waker waiting for a new entry to be pushed
///
@ -35,7 +35,7 @@ struct WriteResVec {
}
impl WriteResVec {
pub fn push(&mut self, val: Result<DmlWrite, WriteBufferError>) {
pub fn push(&mut self, val: Result<DmlOperation, WriteBufferError>) {
if let Ok(entry) = &val {
if let Some(seqno) = entry.meta().sequence() {
self.max_seqno = Some(match self.max_seqno {
@ -135,7 +135,7 @@ impl MockBufferSharedState {
);
}
writes_vec.push(Ok(write));
writes_vec.push(Ok(DmlOperation::Write(write)));
}
/// Push line protocol data with placeholder values used for write metadata
@ -165,7 +165,7 @@ impl MockBufferSharedState {
/// # Panics
/// - when no sequencer was initialized
/// - when sequencer does not exist
pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<DmlWrite, WriteBufferError>> {
pub fn get_messages(&self, sequencer_id: u32) -> Vec<Result<DmlOperation, WriteBufferError>> {
let mut guard = self.writes.lock();
let writes = guard.as_mut().expect("no sequencers initialized");
let writes_vec = writes.get_mut(&sequencer_id).expect("invalid sequencer ID");
@ -269,7 +269,7 @@ impl WriteBufferWriting for MockBufferForWriting {
let mut write = write.clone();
write.set_meta(meta.clone());
writes_vec.push(Ok(write));
writes_vec.push(Ok(DmlOperation::Write(write)));
Ok(meta)
}