parent
51c6348e54
commit
08fcd87337
|
|
@ -5033,13 +5033,16 @@ dependencies = [
|
|||
"dotenv",
|
||||
"entry",
|
||||
"futures",
|
||||
"generated_types",
|
||||
"http",
|
||||
"httparse",
|
||||
"mutable_batch",
|
||||
"mutable_batch_entry",
|
||||
"mutable_batch_pb",
|
||||
"observability_deps",
|
||||
"parking_lot",
|
||||
"pin-project",
|
||||
"prost",
|
||||
"rdkafka",
|
||||
"tempfile",
|
||||
"time 0.1.0",
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ syntax = "proto3";
|
|||
package influxdata.iox.write_buffer.v1;
|
||||
option go_package = "github.com/influxdata/iox/write_buffer/v1";
|
||||
|
||||
import "influxdata/pbdata/v1/influxdb_pb_data_protocol.proto";
|
||||
|
||||
// Configures the use of a write buffer.
|
||||
message WriteBufferConnection {
|
||||
enum Direction {
|
||||
|
|
@ -56,3 +58,10 @@ message WriteBufferCreationConfig {
|
|||
// Contains 0 or more key value pairs
|
||||
map<string, string> options = 2;
|
||||
}
|
||||
|
||||
// A write payload for the write buffer
|
||||
message WriteBufferPayload {
|
||||
oneof payload {
|
||||
influxdata.pbdata.v1.DatabaseBatch write = 1;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,11 +1,11 @@
|
|||
//! Code to decode [`MutableBatch`] from pbdata protobuf
|
||||
|
||||
use hashbrown::HashSet;
|
||||
use hashbrown::{HashMap, HashSet};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
||||
use generated_types::influxdata::pbdata::v1::{
|
||||
column::{SemanticType, Values as PbValues},
|
||||
Column as PbColumn, PackedStrings, TableBatch,
|
||||
Column as PbColumn, DatabaseBatch, PackedStrings, TableBatch,
|
||||
};
|
||||
use mutable_batch::{writer::Writer, MutableBatch};
|
||||
use schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME};
|
||||
|
|
@ -57,6 +57,21 @@ pub enum Error {
|
|||
/// Result type for pbdata conversion
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Decodes a [`DatabaseBatch`] to a map of [`MutableBatch`] keyed by table name
|
||||
pub fn decode_database_batch(
|
||||
database_batch: &DatabaseBatch,
|
||||
) -> Result<HashMap<String, MutableBatch>> {
|
||||
let mut ret = HashMap::with_capacity(database_batch.table_batches.len());
|
||||
for table_batch in &database_batch.table_batches {
|
||||
let (_, batch) = ret
|
||||
.raw_entry_mut()
|
||||
.from_key(table_batch.table_name.as_str())
|
||||
.or_insert_with(|| (table_batch.table_name.clone(), MutableBatch::new()));
|
||||
write_table_batch(batch, table_batch)?;
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// Writes the provided [`TableBatch`] to a [`MutableBatch`] on error any changes made
|
||||
/// to `batch` are reverted
|
||||
pub fn write_table_batch(batch: &mut MutableBatch, table_batch: &TableBatch) -> Result<()> {
|
||||
|
|
|
|||
|
|
@ -9,13 +9,16 @@ data_types = { path = "../data_types" }
|
|||
dotenv = "0.15.0"
|
||||
entry = { path = "../entry" }
|
||||
futures = "0.3"
|
||||
generated_types = { path = "../generated_types" }
|
||||
http = "0.2"
|
||||
httparse = "1.5"
|
||||
mutable_batch = { path = "../mutable_batch" }
|
||||
mutable_batch_entry = { path = "../mutable_batch_entry" }
|
||||
mutable_batch_pb = { path = "../mutable_batch_pb" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.11.2"
|
||||
pin-project = "1.0"
|
||||
prost = "0.8"
|
||||
rdkafka = "0.27.0"
|
||||
time = { path = "../time" }
|
||||
tokio = { version = "1.13", features = ["macros", "fs"] }
|
||||
|
|
|
|||
|
|
@ -7,8 +7,9 @@ use http::{HeaderMap, HeaderValue};
|
|||
|
||||
use data_types::sequence::Sequence;
|
||||
use entry::{Entry, SequencedEntry};
|
||||
use mutable_batch::DbWrite;
|
||||
use mutable_batch::{DbWrite, WriteMeta};
|
||||
use mutable_batch_entry::sequenced_entry_to_write;
|
||||
use mutable_batch_pb::decode::decode_database_batch;
|
||||
use time::Time;
|
||||
use trace::ctx::SpanContext;
|
||||
use trace::TraceCollector;
|
||||
|
|
@ -26,6 +27,10 @@ use crate::core::WriteBufferError;
|
|||
pub const CONTENT_TYPE_FLATBUFFER: &str =
|
||||
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
|
||||
|
||||
/// Pbdata based content type
|
||||
pub const CONTENT_TYPE_PROTOBUF: &str =
|
||||
r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#;
|
||||
|
||||
/// Message header that determines message content type.
|
||||
pub const HEADER_CONTENT_TYPE: &str = "content-type";
|
||||
|
||||
|
|
@ -35,6 +40,7 @@ pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
|
|||
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
|
||||
pub enum ContentType {
|
||||
Entry,
|
||||
Protobuf,
|
||||
}
|
||||
|
||||
/// IOx-specific headers attached to every write buffer message.
|
||||
|
|
@ -119,6 +125,7 @@ impl IoxHeaders {
|
|||
pub fn headers(&self) -> impl Iterator<Item = (&str, Cow<'static, str>)> + '_ {
|
||||
let content_type = match self.content_type {
|
||||
ContentType::Entry => CONTENT_TYPE_FLATBUFFER.into(),
|
||||
ContentType::Protobuf => CONTENT_TYPE_PROTOBUF.into(),
|
||||
};
|
||||
|
||||
std::iter::once((HEADER_CONTENT_TYPE, content_type)).chain(
|
||||
|
|
@ -141,6 +148,9 @@ pub fn decode(
|
|||
sequence: Sequence,
|
||||
producer_ts: Time,
|
||||
) -> Result<DbWrite, WriteBufferError> {
|
||||
use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload;
|
||||
use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload;
|
||||
|
||||
match headers.content_type {
|
||||
ContentType::Entry => {
|
||||
let entry = Entry::try_from(data.to_vec())?;
|
||||
|
|
@ -152,15 +162,37 @@ pub fn decode(
|
|||
);
|
||||
sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError)
|
||||
}
|
||||
ContentType::Protobuf => {
|
||||
let payload: WriteBufferPayload = prost::Message::decode(data)
|
||||
.map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?;
|
||||
|
||||
let payload = payload.payload.ok_or_else(|| "no payload".to_string())?;
|
||||
let tables = match &payload {
|
||||
Payload::Write(write) => decode_database_batch(write)
|
||||
.map_err(|e| format!("failed to decode database batch: {}", e))?,
|
||||
};
|
||||
|
||||
Ok(DbWrite::new(
|
||||
tables,
|
||||
WriteMeta::new(
|
||||
Some(sequence),
|
||||
Some(producer_ts),
|
||||
headers.span_context,
|
||||
Some(data.len()),
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::core::test_utils::assert_span_context_eq;
|
||||
use trace::RingBufferTraceCollector;
|
||||
|
||||
use crate::core::test_utils::assert_span_context_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn headers_roundtrip() {
|
||||
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
|
|
|
|||
Loading…
Reference in New Issue