From 08fcd87337bbace034fd95adce4510374b21e944 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 3 Nov 2021 15:19:05 +0000 Subject: [PATCH] feat: use protobuf encoding in write buffer (#2724) (#3018) --- Cargo.lock | 3 ++ .../iox/write_buffer/v1/write_buffer.proto | 9 +++++ mutable_batch_pb/src/decode.rs | 19 +++++++++- write_buffer/Cargo.toml | 3 ++ write_buffer/src/codec.rs | 38 +++++++++++++++++-- 5 files changed, 67 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 951d8db2b3..a2fec20177 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5033,13 +5033,16 @@ dependencies = [ "dotenv", "entry", "futures", + "generated_types", "http", "httparse", "mutable_batch", "mutable_batch_entry", + "mutable_batch_pb", "observability_deps", "parking_lot", "pin-project", + "prost", "rdkafka", "tempfile", "time 0.1.0", diff --git a/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto b/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto index c328c0850b..d234d38a50 100644 --- a/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto +++ b/generated_types/protos/influxdata/iox/write_buffer/v1/write_buffer.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package influxdata.iox.write_buffer.v1; option go_package = "github.com/influxdata/iox/write_buffer/v1"; +import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"; + // Configures the use of a write buffer. message WriteBufferConnection { enum Direction { @@ -56,3 +58,10 @@ message WriteBufferCreationConfig { // Contains 0 or more key value pairs map options = 2; } + +// A write payload for the write buffer +message WriteBufferPayload { + oneof payload { + influxdata.pbdata.v1.DatabaseBatch write = 1; + } +} \ No newline at end of file diff --git a/mutable_batch_pb/src/decode.rs b/mutable_batch_pb/src/decode.rs index 0e4aa62416..aa2b266c1a 100644 --- a/mutable_batch_pb/src/decode.rs +++ b/mutable_batch_pb/src/decode.rs @@ -1,11 +1,11 @@ //! Code to decode [`MutableBatch`] from pbdata protobuf -use hashbrown::HashSet; +use hashbrown::{HashMap, HashSet}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use generated_types::influxdata::pbdata::v1::{ column::{SemanticType, Values as PbValues}, - Column as PbColumn, PackedStrings, TableBatch, + Column as PbColumn, DatabaseBatch, PackedStrings, TableBatch, }; use mutable_batch::{writer::Writer, MutableBatch}; use schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME}; @@ -57,6 +57,21 @@ pub enum Error { /// Result type for pbdata conversion pub type Result = std::result::Result; +/// Decodes a [`DatabaseBatch`] to a map of [`MutableBatch`] keyed by table name +pub fn decode_database_batch( + database_batch: &DatabaseBatch, +) -> Result> { + let mut ret = HashMap::with_capacity(database_batch.table_batches.len()); + for table_batch in &database_batch.table_batches { + let (_, batch) = ret + .raw_entry_mut() + .from_key(table_batch.table_name.as_str()) + .or_insert_with(|| (table_batch.table_name.clone(), MutableBatch::new())); + write_table_batch(batch, table_batch)?; + } + Ok(ret) +} + /// Writes the provided [`TableBatch`] to a [`MutableBatch`] on error any changes made /// to `batch` are reverted pub fn write_table_batch(batch: &mut MutableBatch, table_batch: &TableBatch) -> Result<()> { diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index 6b4f2d4815..9ea3c3ea15 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -9,13 +9,16 @@ data_types = { path = "../data_types" } dotenv = "0.15.0" entry = { path = "../entry" } futures = "0.3" +generated_types = { path = "../generated_types" } http = "0.2" httparse = "1.5" mutable_batch = { path = "../mutable_batch" } mutable_batch_entry = { path = "../mutable_batch_entry" } +mutable_batch_pb = { path = "../mutable_batch_pb" } observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" pin-project = "1.0" +prost = "0.8" rdkafka = "0.27.0" time = { path = "../time" } tokio = { version = "1.13", features = ["macros", "fs"] } diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs index 9a1aa16814..de06da78af 100644 --- a/write_buffer/src/codec.rs +++ b/write_buffer/src/codec.rs @@ -7,8 +7,9 @@ use http::{HeaderMap, HeaderValue}; use data_types::sequence::Sequence; use entry::{Entry, SequencedEntry}; -use mutable_batch::DbWrite; +use mutable_batch::{DbWrite, WriteMeta}; use mutable_batch_entry::sequenced_entry_to_write; +use mutable_batch_pb::decode::decode_database_batch; use time::Time; use trace::ctx::SpanContext; use trace::TraceCollector; @@ -26,6 +27,10 @@ use crate::core::WriteBufferError; pub const CONTENT_TYPE_FLATBUFFER: &str = r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#; +/// Pbdata based content type +pub const CONTENT_TYPE_PROTOBUF: &str = + r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#; + /// Message header that determines message content type. pub const HEADER_CONTENT_TYPE: &str = "content-type"; @@ -35,6 +40,7 @@ pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id"; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum ContentType { Entry, + Protobuf, } /// IOx-specific headers attached to every write buffer message. @@ -119,6 +125,7 @@ impl IoxHeaders { pub fn headers(&self) -> impl Iterator)> + '_ { let content_type = match self.content_type { ContentType::Entry => CONTENT_TYPE_FLATBUFFER.into(), + ContentType::Protobuf => CONTENT_TYPE_PROTOBUF.into(), }; std::iter::once((HEADER_CONTENT_TYPE, content_type)).chain( @@ -141,6 +148,9 @@ pub fn decode( sequence: Sequence, producer_ts: Time, ) -> Result { + use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload; + use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload; + match headers.content_type { ContentType::Entry => { let entry = Entry::try_from(data.to_vec())?; @@ -152,15 +162,37 @@ pub fn decode( ); sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError) } + ContentType::Protobuf => { + let payload: WriteBufferPayload = prost::Message::decode(data) + .map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?; + + let payload = payload.payload.ok_or_else(|| "no payload".to_string())?; + let tables = match &payload { + Payload::Write(write) => decode_database_batch(write) + .map_err(|e| format!("failed to decode database batch: {}", e))?, + }; + + Ok(DbWrite::new( + tables, + WriteMeta::new( + Some(sequence), + Some(producer_ts), + headers.span_context, + Some(data.len()), + ), + )) + } } } #[cfg(test)] mod tests { - use super::*; - use crate::core::test_utils::assert_span_context_eq; use trace::RingBufferTraceCollector; + use crate::core::test_utils::assert_span_context_eq; + + use super::*; + #[test] fn headers_roundtrip() { let collector: Arc = Arc::new(RingBufferTraceCollector::new(5));