feat: propagate trace information through write buffer

pull/24376/head
Marco Neumann 2021-10-13 12:19:20 +02:00
parent d752b79cbe
commit 5e06519afb
11 changed files with 284 additions and 53 deletions

4
Cargo.lock generated
View File

@ -1019,6 +1019,7 @@ dependencies = [
"schema",
"snafu",
"time 0.1.0",
"trace",
]
[[package]]
@ -4982,11 +4983,14 @@ dependencies = [
"dotenv",
"entry",
"futures",
"http",
"observability_deps",
"parking_lot",
"rdkafka",
"time 0.1.0",
"tokio",
"trace",
"trace_http",
"uuid",
]

View File

@ -13,6 +13,7 @@ data_types = { path = "../data_types" }
flatbuffers = "2"
snafu = "0.6"
time = { path = "../time" }
trace = { path = "../trace" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.13.0"
schema = { path = "../schema" }

View File

@ -18,6 +18,7 @@ use schema::{
IOxValueType, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME,
};
use time::Time;
use trace::ctx::SpanContext;
use crate::entry_fb;
@ -1750,6 +1751,9 @@ pub struct SequencedEntry {
/// At the time of writing, sequences will not be present when there is no configured mechanism to define the order
/// of all writes.
sequence_and_producer_ts: Option<(Sequence, Time)>,
// Optional span context associated w/ this entry.
span_context: Option<SpanContext>,
}
impl SequencedEntry {
@ -1761,6 +1765,20 @@ impl SequencedEntry {
Self {
entry,
sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)),
span_context: None,
}
}
pub fn new_from_sequence_and_span_context(
sequence: Sequence,
producer_wallclock_timestamp: Time,
entry: Entry,
span_context: Option<SpanContext>,
) -> Self {
Self {
entry,
sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)),
span_context,
}
}
@ -1768,6 +1786,7 @@ impl SequencedEntry {
Self {
entry,
sequence_and_producer_ts: None,
span_context: None,
}
}
@ -1790,6 +1809,10 @@ impl SequencedEntry {
pub fn entry(&self) -> &Entry {
&self.entry
}
pub fn span_context(&self) -> Option<&SpanContext> {
self.span_context.as_ref()
}
}
pub mod test_helpers {

View File

@ -26,6 +26,7 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
use trace::{RingBufferTraceCollector, TraceCollector};
use uuid::Uuid;
const INIT_BACKOFF: Duration = Duration::from_secs(1);
@ -1151,6 +1152,9 @@ impl DatabaseStateCatalogLoaded {
) -> Result<DatabaseStateInitialized, InitError> {
let db = Arc::clone(&self.db);
// TODO: use proper trace collector
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let rules = self.provided_rules.rules();
let write_buffer_factory = shared.application.write_buffer_factory();
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
@ -1159,6 +1163,7 @@ impl DatabaseStateCatalogLoaded {
.new_config_read(
shared.config.server_id,
shared.config.name.as_str(),
&trace_collector,
connection,
)
.await

View File

@ -978,8 +978,9 @@ impl Db {
// will get picked up when data is read from the write buffer.
// TODO: be smarter than always using sequencer 0
// TODO: propgate span context
let _ = write_buffer
.store_entry(&entry, 0)
.store_entry(&entry, 0, None)
.await
.context(WriteBufferWritingError)?;
Ok(())
@ -989,8 +990,9 @@ impl Db {
// buffer to return success before adding the entry to the mutable buffer.
// TODO: be smarter than always using sequencer 0
// TODO: propagate span context
let (sequence, producer_wallclock_timestamp) = write_buffer
.store_entry(&entry, 0)
.store_entry(&entry, 0, None)
.await
.context(WriteBufferWritingError)?;
let sequenced_entry = Arc::new(SequencedEntry::new_from_sequence(

View File

@ -727,6 +727,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.pop()
.unwrap(),
0,
None,
)
.await
.unwrap();
@ -736,6 +737,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.pop()
.unwrap(),
0,
None,
)
.await
.unwrap();
@ -745,6 +747,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.pop()
.unwrap(),
0,
None,
)
.await
.unwrap();
@ -771,6 +774,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.pop()
.unwrap(),
0,
None,
)
.await
.unwrap();

View File

@ -9,11 +9,14 @@ data_types = { path = "../data_types" }
dotenv = "0.15.0"
entry = { path = "../entry" }
futures = "0.3"
http = "0.2"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.2"
rdkafka = "0.26.0"
time = { path = "../time" }
tokio = { version = "1.11", features = ["macros", "fs"] }
trace = { path = "../trace" }
trace_http = { path = "../trace_http" }
uuid = { version = "0.8", features = ["serde", "v4"] }
[package.metadata.cargo-udeps.ignore]

View File

@ -8,6 +8,7 @@ use data_types::{
server_id::ServerId,
};
use time::TimeProvider;
use trace::TraceCollector;
use crate::{
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
@ -135,6 +136,7 @@ impl WriteBufferConfigFactory {
&self,
server_id: ServerId,
db_name: &str,
trace_collector: &Arc<dyn TraceCollector>,
cfg: &WriteBufferConnection,
) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
assert_eq!(cfg.direction, WriteBufferDirection::Read);
@ -147,6 +149,7 @@ impl WriteBufferConfigFactory {
db_name,
&cfg.connection_config,
cfg.creation_config.as_ref(),
trace_collector,
)
.await?;
Box::new(kafka_buffer) as _
@ -176,6 +179,7 @@ mod tests {
use std::{convert::TryFrom, num::NonZeroU32};
use data_types::{database_rules::WriteBufferCreationConfig, DatabaseName};
use trace::RingBufferTraceCollector;
use crate::{
kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
@ -207,6 +211,8 @@ mod tests {
#[tokio::test]
async fn test_reading_kafka() {
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let conn = maybe_skip_kafka_integration!();
let time = Arc::new(time::SystemProvider::new());
let factory = WriteBufferConfigFactory::new(time);
@ -222,7 +228,7 @@ mod tests {
};
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "kafka");
@ -268,6 +274,8 @@ mod tests {
#[tokio::test]
async fn test_reading_mock() {
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
@ -286,7 +294,7 @@ mod tests {
};
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock");
@ -299,7 +307,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
@ -343,6 +351,8 @@ mod tests {
#[tokio::test]
async fn test_reading_mock_failing() {
let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let time = Arc::new(time::SystemProvider::new());
let mut factory = WriteBufferConfigFactory::new(time);
@ -360,7 +370,7 @@ mod tests {
};
let conn = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
@ -373,7 +383,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_read(server_id, db_name.as_str(), &cfg)
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));

View File

@ -8,6 +8,7 @@ use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use futures::{future::BoxFuture, stream::BoxStream};
use time::Time;
use trace::ctx::SpanContext;
/// Generic boxed error type that is used in this crate.
///
@ -25,11 +26,14 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
/// Send an `Entry` to the write buffer using the specified sequencer ID.
///
/// You can attach an optional [`SpanContext`] which will be propagated to the reader side.
///
/// Returns information that can be used to restore entries at a later time.
async fn store_entry(
&self,
entry: &Entry,
sequencer_id: u32,
span_context: Option<&SpanContext>,
) -> Result<(Sequence, Time), WriteBufferError>;
/// Return type (like `"mock"` or `"kafka"`) of this writer.
@ -96,6 +100,7 @@ pub mod test_utils {
use entry::{test_helpers::lp_to_entry, Entry};
use futures::{StreamExt, TryStreamExt};
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
@ -158,6 +163,7 @@ pub mod test_utils {
test_timestamp(&adapter).await;
test_sequencer_auto_creation(&adapter).await;
test_sequencer_ids(&adapter).await;
test_span_context(&adapter).await;
}
/// Test IO with a single writer and single reader stream.
@ -190,7 +196,10 @@ pub mod test_utils {
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
// adding content allows us to get results
writer.store_entry(&entry_1, sequencer_id).await.unwrap();
writer
.store_entry(&entry_1, sequencer_id, None)
.await
.unwrap();
assert_eq!(
stream.stream.next().await.unwrap().unwrap().entry(),
&entry_1
@ -200,8 +209,14 @@ pub mod test_utils {
assert!(stream.stream.poll_next_unpin(&mut cx).is_pending());
// adding more data unblocks the stream
writer.store_entry(&entry_2, sequencer_id).await.unwrap();
writer.store_entry(&entry_3, sequencer_id).await.unwrap();
writer
.store_entry(&entry_2, sequencer_id, None)
.await
.unwrap();
writer
.store_entry(&entry_3, sequencer_id, None)
.await
.unwrap();
assert_eq!(
stream.stream.next().await.unwrap().unwrap().entry(),
&entry_2
@ -235,9 +250,9 @@ pub mod test_utils {
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
writer.store_entry(&entry_1, 0).await.unwrap();
writer.store_entry(&entry_2, 0).await.unwrap();
writer.store_entry(&entry_3, 0).await.unwrap();
writer.store_entry(&entry_1, 0, None).await.unwrap();
writer.store_entry(&entry_2, 0, None).await.unwrap();
writer.store_entry(&entry_3, 0, None).await.unwrap();
// creating stream, drop stream, re-create it => still starts at first entry
let mut streams = reader.streams();
@ -309,21 +324,30 @@ pub mod test_utils {
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
// entries arrive at the right target stream
writer.store_entry(&entry_1, sequencer_id_1).await.unwrap();
writer
.store_entry(&entry_1, sequencer_id_1, None)
.await
.unwrap();
assert_eq!(
stream_1.stream.next().await.unwrap().unwrap().entry(),
&entry_1
);
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
writer.store_entry(&entry_2, sequencer_id_2).await.unwrap();
writer
.store_entry(&entry_2, sequencer_id_2, None)
.await
.unwrap();
assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending());
assert_eq!(
stream_2.stream.next().await.unwrap().unwrap().entry(),
&entry_2
);
writer.store_entry(&entry_3, sequencer_id_1).await.unwrap();
writer
.store_entry(&entry_3, sequencer_id_1, None)
.await
.unwrap();
assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending());
assert_eq!(
stream_1.stream.next().await.unwrap().unwrap().entry(),
@ -365,15 +389,15 @@ pub mod test_utils {
let sequencer_id_2 = set_pop_first(&mut sequencer_ids_1).unwrap();
writer_1
.store_entry(&entry_east_1, sequencer_id_1)
.store_entry(&entry_east_1, sequencer_id_1, None)
.await
.unwrap();
writer_1
.store_entry(&entry_west_1, sequencer_id_2)
.store_entry(&entry_west_1, sequencer_id_2, None)
.await
.unwrap();
writer_2
.store_entry(&entry_east_2, sequencer_id_1)
.store_entry(&entry_east_2, sequencer_id_1, None)
.await
.unwrap();
@ -417,9 +441,24 @@ pub mod test_utils {
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
let writer = context.writing(true).await.unwrap();
let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number;
let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number;
let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number;
let _sequence_number_east_1 = writer
.store_entry(&entry_east_1, 0, None)
.await
.unwrap()
.0
.number;
let sequence_number_east_2 = writer
.store_entry(&entry_east_2, 0, None)
.await
.unwrap()
.0
.number;
let _sequence_number_west_1 = writer
.store_entry(&entry_west_1, 1, None)
.await
.unwrap()
.0
.number;
let mut reader_1 = context.reading(true).await.unwrap();
let mut reader_2 = context.reading(true).await.unwrap();
@ -447,7 +486,12 @@ pub mod test_utils {
// seek to far end and then at data
reader_1.seek(0, 1_000_000).await.unwrap();
let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().0.number;
let _sequence_number_east_3 = writer
.store_entry(&entry_east_3, 0, None)
.await
.unwrap()
.0
.number;
let mut streams = reader_1.streams();
assert_eq!(streams.len(), 2);
let (_sequencer_id, mut stream_1) = map_pop_first(&mut streams).unwrap();
@ -491,17 +535,17 @@ pub mod test_utils {
// high water mark moves
writer
.store_entry(&entry_east_1, sequencer_id_1)
.store_entry(&entry_east_1, sequencer_id_1, None)
.await
.unwrap();
let mark_1 = writer
.store_entry(&entry_east_2, sequencer_id_1)
.store_entry(&entry_east_2, sequencer_id_1, None)
.await
.unwrap()
.0
.number;
let mark_2 = writer
.store_entry(&entry_west_1, sequencer_id_2)
.store_entry(&entry_west_1, sequencer_id_2, None)
.await
.unwrap()
.0
@ -534,7 +578,11 @@ pub mod test_utils {
assert_eq!(streams.len(), 1);
let (sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
let reported_ts = writer.store_entry(&entry, sequencer_id).await.unwrap().1;
let reported_ts = writer
.store_entry(&entry, sequencer_id, None)
.await
.unwrap()
.1;
// advance time
time.inc(Duration::from_secs(10));
@ -595,6 +643,59 @@ pub mod test_utils {
assert_eq!(sequencer_ids_1.len(), n_sequencers as usize);
}
/// Test that span contexts are propagated through the system.
async fn test_span_context<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
let entry = lp_to_entry("upc user=1 100");
let writer = context.writing(true).await.unwrap();
let mut reader = context.reading(true).await.unwrap();
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
let (sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
// 1: no context
writer
.store_entry(&entry, sequencer_id, None)
.await
.unwrap();
// 2: some context
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context_1 = SpanContext::new(Arc::clone(&collector));
writer
.store_entry(&entry, sequencer_id, Some(&span_context_1))
.await
.unwrap();
// 2: another context
let span_context_parent = SpanContext::new(collector);
let span_context_2 = span_context_parent.child("foo").ctx;
writer
.store_entry(&entry, sequencer_id, Some(&span_context_2))
.await
.unwrap();
// check entry 1
let sequenced_entry_1 = stream.stream.next().await.unwrap().unwrap();
assert!(sequenced_entry_1.span_context().is_none());
// check entry 2
let sequenced_entry_2 = stream.stream.next().await.unwrap().unwrap();
let actual_context_1 = sequenced_entry_2.span_context().unwrap();
assert_span_context_eq(actual_context_1, &span_context_1);
// check entry 3
let sequenced_entry_3 = stream.stream.next().await.unwrap().unwrap();
let actual_context_2 = sequenced_entry_3.span_context().unwrap();
assert_span_context_eq(actual_context_2, &span_context_2);
}
/// Assert that the content of the reader is as expected.
///
/// This will read `expected.len()` from the reader and then ensures that the stream is pending.
@ -648,6 +749,18 @@ pub mod test_utils {
}
}
/// Asserts that given span context are the same.
///
/// "Same" means:
/// - identical trace ID
/// - identical span ID
/// - identical parent span ID
pub(crate) fn assert_span_context_eq(lhs: &SpanContext, rhs: &SpanContext) {
assert_eq!(lhs.trace_id, rhs.trace_id);
assert_eq!(lhs.span_id, rhs.span_id);
assert_eq!(lhs.parent_span_id, rhs.parent_span_id);
}
/// Pops first entry from map.
///
/// Helper until <https://github.com/rust-lang/rust/issues/62924> is stable.

View File

@ -12,6 +12,7 @@ use data_types::{
};
use entry::{Entry, SequencedEntry};
use futures::{FutureExt, StreamExt};
use http::{HeaderMap, HeaderValue};
use observability_deps::tracing::{debug, info};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
@ -25,6 +26,8 @@ use rdkafka::{
ClientConfig, Message, Offset, TopicPartitionList,
};
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, TraceCollector};
use trace_http::ctx::TraceHeaderParser;
use crate::core::{
EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
@ -34,6 +37,9 @@ use crate::core::{
/// Message header that determines message content type.
pub const HEADER_CONTENT_TYPE: &str = "content-type";
/// Message header for tracing context.
pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
@ -45,37 +51,52 @@ pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// IOx-specific headers attached to every Kafka message.
#[derive(Debug, PartialEq)]
#[derive(Debug)]
struct IoxHeaders {
content_type: Option<String>,
span_context: Option<SpanContext>,
}
impl IoxHeaders {
/// Create new headers with sane default values.
fn new() -> Self {
/// Create new headers with sane default values and given span context.
fn new(span_context: Option<SpanContext>) -> Self {
Self {
content_type: Some(CONTENT_TYPE_FLATBUFFER.to_string()),
span_context,
}
}
/// Create new headers where all information is missing.
fn empty() -> Self {
Self { content_type: None }
Self {
content_type: None,
span_context: None,
}
}
}
impl<H> From<&H> for IoxHeaders
where
H: Headers,
{
fn from(headers: &H) -> Self {
let mut res = Self { content_type: None };
/// Parse from Kafka headers.
fn from_kafka<H>(headers: &H, collector: &Arc<dyn TraceCollector>) -> Self
where
H: Headers,
{
let mut res = Self::empty();
for i in 0..headers.count() {
if let Some((name, value)) = headers.get(i) {
if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
res.content_type = String::from_utf8(value.to_vec()).ok();
}
if name.eq_ignore_ascii_case(HEADER_TRACE_CONTEXT) {
if let Ok(header_value) = HeaderValue::from_bytes(value) {
let mut headers = HeaderMap::new();
headers.insert(HEADER_TRACE_CONTEXT, header_value);
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
res.span_context = parser.parse(collector, &headers).ok().flatten();
}
}
}
}
@ -91,6 +112,22 @@ impl From<&IoxHeaders> for OwnedHeaders {
res = res.add(HEADER_CONTENT_TYPE, content_type);
}
if let Some(span_context) = iox_headers.span_context.as_ref() {
res = res.add(
HEADER_TRACE_CONTEXT,
&format!(
"{:x}:{:x}:{:x}:1",
span_context.trace_id.get(),
span_context.span_id.get(),
span_context
.parent_span_id
.as_ref()
.map(|span_id| span_id.get())
.unwrap_or_default()
),
)
}
res
}
}
@ -124,6 +161,7 @@ impl WriteBufferWriting for KafkaBufferProducer {
&self,
entry: &Entry,
sequencer_id: u32,
span_context: Option<&SpanContext>,
) -> Result<(Sequence, Time), WriteBufferError> {
let partition = i32::try_from(sequencer_id)?;
@ -132,7 +170,7 @@ impl WriteBufferWriting for KafkaBufferProducer {
let timestamp_millis = date_time.timestamp_millis();
let timestamp = Time::from_timestamp_millis(timestamp_millis);
let headers = IoxHeaders::new();
let headers = IoxHeaders::new(span_context.cloned());
// This type annotation is necessary because `FutureRecord` is generic over key type, but
// key is optional and we're not setting a key. `String` is arbitrary.
@ -216,6 +254,7 @@ pub struct KafkaBufferConsumer {
conn: String,
database_name: String,
consumers: BTreeMap<u32, Arc<StreamConsumer>>,
trace_collector: Arc<dyn TraceCollector>,
}
// Needed because rdkafka's StreamConsumer doesn't impl Debug
@ -237,13 +276,14 @@ impl WriteBufferReading for KafkaBufferConsumer {
let sequencer_id = *sequencer_id;
let consumer_cloned = Arc::clone(consumer);
let database_name = self.database_name.clone();
let trace_collector = Arc::clone(&self.trace_collector);
let stream = consumer
.stream()
.map(move |message| {
let message = message?;
let headers: IoxHeaders = message.headers().map(|headers| headers.into()).unwrap_or_else(IoxHeaders::empty);
let headers: IoxHeaders = message.headers().map(|headers| IoxHeaders::from_kafka(headers, &trace_collector)).unwrap_or_else(IoxHeaders::empty);
// Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805
let content_type = headers.content_type.unwrap_or_else(|| CONTENT_TYPE_FLATBUFFER.to_string());
@ -278,7 +318,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
number: message.offset().try_into()?,
};
Ok(SequencedEntry::new_from_sequence(sequence, timestamp, entry))
Ok(SequencedEntry::new_from_sequence_and_span_context(sequence, timestamp, entry, headers.span_context))
})
.boxed();
@ -360,6 +400,8 @@ impl KafkaBufferConsumer {
database_name: impl Into<String> + Send + Sync,
connection_config: &HashMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
// `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033
trace_collector: &Arc<dyn TraceCollector>,
) -> Result<Self, WriteBufferError> {
let conn = conn.into();
let database_name = database_name.into();
@ -422,6 +464,7 @@ impl KafkaBufferConsumer {
conn,
database_name,
consumers,
trace_collector: Arc::clone(trace_collector),
})
}
}
@ -628,13 +671,15 @@ mod tests {
num::NonZeroU32,
sync::atomic::{AtomicU32, Ordering},
};
use time::TimeProvider;
use entry::test_helpers::lp_to_entry;
use time::TimeProvider;
use trace::{RingBufferTraceCollector, TraceCollector};
use crate::{
core::test_utils::{
map_pop_first, perform_generic_tests, set_pop_first, TestAdapter, TestContext,
assert_span_context_eq, map_pop_first, perform_generic_tests, set_pop_first,
TestAdapter, TestContext,
},
kafka::test_utils::random_kafka_topic,
maybe_skip_kafka_integration,
@ -709,12 +754,15 @@ mod tests {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
KafkaBufferConsumer::new(
&self.conn,
server_id,
&self.database_name,
&Default::default(),
self.creation_config(creation_config).as_ref(),
&collector,
)
.await
}
@ -818,22 +866,32 @@ mod tests {
#[test]
fn headers_roundtrip() {
let iox_headers1 = IoxHeaders::new();
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context_parent = SpanContext::new(Arc::clone(&collector));
let span_context = span_context_parent.child("foo").ctx;
let iox_headers1 = IoxHeaders::new(Some(span_context));
let kafka_headers: OwnedHeaders = (&iox_headers1).into();
let iox_headers2: IoxHeaders = (&kafka_headers).into();
assert_eq!(iox_headers1, iox_headers2);
let iox_headers2 = IoxHeaders::from_kafka(&kafka_headers, &collector);
assert_eq!(iox_headers1.content_type, iox_headers2.content_type);
assert_span_context_eq(
iox_headers1.span_context.as_ref().unwrap(),
iox_headers2.span_context.as_ref().unwrap(),
);
}
#[test]
fn headers_case_handling() {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let kafka_headers = OwnedHeaders::new()
.add("content-type", "a")
.add("CONTENT-TYPE", "b")
.add("content-TYPE", "c");
let actual: IoxHeaders = (&kafka_headers).into();
let expected = IoxHeaders {
content_type: Some("c".to_string()),
};
assert_eq!(actual, expected);
let actual = IoxHeaders::from_kafka(&kafka_headers, &collector);
assert_eq!(actual.content_type, Some("c".to_string()));
}
}

View File

@ -13,6 +13,7 @@ use data_types::database_rules::WriteBufferCreationConfig;
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use time::{Time, TimeProvider};
use trace::ctx::SpanContext;
use crate::core::{
EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
@ -236,6 +237,7 @@ impl WriteBufferWriting for MockBufferForWriting {
&self,
entry: &Entry,
sequencer_id: u32,
span_context: Option<&SpanContext>,
) -> Result<(Sequence, Time), WriteBufferError> {
let mut guard = self.state.entries.lock();
let entries = guard.as_mut().unwrap();
@ -248,10 +250,11 @@ impl WriteBufferWriting for MockBufferForWriting {
number: sequence_number,
};
let timestamp = self.time_provider.now();
sequencer_entries.push(Ok(SequencedEntry::new_from_sequence(
sequencer_entries.push(Ok(SequencedEntry::new_from_sequence_and_span_context(
sequence,
timestamp,
entry.clone(),
span_context.cloned(),
)));
Ok((sequence, timestamp))
@ -276,6 +279,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
&self,
_entry: &Entry,
_sequencer_id: u32,
_span_context: Option<&SpanContext>,
) -> Result<(Sequence, Time), WriteBufferError> {
Err(String::from(
"Something bad happened on the way to writing an entry in the write buffer",
@ -748,7 +752,11 @@ mod tests {
let entry = lp_to_entry("upc user=1 100");
assert_eq!(
writer.store_entry(&entry, 0).await.unwrap_err().to_string(),
writer
.store_entry(&entry, 0, None)
.await
.unwrap_err()
.to_string(),
"Something bad happened on the way to writing an entry in the write buffer"
);
}