From a6d83a3026e743719b062d0db5cb051091bf0572 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Mon, 15 Nov 2021 10:19:54 +0000 Subject: [PATCH] feat: WriteBufferReader use DmlOperation (#2731) (#3096) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- dml/src/lib.rs | 16 +++++++++++ .../tests/end_to_end_cases/write_buffer.rs | 11 +++++--- .../tests/end_to_end_cases/write_pb.rs | 4 +-- server/src/db/replay.rs | 14 ++++++++-- server/src/write_buffer.rs | 18 +++++++----- server/src/write_buffer/metrics.rs | 28 +++++++++++-------- write_buffer/src/codec.rs | 23 ++++++++------- write_buffer/src/core.rs | 26 ++++++++--------- write_buffer/src/file.rs | 16 +++++------ write_buffer/src/mock.rs | 12 ++++---- 10 files changed, 103 insertions(+), 65 deletions(-) diff --git a/dml/src/lib.rs b/dml/src/lib.rs index 75ea3be86f..a0d8ebffe9 100644 --- a/dml/src/lib.rs +++ b/dml/src/lib.rs @@ -90,6 +90,15 @@ pub enum DmlOperation { Write(DmlWrite), } +impl DmlOperation { + /// Gets the metadata associated with this operation + pub fn meta(&self) -> &DmlMeta { + match &self { + DmlOperation::Write(w) => w.meta(), + } + } +} + /// A collection of writes to potentially multiple tables within the same database #[derive(Debug, Clone)] pub struct DmlWrite { @@ -202,6 +211,13 @@ pub mod test_util { use super::*; + /// Asserts `a` contains a WriteOperation equal to `b` + pub fn assert_write_op_eq(a: &DmlOperation, b: &DmlWrite) { + match a { + DmlOperation::Write(a) => assert_writes_eq(a, b), + } + } + /// Asserts two writes are equal pub fn assert_writes_eq(a: &DmlWrite, b: &DmlWrite) { assert_eq!(a.meta().sequence(), b.meta().sequence()); diff --git a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs index 042647ca32..05a580a766 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_buffer.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_buffer.rs @@ -6,6 +6,7 @@ use crate::{ end_to_end_cases::scenario::{rand_name, DatabaseBuilder}, }; use arrow_util::assert_batches_sorted_eq; +use dml::DmlOperation; use futures::StreamExt; use generated_types::influxdata::iox::write_buffer::v1::{ write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection, @@ -67,8 +68,9 @@ async fn writes_go_to_write_buffer() { .await .unwrap(); let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - let db_write = stream.stream.next().await.unwrap().unwrap(); - assert_eq!(db_write.table_count(), 2); + match stream.stream.next().await.unwrap().unwrap() { + DmlOperation::Write(write) => assert_eq!(write.table_count(), 2), + } } #[tokio::test] @@ -117,8 +119,9 @@ async fn writes_go_to_write_buffer_whitelist() { .await .unwrap(); let (_, mut stream) = consumer.streams().into_iter().next().unwrap(); - let db_write = stream.stream.next().await.unwrap().unwrap(); - assert_eq!(db_write.table_count(), 1); + match stream.stream.next().await.unwrap().unwrap() { + DmlOperation::Write(write) => assert_eq!(write.table_count(), 1), + } } #[tokio::test] diff --git a/influxdb_iox/tests/end_to_end_cases/write_pb.rs b/influxdb_iox/tests/end_to_end_cases/write_pb.rs index 320bc8805f..3e2bec803e 100644 --- a/influxdb_iox/tests/end_to_end_cases/write_pb.rs +++ b/influxdb_iox/tests/end_to_end_cases/write_pb.rs @@ -1,7 +1,7 @@ use super::scenario::{create_readable_database, create_router_to_write_buffer, rand_name}; use crate::common::server_fixture::{ServerFixture, ServerType}; use arrow_util::assert_batches_sorted_eq; -use dml::{test_util::assert_writes_eq, DmlWrite}; +use dml::{test_util::assert_write_op_eq, DmlWrite}; use futures::StreamExt; use generated_types::influxdata::pbdata::v1 as pb; use mutable_batch_lp::lines_to_batches; @@ -60,7 +60,7 @@ pub async fn test_write_pb_router() { // We don't care about the metadata here, timestamps and sequence numbers are hard to guess write_actual.meta().clone(), ); - assert_writes_eq(&write_actual, &write_expected); + assert_write_op_eq(&write_actual, &write_expected); } fn write_request(db_name: &str) -> pb::WriteRequest { diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 76aff0c907..e849fc6cbd 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -5,6 +5,7 @@ use std::{ }; use data_types::sequence::Sequence; +use dml::DmlOperation; use futures::TryStreamExt; use mutable_batch::payload::PartitionWrite; use observability_deps::tracing::{info, warn}; @@ -214,13 +215,16 @@ pub async fn perform_replay( "replay sequencer", ); - while let Some(db_write) = stream + while let Some(dml_operation) = stream .stream .try_next() .await .context(EntryError { sequencer_id })? { - let sequence = *db_write.meta().sequence().expect("entry must be sequenced"); + let sequence = *dml_operation + .meta() + .sequence() + .expect("entry must be sequenced"); if sequence.number > min_max.max() { return Err(Error::EntryLostError { sequencer_id, @@ -237,7 +241,11 @@ pub async fn perform_replay( let mut logged_hard_limit = false; let n_tries = 600; // 600*100ms = 60s for n_try in 1..=n_tries { - match db.store_filtered_write(&db_write, filter) { + let result = match &dml_operation { + DmlOperation::Write(write) => db.store_filtered_write(write, filter), + }; + + match result { Ok(_) => { break; } diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs index 2eb87d4fda..e86cf128f0 100644 --- a/server/src/write_buffer.rs +++ b/server/src/write_buffer.rs @@ -8,7 +8,7 @@ use futures::{FutureExt, StreamExt, TryFutureExt}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; -use dml::DmlWrite; +use dml::DmlOperation; use observability_deps::tracing::{debug, error, info, warn}; use trace::span::SpanRecorder; use write_buffer::core::{FetchHighWatermark, WriteBufferError, WriteBufferReading}; @@ -101,7 +101,7 @@ impl Drop for WriteBufferConsumer { async fn stream_in_sequenced_entries<'a>( db: Arc, sequencer_id: u32, - mut stream: BoxStream<'a, Result>, + mut stream: BoxStream<'a, Result>, f_mark: FetchHighWatermark<'a>, mut metrics: SequencerMetrics, ) { @@ -138,7 +138,7 @@ async fn stream_in_sequenced_entries<'a>( let ingest_recorder = metrics.recorder(watermark); // get entry from sequencer - let db_write = match db_write_result { + let dml_operation = match db_write_result { Ok(db_write) => db_write, // skip over invalid data in the write buffer so recovery can succeed Err(e) => { @@ -152,19 +152,23 @@ async fn stream_in_sequenced_entries<'a>( } }; - let ingest_recorder = ingest_recorder.write(&db_write); + let ingest_recorder = ingest_recorder.operation(&dml_operation); // store entry let mut logged_hard_limit = false; loop { let mut span_recorder = SpanRecorder::new( - db_write + dml_operation .meta() .span_context() .map(|parent| parent.child("IOx write buffer")), ); - match db.store_write(&db_write) { + let result = match &dml_operation { + DmlOperation::Write(db_write) => db.store_write(db_write), + }; + + match result { Ok(_) => { ingest_recorder.success(); span_recorder.ok("stored write"); @@ -221,7 +225,7 @@ mod tests { use crate::utils::TestDb; use super::*; - use dml::DmlMeta; + use dml::{DmlMeta, DmlWrite}; use metric::{Attributes, Metric, U64Counter, U64Gauge}; use mutable_batch_lp::lines_to_batches; use time::Time; diff --git a/server/src/write_buffer/metrics.rs b/server/src/write_buffer/metrics.rs index fc46686560..11a0e8d33a 100644 --- a/server/src/write_buffer/metrics.rs +++ b/server/src/write_buffer/metrics.rs @@ -1,4 +1,4 @@ -use dml::DmlWrite; +use dml::DmlOperation; use metric::{Attributes, DurationHistogram, Metric, ResultMetric, U64Counter, U64Gauge}; use std::time::Instant; @@ -122,7 +122,7 @@ impl SequencerMetrics { /// Get a recorder that automatically records an error on drop pub fn recorder(&mut self, watermark: u64) -> IngestRecorder<'_> { IngestRecorder { - write: None, + operation: None, metrics: Some(self), watermark, start_time: Instant::now(), @@ -139,8 +139,8 @@ pub struct IngestRecorder<'a> { watermark: u64, start_time: Instant, - /// The `IngestRecorder` is initially created without a write in case of decode error - write: Option<&'a DmlWrite>, + /// The `IngestRecorder` is initially created without an operation in case of decode error + operation: Option<&'a DmlOperation>, /// The SequencerMetrics are taken out of this on record to both avoid duplicate /// recording and also work around lifetime shenanigans @@ -148,10 +148,10 @@ pub struct IngestRecorder<'a> { } impl<'a> IngestRecorder<'a> { - pub fn write(mut self, write: &'a DmlWrite) -> IngestRecorder<'a> { - assert!(self.write.is_none()); + pub fn operation(mut self, operation: &'a DmlOperation) -> IngestRecorder<'a> { + assert!(self.operation.is_none()); Self { - write: Some(write), + operation: Some(operation), metrics: self.metrics.take(), watermark: self.watermark, start_time: self.start_time, @@ -166,8 +166,8 @@ impl<'a> IngestRecorder<'a> { let duration = self.start_time.elapsed(); let metrics = self.metrics.take().expect("record called twice"); - if let Some(db_write) = self.write.as_ref() { - let meta = db_write.meta(); + if let Some(operation) = self.operation.as_ref() { + let meta = operation.meta(); let producer_ts = meta .producer_ts() .expect("entry from write buffer must have a producer wallclock time"); @@ -188,15 +188,19 @@ impl<'a> IngestRecorder<'a> { .saturating_sub(1), ); - metrics.last_min_ts.set(db_write.min_timestamp() as u64); - metrics.last_max_ts.set(db_write.max_timestamp() as u64); + match operation { + DmlOperation::Write(write) => { + metrics.last_min_ts.set(write.min_timestamp() as u64); + metrics.last_max_ts.set(write.max_timestamp() as u64); + } + } metrics .last_ingest_ts .set(producer_ts.timestamp_nanos() as u64); } - match (success, self.write.is_some()) { + match (success, self.operation.is_some()) { (true, true) => { // Successfully ingested entry metrics.ingest_duration.ok.record(duration); diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index 7edd14d06f..9e2d80f460 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -7,7 +7,7 @@ use http::{HeaderMap, HeaderValue}; use prost::Message; use data_types::sequence::Sequence; -use dml::{DmlMeta, DmlWrite}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload; use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload; use mutable_batch_pb::decode::decode_database_batch; @@ -132,22 +132,25 @@ pub fn decode( headers: IoxHeaders, sequence: Sequence, producer_ts: Time, -) -> Result { +) -> Result { match headers.content_type { ContentType::Protobuf => { let payload: WriteBufferPayload = prost::Message::decode(data) .map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?; let payload = payload.payload.ok_or_else(|| "no payload".to_string())?; - let tables = match &payload { - Payload::Write(write) => decode_database_batch(write) - .map_err(|e| format!("failed to decode database batch: {}", e))?, - }; - Ok(DmlWrite::new( - tables, - DmlMeta::sequenced(sequence, producer_ts, headers.span_context, data.len()), - )) + match &payload { + Payload::Write(write) => { + let tables = decode_database_batch(write) + .map_err(|e| format!("failed to decode database batch: {}", e))?; + + Ok(DmlOperation::Write(DmlWrite::new( + tables, + DmlMeta::sequenced(sequence, producer_ts, headers.span_context, data.len()), + ))) + } + } } } } diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 9d805c30c5..e986ad3d5e 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use dml::{DmlMeta, DmlWrite}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; use futures::{future::BoxFuture, stream::BoxStream}; /// Generic boxed error type that is used in this crate. @@ -54,7 +54,7 @@ pub type FetchHighWatermark<'a> = Box FetchHighWatermarkFut<'a>) + /// Output stream of [`WriteBufferReading`]. pub struct WriteStream<'a> { /// Stream that produces entries. - pub stream: BoxStream<'a, Result>, + pub stream: BoxStream<'a, Result>, /// Get high watermark (= what we believe is the next sequence number to be added). /// @@ -105,7 +105,7 @@ pub mod test_utils { }; use async_trait::async_trait; - use dml::{test_util::assert_writes_eq, DmlMeta, DmlWrite}; + use dml::{test_util::assert_write_op_eq, DmlMeta, DmlWrite}; use futures::{StreamExt, TryStreamExt}; use time::{Time, TimeProvider}; use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector}; @@ -221,7 +221,7 @@ pub mod test_utils { // adding content allows us to get results let w1 = write(&writer, entry_1, sequencer_id, None).await; - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); // stream is pending again assert!(stream.stream.poll_next_unpin(&mut cx).is_pending()); @@ -230,8 +230,8 @@ pub mod test_utils { let w2 = write(&writer, entry_2, sequencer_id, None).await; let w3 = write(&writer, entry_3, sequencer_id, None).await; - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w3); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w3); // stream is pending again assert!(stream.stream.poll_next_unpin(&mut cx).is_pending()); @@ -270,7 +270,7 @@ pub mod test_utils { let mut streams = reader.streams(); assert_eq!(streams.len(), 1); let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap(); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); // re-creating stream after reading remembers offset drop(stream); @@ -279,8 +279,8 @@ pub mod test_utils { assert_eq!(streams.len(), 1); let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap(); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w3); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w3); // re-creating stream after reading everything makes it pending drop(stream); @@ -324,16 +324,16 @@ pub mod test_utils { // entries arrive at the right target stream let w1 = write(&writer, entry_1, sequencer_id_1, None).await; - assert_writes_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w1); + assert_write_op_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w1); assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending()); let w2 = write(&writer, entry_2, sequencer_id_2, None).await; assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending()); - assert_writes_eq(&stream_2.stream.next().await.unwrap().unwrap(), &w2); + assert_write_op_eq(&stream_2.stream.next().await.unwrap().unwrap(), &w2); let w3 = write(&writer, entry_3, sequencer_id_1, None).await; assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending()); - assert_writes_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w3); + assert_write_op_eq(&stream_1.stream.next().await.unwrap().unwrap(), &w3); // streams are pending again assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending()); @@ -679,7 +679,7 @@ pub mod test_utils { let actual_writes: Vec<_> = results.iter().collect(); assert_eq!(actual_writes.len(), expected_writes.len()); for (actual, expected) in actual_writes.iter().zip(expected_writes.iter()) { - assert_writes_eq(actual, expected); + assert_write_op_eq(actual, expected); } } diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs index c7212e1c11..84a0ee58a7 100644 --- a/write_buffer/src/file.rs +++ b/write_buffer/src/file.rs @@ -122,7 +122,7 @@ use std::{ use crate::codec::{ContentType, IoxHeaders}; use async_trait::async_trait; use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig}; -use dml::{DmlMeta, DmlWrite}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; use futures::{channel::mpsc::Receiver, FutureExt, SinkExt, Stream, StreamExt}; use pin_project::{pin_project, pinned_drop}; use time::{Time, TimeProvider}; @@ -345,7 +345,7 @@ impl WriteBufferReading for FileBufferConsumer { struct ConsumerStream { join_handle: JoinHandle<()>, #[pin] - rx: Receiver>, + rx: Receiver>, } impl ConsumerStream { @@ -438,7 +438,7 @@ impl ConsumerStream { mut data: Vec, sequence: Sequence, trace_collector: Option>, - ) -> Result { + ) -> Result { let mut headers = [httparse::EMPTY_HEADER; 16]; match httparse::parse_headers(&data, &mut headers)? { httparse::Status::Complete((offset, headers)) => { @@ -482,7 +482,7 @@ impl PinnedDrop for ConsumerStream { } impl Stream for ConsumerStream { - type Item = Result; + type Item = Result; fn poll_next( self: Pin<&mut Self>, @@ -630,7 +630,7 @@ pub mod test_utils { mod tests { use std::num::NonZeroU32; - use dml::test_util::assert_writes_eq; + use dml::test_util::assert_write_op_eq; use tempfile::TempDir; use trace::RingBufferTraceCollector; @@ -754,8 +754,8 @@ mod tests { let mut reader = ctx.reading(true).await.unwrap(); let mut stream = reader.streams().remove(&sequencer_id).unwrap(); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w4); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w1); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w4); } #[tokio::test] @@ -782,6 +782,6 @@ mod tests { let mut reader = ctx.reading(true).await.unwrap(); let mut stream = reader.streams().remove(&sequencer_id).unwrap(); - assert_writes_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); + assert_write_op_eq(&stream.stream.next().await.unwrap().unwrap(), &w2); } } diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index 554f55255b..b40bd818e8 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -11,7 +11,7 @@ use parking_lot::Mutex; use data_types::sequence::Sequence; use data_types::write_buffer::WriteBufferCreationConfig; -use dml::{DmlMeta, DmlWrite}; +use dml::{DmlMeta, DmlOperation, DmlWrite}; use time::TimeProvider; use crate::core::{ @@ -25,7 +25,7 @@ struct WriteResVec { max_seqno: Option, /// The writes - writes: Vec>, + writes: Vec>, /// A list of Waker waiting for a new entry to be pushed /// @@ -35,7 +35,7 @@ struct WriteResVec { } impl WriteResVec { - pub fn push(&mut self, val: Result) { + pub fn push(&mut self, val: Result) { if let Ok(entry) = &val { if let Some(seqno) = entry.meta().sequence() { self.max_seqno = Some(match self.max_seqno { @@ -135,7 +135,7 @@ impl MockBufferSharedState { ); } - writes_vec.push(Ok(write)); + writes_vec.push(Ok(DmlOperation::Write(write))); } /// Push line protocol data with placeholder values used for write metadata @@ -165,7 +165,7 @@ impl MockBufferSharedState { /// # Panics /// - when no sequencer was initialized /// - when sequencer does not exist - pub fn get_messages(&self, sequencer_id: u32) -> Vec> { + pub fn get_messages(&self, sequencer_id: u32) -> Vec> { let mut guard = self.writes.lock(); let writes = guard.as_mut().expect("no sequencers initialized"); let writes_vec = writes.get_mut(&sequencer_id).expect("invalid sequencer ID"); @@ -269,7 +269,7 @@ impl WriteBufferWriting for MockBufferForWriting { let mut write = write.clone(); write.set_meta(meta.clone()); - writes_vec.push(Ok(write)); + writes_vec.push(Ok(DmlOperation::Write(write))); Ok(meta) }