refactor: extract codec module for write buffer (#2724) (#3017)

Raphael Taylor-Davies 2021-11-03 14:07:33 +00:00 committed by GitHub
parent e444fa4cb2
commit 51c6348e54
4 changed files with 253 additions and 235 deletions

write_buffer/src/codec.rs (new file, +221)

@@ -0,0 +1,221 @@
//! Encoding/decoding for write buffer messages
use std::borrow::Cow;
use std::sync::Arc;
use http::{HeaderMap, HeaderValue};
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use mutable_batch::DbWrite;
use mutable_batch_entry::sequenced_entry_to_write;
use time::Time;
use trace::ctx::SpanContext;
use trace::TraceCollector;
use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use crate::core::WriteBufferError;
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
///
/// Inspired by:
/// - <https://stackoverflow.com/a/56502135>
/// - <https://stackoverflow.com/a/48051331>
pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// Message header that determines message content type.
pub const HEADER_CONTENT_TYPE: &str = "content-type";
/// Message header for tracing context.
pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
/// Message content type.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ContentType {
/// Flatbuffer-encoded [`Entry`].
Entry,
}
/// IOx-specific headers attached to every write buffer message.
#[derive(Debug)]
pub struct IoxHeaders {
content_type: ContentType,
span_context: Option<SpanContext>,
}
impl IoxHeaders {
/// Create new headers with sane default values and given span context.
pub fn new(content_type: ContentType, span_context: Option<SpanContext>) -> Self {
Self {
content_type,
span_context,
}
}
/// Create new headers where all information is missing.
fn empty() -> Self {
Self {
// Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805
content_type: ContentType::Entry,
span_context: None,
}
}
/// Creates a new IoxHeaders from an iterator of headers
pub fn from_headers(
headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<[u8]>)>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
) -> Result<Self, WriteBufferError> {
let mut res = Self::empty();
for (name, value) in headers {
let name = name.as_ref();
if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
res.content_type = match std::str::from_utf8(value.as_ref()) {
Ok(CONTENT_TYPE_FLATBUFFER) => ContentType::Entry,
Ok(c) => return Err(format!("Unknown message format: {}", c).into()),
Err(e) => {
return Err(format!("Error decoding content type header: {}", e).into())
}
};
}
if let Some(trace_collector) = trace_collector {
if name.eq_ignore_ascii_case(HEADER_TRACE_CONTEXT) {
if let Ok(header_value) = HeaderValue::from_bytes(value.as_ref()) {
let mut headers = HeaderMap::new();
headers.insert(HEADER_TRACE_CONTEXT, header_value);
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
res.span_context = match parser.parse(trace_collector, &headers) {
Ok(ctx) => ctx,
Err(e) => {
return Err(format!("Error decoding trace context: {}", e).into())
}
};
}
}
}
}
Ok(res)
}
/// Gets the content type
pub fn content_type(&self) -> ContentType {
self.content_type
}
/// Gets the span context if any
pub fn span_context(&self) -> Option<&SpanContext> {
self.span_context.as_ref()
}
/// Returns the headers to encode as (name, value) pairs
pub fn headers(&self) -> impl Iterator<Item = (&str, Cow<'static, str>)> + '_ {
let content_type = match self.content_type {
ContentType::Entry => CONTENT_TYPE_FLATBUFFER.into(),
};
std::iter::once((HEADER_CONTENT_TYPE, content_type)).chain(
self.span_context
.as_ref()
.map(|ctx| {
(
HEADER_TRACE_CONTEXT,
format_jaeger_trace_context(ctx).into(),
)
})
.into_iter(),
)
}
}
pub fn decode(
data: &[u8],
headers: IoxHeaders,
sequence: Sequence,
producer_ts: Time,
) -> Result<DbWrite, WriteBufferError> {
match headers.content_type {
ContentType::Entry => {
let entry = Entry::try_from(data.to_vec())?;
let entry = SequencedEntry::new_from_sequence_and_span_context(
sequence,
producer_ts,
entry,
headers.span_context,
);
sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::test_utils::assert_span_context_eq;
use trace::RingBufferTraceCollector;
#[test]
fn headers_roundtrip() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context_parent = SpanContext::new(Arc::clone(&collector));
let span_context = span_context_parent.child("foo").ctx;
let iox_headers1 = IoxHeaders::new(ContentType::Entry, Some(span_context));
let encoded: Vec<_> = iox_headers1
.headers()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let iox_headers2 = IoxHeaders::from_headers(encoded, Some(&collector)).unwrap();
assert_eq!(iox_headers1.content_type, iox_headers2.content_type);
assert_span_context_eq(
iox_headers1.span_context.as_ref().unwrap(),
iox_headers2.span_context.as_ref().unwrap(),
);
}
#[test]
fn headers_case_handling() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let headers = vec![
("conTent-Type", CONTENT_TYPE_FLATBUFFER),
("uber-trace-id", "1:2:3:1"),
("uber-trace-ID", "5:6:7:1"),
];
let actual = IoxHeaders::from_headers(headers.into_iter(), Some(&collector)).unwrap();
assert_eq!(actual.content_type, ContentType::Entry);
let span_context = actual.span_context.unwrap();
assert_eq!(span_context.trace_id.get(), 5);
assert_eq!(span_context.span_id.get(), 6);
}
#[test]
fn headers_no_trace_collector_on_consumer_side() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context = SpanContext::new(Arc::clone(&collector));
let iox_headers1 = IoxHeaders::new(ContentType::Entry, Some(span_context));
let encoded: Vec<_> = iox_headers1
.headers()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let iox_headers2 = IoxHeaders::from_headers(encoded, None).unwrap();
assert!(iox_headers2.span_context.is_none());
}
}
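Taken together, the module gives every write buffer backend a single encode/decode path: producers build an `IoxHeaders`, serialize it via `headers()`, and attach the entry bytes; consumers rebuild the headers with `IoxHeaders::from_headers` and hand the payload to `decode`. A minimal sketch of that flow, assuming crate-internal code (the module is `pub(crate)`); the helper names and inputs here are illustrative, not part of the commit:

```rust
use std::sync::Arc;

use data_types::sequence::Sequence;
use mutable_batch::DbWrite;
use time::Time;
use trace::TraceCollector;

use crate::codec::{decode, ContentType, IoxHeaders};
use crate::core::WriteBufferError;

/// Producer side: serialize the IOx headers into (name, value) pairs
/// that a transport (file, Kafka, ...) can attach to the message.
fn encode_headers(span_ctx: Option<trace::ctx::SpanContext>) -> Vec<(String, String)> {
    IoxHeaders::new(ContentType::Entry, span_ctx)
        .headers()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
}

/// Consumer side: rebuild the headers and delegate payload decoding.
fn decode_message(
    payload: &[u8],
    raw_headers: Vec<(String, Vec<u8>)>,
    sequence: Sequence,
    producer_ts: Time,
    trace_collector: Option<&Arc<dyn TraceCollector>>,
) -> Result<DbWrite, WriteBufferError> {
    let headers = IoxHeaders::from_headers(raw_headers, trace_collector)?;
    decode(payload, headers, sequence, producer_ts)
}
```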

write_buffer/src/file.rs

@@ -110,7 +110,6 @@
//! [`unlink(2)`]: https://man7.org/linux/man-pages/man2/unlink.2.html
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
path::{Path, PathBuf},
pin::Pin,
str::FromStr,
@@ -120,17 +119,16 @@ use std::{
},
};
use crate::codec::{ContentType, IoxHeaders};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use entry::{Entry, SequencedEntry};
use entry::Entry;
use futures::{channel::mpsc::Receiver, FutureExt, SinkExt, Stream, StreamExt};
use http::{header::HeaderName, HeaderMap, HeaderValue};
use mutable_batch::DbWrite;
use pin_project::{pin_project, pinned_drop};
use time::{Time, TimeProvider};
use tokio::task::JoinHandle;
use trace::{ctx::SpanContext, TraceCollector};
use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use uuid::Uuid;
use crate::core::{
@@ -138,25 +136,9 @@ use crate::core::{
WriteBufferWriting, WriteStream,
};
/// Header used to declare the content type of the message.
pub const HEADER_CONTENT_TYPE: &str = "content-type";
/// Header used to declare the creation time of the message.
pub const HEADER_TIME: &str = "last-modified";
/// Header used to declare the trace context (optional).
pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
///
/// Inspired by:
/// - <https://stackoverflow.com/a/56502135>
/// - <https://stackoverflow.com/a/48051331>
pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// File-based write buffer writer.
#[derive(Debug)]
pub struct FileBufferProducer {
@@ -204,24 +186,13 @@ impl WriteBufferWriting for FileBufferProducer {
let now = self.time_provider.now();
// assemble message
let mut message: Vec<u8> = format!(
"{}: {}\n{}: {}\n",
HEADER_CONTENT_TYPE,
CONTENT_TYPE_FLATBUFFER,
HEADER_TIME,
now.to_rfc3339(),
)
.into_bytes();
if let Some(span_context) = span_context {
message.extend(
format!(
"{}: {}\n",
HEADER_TRACE_CONTEXT,
format_jaeger_trace_context(span_context),
)
.into_bytes(),
)
let mut message: Vec<u8> = format!("{}: {}\n", HEADER_TIME, now.to_rfc3339()).into_bytes();
let iox_headers = IoxHeaders::new(ContentType::Entry, span_context.cloned());
for (name, value) in iox_headers.headers() {
message.extend(format!("{}: {}\n", name, value).into_bytes())
}
message.extend(b"\n");
message.extend(entry.data());
@@ -389,7 +360,7 @@ impl ConsumerStream {
number: sequence_number,
};
match Self::decode_file(data, sequence, trace_collector.clone()) {
Ok(sequence) => {
Ok(write) => {
match next_sequence_number.compare_exchange(
sequence_number,
sequence_number + 1,
@@ -398,8 +369,7 @@ impl ConsumerStream {
) {
Ok(_) => {
// can send to output
mutable_batch_entry::sequenced_entry_to_write(&sequence)
.map_err(|e| Box::new(e) as WriteBufferError)
Ok(write)
}
Err(_) => {
// interleaving change, retry
@@ -457,26 +427,14 @@ impl ConsumerStream {
mut data: Vec<u8>,
sequence: Sequence,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<SequencedEntry, WriteBufferError> {
) -> Result<DbWrite, WriteBufferError> {
let mut headers = [httparse::EMPTY_HEADER; 16];
match httparse::parse_headers(&data, &mut headers)? {
httparse::Status::Complete((offset, headers)) => {
// parse content type
let mut content_type = None;
for header in headers {
if header.name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
if let Ok(s) = String::from_utf8(header.value.to_vec()) {
content_type = Some(s);
}
}
}
if let Some(content_type) = content_type {
if content_type != CONTENT_TYPE_FLATBUFFER {
return Err(format!("Unknown content type: {}", content_type).into());
}
} else {
return Err("Content type missing".to_string().into());
}
let iox_headers = IoxHeaders::from_headers(
headers.iter().map(|header| (header.name, header.value)),
trace_collector.as_ref(),
)?;
// parse timestamp
let mut timestamp = None;
@@ -495,33 +453,10 @@ impl ConsumerStream {
return Err("Timestamp missing".to_string().into());
};
// parse span context
let mut span_context = None;
if let Some(trace_collector) = trace_collector {
let mut header_map = HeaderMap::with_capacity(headers.len());
for header in headers {
if let (Ok(header_name), Ok(header_value)) = (
HeaderName::from_str(header.name),
HeaderValue::from_bytes(header.value),
) {
header_map.insert(header_name, header_value);
}
}
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
span_context = parser.parse(&trace_collector, &header_map).ok().flatten();
}
// parse entry
let entry_data = data.split_off(offset);
let entry = Entry::try_from(entry_data)?;
Ok(SequencedEntry::new_from_sequence_and_span_context(
sequence,
timestamp,
entry,
span_context,
))
crate::codec::decode(&entry_data, iox_headers, sequence, timestamp)
}
httparse::Status::Partial => Err("Too many headers".to_string().into()),
}

write_buffer/src/kafka.rs

@@ -10,10 +10,8 @@ use async_trait::async_trait;
use data_types::{
sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
};
use entry::{Entry, SequencedEntry};
use entry::Entry;
use futures::{FutureExt, StreamExt};
use http::{HeaderMap, HeaderValue};
use mutable_batch_entry::sequenced_entry_to_write;
use observability_deps::tracing::{debug, info};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
@@ -28,99 +26,21 @@
};
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, TraceCollector};
use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use crate::core::{
FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting, WriteStream,
use crate::{
codec::{ContentType, IoxHeaders},
core::{
FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting, WriteStream,
},
};
/// Message header that determines message content type.
pub const HEADER_CONTENT_TYPE: &str = "content-type";
/// Message header for tracing context.
pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
///
/// Inspired by:
/// - <https://stackoverflow.com/a/56502135>
/// - <https://stackoverflow.com/a/48051331>
pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// IOx-specific headers attached to every Kafka message.
#[derive(Debug)]
struct IoxHeaders {
content_type: Option<String>,
span_context: Option<SpanContext>,
}
impl IoxHeaders {
/// Create new headers with sane default values and given span context.
fn new(span_context: Option<SpanContext>) -> Self {
Self {
content_type: Some(CONTENT_TYPE_FLATBUFFER.to_string()),
span_context,
}
}
/// Create new headers where all information is missing.
fn empty() -> Self {
Self {
content_type: None,
span_context: None,
}
}
/// Parse from Kafka headers.
fn from_kafka<H>(headers: &H, trace_collector: Option<&Arc<dyn TraceCollector>>) -> Self
where
H: Headers,
{
let mut res = Self::empty();
for i in 0..headers.count() {
if let Some((name, value)) = headers.get(i) {
if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
res.content_type = String::from_utf8(value.to_vec()).ok();
}
if let Some(trace_collector) = trace_collector {
if name.eq_ignore_ascii_case(HEADER_TRACE_CONTEXT) {
if let Ok(header_value) = HeaderValue::from_bytes(value) {
let mut headers = HeaderMap::new();
headers.insert(HEADER_TRACE_CONTEXT, header_value);
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
res.span_context =
parser.parse(trace_collector, &headers).ok().flatten();
}
}
}
}
}
res
}
}
impl From<&IoxHeaders> for OwnedHeaders {
fn from(iox_headers: &IoxHeaders) -> Self {
let mut res = Self::new();
if let Some(content_type) = iox_headers.content_type.as_ref() {
res = res.add(HEADER_CONTENT_TYPE, content_type);
}
if let Some(span_context) = iox_headers.span_context.as_ref() {
res = res.add(
HEADER_TRACE_CONTEXT,
&format_jaeger_trace_context(span_context),
)
for (header, value) in iox_headers.headers() {
res = res.add(header, value.as_ref());
}
res
@@ -165,7 +85,7 @@ impl WriteBufferWriting for KafkaBufferProducer {
let timestamp_millis = date_time.timestamp_millis();
let timestamp = Time::from_timestamp_millis(timestamp_millis);
let headers = IoxHeaders::new(span_context.cloned());
let headers = IoxHeaders::new(ContentType::Entry, span_context.cloned());
// This type annotation is necessary because `FutureRecord` is generic over key type, but
// key is optional and we're not setting a key. `String` is arbitrary.
@@ -278,18 +198,12 @@ impl WriteBufferReading for KafkaBufferConsumer {
.map(move |message| {
let message = message?;
let headers: IoxHeaders = message.headers().map(|headers| IoxHeaders::from_kafka(headers, trace_collector.as_ref())).unwrap_or_else(IoxHeaders::empty);
// Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805
let content_type = headers.content_type.unwrap_or_else(|| CONTENT_TYPE_FLATBUFFER.to_string());
if content_type != CONTENT_TYPE_FLATBUFFER {
return Err(format!("Unknown message format: {}", content_type).into());
}
let kafka_headers = message.headers().into_iter().flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap()));
let headers = IoxHeaders::from_headers(kafka_headers, trace_collector.as_ref())?;
let payload = message.payload().ok_or_else::<WriteBufferError, _>(|| {
"Payload missing".to_string().into()
})?;
let entry = Entry::try_from(payload.to_vec())?;
// Timestamps were added as part of
// [KIP-32](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message).
@@ -313,8 +227,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
number: message.offset().try_into()?,
};
let entry = SequencedEntry::new_from_sequence_and_span_context(sequence, timestamp, entry, headers.span_context);
sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError)
crate::codec::decode(payload, headers, sequence, timestamp)
})
.boxed();
@@ -672,10 +585,10 @@ mod tests {
use time::TimeProvider;
use trace::{RingBufferTraceCollector, TraceCollector};
use crate::codec::HEADER_CONTENT_TYPE;
use crate::{
core::test_utils::{
assert_span_context_eq, map_pop_first, perform_generic_tests, set_pop_first,
TestAdapter, TestContext,
map_pop_first, perform_generic_tests, set_pop_first, TestAdapter, TestContext,
},
kafka::test_utils::random_kafka_topic,
maybe_skip_kafka_integration,
@@ -859,56 +772,4 @@ mod tests {
let err = stream.stream.next().await.unwrap().unwrap_err();
assert_eq!(err.to_string(), "Unknown message format: foo");
}
#[test]
fn headers_roundtrip() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context_parent = SpanContext::new(Arc::clone(&collector));
let span_context = span_context_parent.child("foo").ctx;
let iox_headers1 = IoxHeaders::new(Some(span_context));
let kafka_headers: OwnedHeaders = (&iox_headers1).into();
let iox_headers2 = IoxHeaders::from_kafka(&kafka_headers, Some(&collector));
assert_eq!(iox_headers1.content_type, iox_headers2.content_type);
assert_span_context_eq(
iox_headers1.span_context.as_ref().unwrap(),
iox_headers2.span_context.as_ref().unwrap(),
);
}
#[test]
fn headers_case_handling() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let kafka_headers = OwnedHeaders::new()
.add("content-type", "a")
.add("CONTENT-TYPE", "b")
.add("content-TYPE", "c")
.add("uber-trace-id", "1:2:3:1")
.add("uber-trace-ID", "5:6:7:1");
let actual = IoxHeaders::from_kafka(&kafka_headers, Some(&collector));
assert_eq!(actual.content_type, Some("c".to_string()));
let span_context = actual.span_context.unwrap();
assert_eq!(span_context.trace_id.get(), 5);
assert_eq!(span_context.span_id.get(), 6);
}
#[test]
fn headers_no_trace_collector_on_consumer_side() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context = SpanContext::new(Arc::clone(&collector));
let iox_headers1 = IoxHeaders::new(Some(span_context));
let kafka_headers: OwnedHeaders = (&iox_headers1).into();
let iox_headers2 = IoxHeaders::from_kafka(&kafka_headers, None);
assert!(iox_headers2.span_context.is_none());
}
}
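The Kafka backend now drives both directions off the same codec types: `IoxHeaders::headers()` feeds rdkafka's `OwnedHeaders` on the producer, and the indexed `Headers` of a consumed message feed `IoxHeaders::from_headers` on the consumer. A consolidated sketch, assuming crate-internal code and the rdkafka header API used elsewhere in this file; the two helper functions are illustrative:

```rust
use std::sync::Arc;

use rdkafka::message::{Headers, OwnedHeaders};
use trace::TraceCollector;

use crate::codec::IoxHeaders;
use crate::core::WriteBufferError;

/// Producer side: copy the codec headers onto a Kafka message.
fn to_kafka_headers(iox: &IoxHeaders) -> OwnedHeaders {
    let mut out = OwnedHeaders::new();
    for (name, value) in iox.headers() {
        out = out.add(name, value.as_ref());
    }
    out
}

/// Consumer side: feed the Kafka headers back through the shared parser.
fn from_kafka_headers<H: Headers>(
    headers: &H,
    trace_collector: Option<&Arc<dyn TraceCollector>>,
) -> Result<IoxHeaders, WriteBufferError> {
    let pairs = (0..headers.count()).map(|i| headers.get(i).unwrap());
    IoxHeaders::from_headers(pairs, trace_collector)
}
```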

write_buffer/src/lib.rs

@@ -8,6 +8,7 @@
clippy::clone_on_ref_ptr
)]
pub(crate) mod codec;
pub mod config;
pub mod core;
pub mod file;