diff --git a/Cargo.lock b/Cargo.lock index 41791564ae..ed81e039f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1019,6 +1019,7 @@ dependencies = [ "schema", "snafu", "time 0.1.0", + "trace", ] [[package]] @@ -4982,11 +4983,14 @@ dependencies = [ "dotenv", "entry", "futures", + "http", "observability_deps", "parking_lot", "rdkafka", "time 0.1.0", "tokio", + "trace", + "trace_http", "uuid", ] diff --git a/entry/Cargo.toml b/entry/Cargo.toml index ba241cc335..c4c5178c1d 100644 --- a/entry/Cargo.toml +++ b/entry/Cargo.toml @@ -13,6 +13,7 @@ data_types = { path = "../data_types" } flatbuffers = "2" snafu = "0.6" time = { path = "../time" } +trace = { path = "../trace" } influxdb_line_protocol = { path = "../influxdb_line_protocol" } ouroboros = "0.13.0" schema = { path = "../schema" } diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 57cfe30927..f2cbc1a315 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -18,6 +18,7 @@ use schema::{ IOxValueType, InfluxColumnType, InfluxFieldType, Schema, TIME_COLUMN_NAME, }; use time::Time; +use trace::ctx::SpanContext; use crate::entry_fb; @@ -1750,6 +1751,9 @@ pub struct SequencedEntry { /// At the time of writing, sequences will not be present when there is no configured mechanism to define the order /// of all writes. sequence_and_producer_ts: Option<(Sequence, Time)>, + + // Optional span context associated w/ this entry. + span_context: Option<SpanContext>, } impl SequencedEntry { @@ -1761,6 +1765,20 @@ impl SequencedEntry { Self { entry, sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)), + span_context: None, + } + } + + pub fn new_from_sequence_and_span_context( + sequence: Sequence, + producer_wallclock_timestamp: Time, + entry: Entry, + span_context: Option<SpanContext>, + ) -> Self { + Self { + entry, + sequence_and_producer_ts: Some((sequence, producer_wallclock_timestamp)), + span_context, } } @@ -1768,6 +1786,7 @@ impl SequencedEntry { Self { entry, sequence_and_producer_ts: None, + span_context: None, } } @@ -1790,6 +1809,10 @@ impl SequencedEntry { pub fn entry(&self) -> &Entry { &self.entry } + + pub fn span_context(&self) -> Option<&SpanContext> { + self.span_context.as_ref() + } } pub mod test_helpers { diff --git a/server/src/database.rs b/server/src/database.rs index 0dd6bb9f60..ae0269c196 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -26,6 +26,7 @@ use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{future::Future, sync::Arc, time::Duration}; use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; +use trace::{RingBufferTraceCollector, TraceCollector}; use uuid::Uuid; const INIT_BACKOFF: Duration = Duration::from_secs(1); @@ -1151,6 +1152,9 @@ impl DatabaseStateCatalogLoaded { ) -> Result { let db = Arc::clone(&self.db); + // TODO: use proper trace collector + let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + let rules = self.provided_rules.rules(); let write_buffer_factory = shared.application.write_buffer_factory(); let write_buffer_consumer = match rules.write_buffer_connection.as_ref() { @@ -1159,6 +1163,7 @@ impl DatabaseStateCatalogLoaded { .new_config_read( shared.config.server_id, shared.config.name.as_str(), + &trace_collector, connection, ) .await diff --git a/server/src/db.rs b/server/src/db.rs index f808e7d4c9..349ad50838 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -978,8 +978,9 @@ impl Db { // will get picked up when data is read from the write buffer.
// TODO: be smarter than always using sequencer 0 + // TODO: propagate span context let _ = write_buffer - .store_entry(&entry, 0) + .store_entry(&entry, 0, None) .await .context(WriteBufferWritingError)?; Ok(()) @@ -989,8 +990,9 @@ impl Db { // buffer to return success before adding the entry to the mutable buffer. // TODO: be smarter than always using sequencer 0 + // TODO: propagate span context let (sequence, producer_wallclock_timestamp) = write_buffer - .store_entry(&entry, 0) + .store_entry(&entry, 0, None) .await .context(WriteBufferWritingError)?; let sequenced_entry = Arc::new(SequencedEntry::new_from_sequence( diff --git a/tests/end_to_end_cases/scenario.rs b/tests/end_to_end_cases/scenario.rs index 55abb8ec70..2b1f3cb9c5 100644 --- a/tests/end_to_end_cases/scenario.rs +++ b/tests/end_to_end_cases/scenario.rs @@ -727,6 +727,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .pop() .unwrap(), 0, + None, ) .await .unwrap(); @@ -736,6 +737,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .pop() .unwrap(), 0, + None, ) .await .unwrap(); @@ -745,6 +747,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .pop() .unwrap(), 0, + None, ) .await .unwrap(); @@ -771,6 +774,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser .pop() .unwrap(), 0, + None, ) .await .unwrap(); diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index ecd6a6df86..f07776de72 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -9,11 +9,14 @@ data_types = { path = "../data_types" } dotenv = "0.15.0" entry = { path = "../entry" } futures = "0.3" +http = "0.2" observability_deps = { path = "../observability_deps" } parking_lot = "0.11.2" rdkafka = "0.26.0" time = { path = "../time" } tokio = { version = "1.11", features = ["macros", "fs"] } +trace = { path = "../trace" } +trace_http = { path = "../trace_http" } uuid = { version = "0.8", features = ["serde", "v4"] } [package.metadata.cargo-udeps.ignore] diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index e3f4dc1d3d..cf7cd6c3a3 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -8,6 +8,7 @@ use data_types::{ server_id::ServerId, }; use time::TimeProvider; +use trace::TraceCollector; use crate::{ core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}, @@ -135,6 +136,7 @@ impl WriteBufferConfigFactory { &self, server_id: ServerId, db_name: &str, + trace_collector: &Arc<dyn TraceCollector>, cfg: &WriteBufferConnection, ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> { assert_eq!(cfg.direction, WriteBufferDirection::Read); @@ -147,6 +149,7 @@ impl WriteBufferConfigFactory { db_name, &cfg.connection_config, cfg.creation_config.as_ref(), + trace_collector, ) .await?; Box::new(kafka_buffer) as _ @@ -176,6 +179,7 @@ mod tests { use std::{convert::TryFrom, num::NonZeroU32}; use data_types::{database_rules::WriteBufferCreationConfig, DatabaseName}; + use trace::RingBufferTraceCollector; use crate::{ kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration, @@ -207,6 +211,8 @@ mod tests { #[tokio::test] async fn test_reading_kafka() { + let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + let conn = maybe_skip_kafka_integration!(); let time = Arc::new(time::SystemProvider::new()); let factory = WriteBufferConfigFactory::new(time); @@ -222,7 +228,7 @@ mod tests { }; let conn = factory - .new_config_read(server_id, db_name.as_str(), &cfg) +
.new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg) .await .unwrap(); assert_eq!(conn.type_name(), "kafka"); @@ -268,6 +274,8 @@ mod tests { #[tokio::test] async fn test_reading_mock() { + let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + let time = Arc::new(time::SystemProvider::new()); let mut factory = WriteBufferConfigFactory::new(time); @@ -286,7 +294,7 @@ mod tests { }; let conn = factory - .new_config_read(server_id, db_name.as_str(), &cfg) + .new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg) .await .unwrap(); assert_eq!(conn.type_name(), "mock"); @@ -299,7 +307,7 @@ mod tests { ..Default::default() }; let err = factory - .new_config_read(server_id, db_name.as_str(), &cfg) + .new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg) .await .unwrap_err(); assert!(err.to_string().starts_with("Unknown mock ID:")); @@ -343,6 +351,8 @@ mod tests { #[tokio::test] async fn test_reading_mock_failing() { + let trace_collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + let time = Arc::new(time::SystemProvider::new()); let mut factory = WriteBufferConfigFactory::new(time); @@ -360,7 +370,7 @@ mod tests { }; let conn = factory - .new_config_read(server_id, db_name.as_str(), &cfg) + .new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg) .await .unwrap(); assert_eq!(conn.type_name(), "mock_failing"); @@ -373,7 +383,7 @@ mod tests { ..Default::default() }; let err = factory - .new_config_read(server_id, db_name.as_str(), &cfg) + .new_config_read(server_id, db_name.as_str(), &trace_collector, &cfg) .await .unwrap_err(); assert!(err.to_string().starts_with("Unknown mock ID:")); diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 6d65e49464..2566f2e376 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -8,6 +8,7 @@ use data_types::sequence::Sequence; use entry::{Entry, SequencedEntry}; use futures::{future::BoxFuture, stream::BoxStream}; use time::Time; +use trace::ctx::SpanContext; /// Generic boxed error type that is used in this crate. /// @@ -25,11 +26,14 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static { /// Send an `Entry` to the write buffer using the specified sequencer ID. /// + /// You can attach an optional [`SpanContext`] which will be propagated to the reader side. + /// /// Returns information that can be used to restore entries at a later time. async fn store_entry( &self, entry: &Entry, sequencer_id: u32, + span_context: Option<&SpanContext>, ) -> Result<(Sequence, Time), WriteBufferError>; /// Return type (like `"mock"` or `"kafka"`) of this writer. @@ -96,6 +100,7 @@ pub mod test_utils { use entry::{test_helpers::lp_to_entry, Entry}; use futures::{StreamExt, TryStreamExt}; use time::{Time, TimeProvider}; + use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector}; use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; @@ -158,6 +163,7 @@ pub mod test_utils { test_timestamp(&adapter).await; test_sequencer_auto_creation(&adapter).await; test_sequencer_ids(&adapter).await; + test_span_context(&adapter).await; } /// Test IO with a single writer and single reader stream.
@@ -190,7 +196,10 @@ pub mod test_utils { assert!(stream.stream.poll_next_unpin(&mut cx).is_pending()); // adding content allows us to get results - writer.store_entry(&entry_1, sequencer_id).await.unwrap(); + writer + .store_entry(&entry_1, sequencer_id, None) + .await + .unwrap(); assert_eq!( stream.stream.next().await.unwrap().unwrap().entry(), &entry_1 @@ -200,8 +209,14 @@ pub mod test_utils { assert!(stream.stream.poll_next_unpin(&mut cx).is_pending()); // adding more data unblocks the stream - writer.store_entry(&entry_2, sequencer_id).await.unwrap(); - writer.store_entry(&entry_3, sequencer_id).await.unwrap(); + writer + .store_entry(&entry_2, sequencer_id, None) + .await + .unwrap(); + writer + .store_entry(&entry_3, sequencer_id, None) + .await + .unwrap(); assert_eq!( stream.stream.next().await.unwrap().unwrap().entry(), &entry_2 @@ -235,9 +250,9 @@ pub mod test_utils { let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); - writer.store_entry(&entry_1, 0).await.unwrap(); - writer.store_entry(&entry_2, 0).await.unwrap(); - writer.store_entry(&entry_3, 0).await.unwrap(); + writer.store_entry(&entry_1, 0, None).await.unwrap(); + writer.store_entry(&entry_2, 0, None).await.unwrap(); + writer.store_entry(&entry_3, 0, None).await.unwrap(); // creating stream, drop stream, re-create it => still starts at first entry let mut streams = reader.streams(); @@ -309,21 +324,30 @@ pub mod test_utils { assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending()); // entries arrive at the right target stream - writer.store_entry(&entry_1, sequencer_id_1).await.unwrap(); + writer + .store_entry(&entry_1, sequencer_id_1, None) + .await + .unwrap(); assert_eq!( stream_1.stream.next().await.unwrap().unwrap().entry(), &entry_1 ); assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending()); - writer.store_entry(&entry_2, sequencer_id_2).await.unwrap(); + writer + .store_entry(&entry_2, sequencer_id_2, None) + .await + .unwrap(); assert!(stream_1.stream.poll_next_unpin(&mut cx).is_pending()); assert_eq!( stream_2.stream.next().await.unwrap().unwrap().entry(), &entry_2 ); - writer.store_entry(&entry_3, sequencer_id_1).await.unwrap(); + writer + .store_entry(&entry_3, sequencer_id_1, None) + .await + .unwrap(); assert!(stream_2.stream.poll_next_unpin(&mut cx).is_pending()); assert_eq!( stream_1.stream.next().await.unwrap().unwrap().entry(), @@ -365,15 +389,15 @@ pub mod test_utils { let sequencer_id_2 = set_pop_first(&mut sequencer_ids_1).unwrap(); writer_1 - .store_entry(&entry_east_1, sequencer_id_1) + .store_entry(&entry_east_1, sequencer_id_1, None) .await .unwrap(); writer_1 - .store_entry(&entry_west_1, sequencer_id_2) + .store_entry(&entry_west_1, sequencer_id_2, None) .await .unwrap(); writer_2 - .store_entry(&entry_east_2, sequencer_id_1) + .store_entry(&entry_east_2, sequencer_id_1, None) .await .unwrap(); @@ -417,9 +441,24 @@ pub mod test_utils { let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); let writer = context.writing(true).await.unwrap(); - let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number; - let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number; - let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number; + let _sequence_number_east_1 = writer + .store_entry(&entry_east_1, 0, None) + .await + .unwrap() + .0 + .number; + let sequence_number_east_2 = writer + .store_entry(&entry_east_2, 0, None) + .await + .unwrap() + 
.0 + .number; + let _sequence_number_west_1 = writer + .store_entry(&entry_west_1, 1, None) + .await + .unwrap() + .0 + .number; let mut reader_1 = context.reading(true).await.unwrap(); let mut reader_2 = context.reading(true).await.unwrap(); @@ -447,7 +486,12 @@ pub mod test_utils { // seek to far end and then at data reader_1.seek(0, 1_000_000).await.unwrap(); - let _sequence_number_east_3 = writer.store_entry(&entry_east_3, 0).await.unwrap().0.number; + let _sequence_number_east_3 = writer + .store_entry(&entry_east_3, 0, None) + .await + .unwrap() + .0 + .number; let mut streams = reader_1.streams(); assert_eq!(streams.len(), 2); let (_sequencer_id, mut stream_1) = map_pop_first(&mut streams).unwrap(); @@ -491,17 +535,17 @@ pub mod test_utils { // high water mark moves writer - .store_entry(&entry_east_1, sequencer_id_1) + .store_entry(&entry_east_1, sequencer_id_1, None) .await .unwrap(); let mark_1 = writer - .store_entry(&entry_east_2, sequencer_id_1) + .store_entry(&entry_east_2, sequencer_id_1, None) .await .unwrap() .0 .number; let mark_2 = writer - .store_entry(&entry_west_1, sequencer_id_2) + .store_entry(&entry_west_1, sequencer_id_2, None) .await .unwrap() .0 .number; @@ -534,7 +578,11 @@ pub mod test_utils { assert_eq!(streams.len(), 1); let (sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap(); - let reported_ts = writer.store_entry(&entry, sequencer_id).await.unwrap().1; + let reported_ts = writer + .store_entry(&entry, sequencer_id, None) + .await + .unwrap() + .1; // advance time time.inc(Duration::from_secs(10)); @@ -595,6 +643,59 @@ pub mod test_utils { assert_eq!(sequencer_ids_1.len(), n_sequencers as usize); } + /// Test that span contexts are propagated through the system. + async fn test_span_context<T>(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await; + + let entry = lp_to_entry("upc user=1 100"); + + let writer = context.writing(true).await.unwrap(); + let mut reader = context.reading(true).await.unwrap(); + + let mut streams = reader.streams(); + assert_eq!(streams.len(), 1); + let (sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap(); + + // 1: no context + writer + .store_entry(&entry, sequencer_id, None) + .await + .unwrap(); + + // 2: some context + let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + let span_context_1 = SpanContext::new(Arc::clone(&collector)); + writer + .store_entry(&entry, sequencer_id, Some(&span_context_1)) + .await + .unwrap(); + + // 3: another context + let span_context_parent = SpanContext::new(collector); + let span_context_2 = span_context_parent.child("foo").ctx; + writer + .store_entry(&entry, sequencer_id, Some(&span_context_2)) + .await + .unwrap(); + + // check entry 1 + let sequenced_entry_1 = stream.stream.next().await.unwrap().unwrap(); + assert!(sequenced_entry_1.span_context().is_none()); + + // check entry 2 + let sequenced_entry_2 = stream.stream.next().await.unwrap().unwrap(); + let actual_context_1 = sequenced_entry_2.span_context().unwrap(); + assert_span_context_eq(actual_context_1, &span_context_1); + + // check entry 3 + let sequenced_entry_3 = stream.stream.next().await.unwrap().unwrap(); + let actual_context_2 = sequenced_entry_3.span_context().unwrap(); + assert_span_context_eq(actual_context_2, &span_context_2); + } + /// Assert that the content of the reader is as expected. /// /// This will read `expected.len()` from the reader and then ensure that the stream is pending.
@@ -648,6 +749,18 @@ pub mod test_utils { } } + /// Asserts that the given span contexts are the same. + /// + /// "Same" means: + /// - identical trace ID + /// - identical span ID + /// - identical parent span ID + pub(crate) fn assert_span_context_eq(lhs: &SpanContext, rhs: &SpanContext) { + assert_eq!(lhs.trace_id, rhs.trace_id); + assert_eq!(lhs.span_id, rhs.span_id); + assert_eq!(lhs.parent_span_id, rhs.parent_span_id); + } + /// Pops first entry from map. /// /// Helper until `pop_first` is stable. diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 776e9c6305..0b32dfa9fa 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -12,6 +12,7 @@ use data_types::{ }; use entry::{Entry, SequencedEntry}; use futures::{FutureExt, StreamExt}; +use http::{HeaderMap, HeaderValue}; use observability_deps::tracing::{debug, info}; use rdkafka::{ admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, @@ -25,6 +26,8 @@ use rdkafka::{ ClientConfig, Message, Offset, TopicPartitionList, }; use time::{Time, TimeProvider}; +use trace::{ctx::SpanContext, TraceCollector}; +use trace_http::ctx::TraceHeaderParser; use crate::core::{ EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading, @@ -34,6 +37,9 @@ use crate::core::{ /// Message header that determines message content type. pub const HEADER_CONTENT_TYPE: &str = "content-type"; +/// Message header for tracing context. +pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id"; + /// Current flatbuffer-based content type. /// /// This is a value for [`HEADER_CONTENT_TYPE`]. @@ -45,37 +51,52 @@ pub const CONTENT_TYPE_FLATBUFFER: &str = r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#; /// IOx-specific headers attached to every Kafka message. -#[derive(Debug, PartialEq)] +#[derive(Debug)] struct IoxHeaders { content_type: Option<String>, + span_context: Option<SpanContext>, } impl IoxHeaders { - /// Create new headers with sane default values. - fn new() -> Self { + /// Create new headers with sane default values and given span context. + fn new(span_context: Option<SpanContext>) -> Self { Self { content_type: Some(CONTENT_TYPE_FLATBUFFER.to_string()), + span_context, } } /// Create new headers where all information is missing. fn empty() -> Self { - Self { content_type: None } + Self { + content_type: None, + span_context: None, + } } -} -impl<H> From<&H> for IoxHeaders -where - H: Headers, -{ - fn from(headers: &H) -> Self { - let mut res = Self { content_type: None }; + /// Parse from Kafka headers.
+ fn from_kafka<H>(headers: &H, collector: &Arc<dyn TraceCollector>) -> Self + where + H: Headers, + { + let mut res = Self::empty(); for i in 0..headers.count() { if let Some((name, value)) = headers.get(i) { if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) { res.content_type = String::from_utf8(value.to_vec()).ok(); } + + if name.eq_ignore_ascii_case(HEADER_TRACE_CONTEXT) { + if let Ok(header_value) = HeaderValue::from_bytes(value) { + let mut headers = HeaderMap::new(); + headers.insert(HEADER_TRACE_CONTEXT, header_value); + + let parser = TraceHeaderParser::new() + .with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT); + res.span_context = parser.parse(collector, &headers).ok().flatten(); + } + } } } @@ -91,6 +112,22 @@ impl From<&IoxHeaders> for OwnedHeaders { res = res.add(HEADER_CONTENT_TYPE, content_type); } + if let Some(span_context) = iox_headers.span_context.as_ref() { + res = res.add( + HEADER_TRACE_CONTEXT, + &format!( + "{:x}:{:x}:{:x}:1", + span_context.trace_id.get(), + span_context.span_id.get(), + span_context + .parent_span_id + .as_ref() + .map(|span_id| span_id.get()) + .unwrap_or_default() + ), + ) + } + res } } @@ -124,6 +161,7 @@ impl WriteBufferWriting for KafkaBufferProducer { &self, entry: &Entry, sequencer_id: u32, + span_context: Option<&SpanContext>, ) -> Result<(Sequence, Time), WriteBufferError> { let partition = i32::try_from(sequencer_id)?; @@ -132,7 +170,7 @@ impl WriteBufferWriting for KafkaBufferProducer { let timestamp_millis = date_time.timestamp_millis(); let timestamp = Time::from_timestamp_millis(timestamp_millis); - let headers = IoxHeaders::new(); + let headers = IoxHeaders::new(span_context.cloned()); // This type annotation is necessary because `FutureRecord` is generic over key type, but // key is optional and we're not setting a key. `String` is arbitrary.
@@ -216,6 +254,7 @@ pub struct KafkaBufferConsumer { conn: String, database_name: String, consumers: BTreeMap<u32, Arc<StreamConsumer>>, + trace_collector: Arc<dyn TraceCollector>, } // Needed because rdkafka's StreamConsumer doesn't impl Debug @@ -237,13 +276,14 @@ impl WriteBufferReading for KafkaBufferConsumer { let sequencer_id = *sequencer_id; let consumer_cloned = Arc::clone(consumer); let database_name = self.database_name.clone(); + let trace_collector = Arc::clone(&self.trace_collector); let stream = consumer .stream() .map(move |message| { let message = message?; - let headers: IoxHeaders = message.headers().map(|headers| headers.into()).unwrap_or_else(IoxHeaders::empty); + let headers: IoxHeaders = message.headers().map(|headers| IoxHeaders::from_kafka(headers, &trace_collector)).unwrap_or_else(IoxHeaders::empty); // Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805 let content_type = headers.content_type.unwrap_or_else(|| CONTENT_TYPE_FLATBUFFER.to_string()); @@ -278,7 +318,7 @@ impl WriteBufferReading for KafkaBufferConsumer { number: message.offset().try_into()?, }; - Ok(SequencedEntry::new_from_sequence(sequence, timestamp, entry)) + Ok(SequencedEntry::new_from_sequence_and_span_context(sequence, timestamp, entry, headers.span_context)) }) .boxed(); @@ -360,6 +400,8 @@ impl KafkaBufferConsumer { database_name: impl Into<String> + Send + Sync, connection_config: &HashMap<String, String>, creation_config: Option<&WriteBufferCreationConfig>, + // `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033 + trace_collector: &Arc<dyn TraceCollector>, ) -> Result<Self, WriteBufferError> { let conn = conn.into(); let database_name = database_name.into(); @@ -422,6 +464,7 @@ impl KafkaBufferConsumer { conn, database_name, consumers, + trace_collector: Arc::clone(trace_collector), }) } } @@ -628,13 +671,15 @@ mod tests { use std::{ num::NonZeroU32, sync::atomic::{AtomicU32, Ordering}, }; - use time::TimeProvider; use entry::test_helpers::lp_to_entry; + use time::TimeProvider; + use trace::{RingBufferTraceCollector, TraceCollector}; use crate::{ core::test_utils::{ - map_pop_first, perform_generic_tests, set_pop_first, TestAdapter, TestContext, + assert_span_context_eq, map_pop_first, perform_generic_tests, set_pop_first, + TestAdapter, TestContext, }, kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration, @@ -709,12 +754,15 @@ mod tests { let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst); let server_id = ServerId::try_from(server_id).unwrap(); + let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + KafkaBufferConsumer::new( &self.conn, server_id, &self.database_name, &Default::default(), self.creation_config(creation_config).as_ref(), + &collector, ) .await } @@ -818,22 +866,32 @@ mod tests { #[test] fn headers_roundtrip() { - let iox_headers1 = IoxHeaders::new(); + let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5)); + + let span_context_parent = SpanContext::new(Arc::clone(&collector)); + let span_context = span_context_parent.child("foo").ctx; + + let iox_headers1 = IoxHeaders::new(Some(span_context)); + let kafka_headers: OwnedHeaders = (&iox_headers1).into(); - let iox_headers2: IoxHeaders = (&kafka_headers).into(); - assert_eq!(iox_headers1, iox_headers2); + let iox_headers2 = IoxHeaders::from_kafka(&kafka_headers, &collector); + + assert_eq!(iox_headers1.content_type, iox_headers2.content_type); + assert_span_context_eq( + iox_headers1.span_context.as_ref().unwrap(), + iox_headers2.span_context.as_ref().unwrap(), + ); } #[test] fn headers_case_handling() { + let collector: Arc<dyn TraceCollector> =
Arc::new(RingBufferTraceCollector::new(5)); + let kafka_headers = OwnedHeaders::new() .add("content-type", "a") .add("CONTENT-TYPE", "b") .add("content-TYPE", "c"); - let actual: IoxHeaders = (&kafka_headers).into(); - let expected = IoxHeaders { - content_type: Some("c".to_string()), - }; - assert_eq!(actual, expected); + let actual = IoxHeaders::from_kafka(&kafka_headers, &collector); + assert_eq!(actual.content_type, Some("c".to_string())); } } diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index d381b9fa67..ec3d999a0e 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -13,6 +13,7 @@ use data_types::database_rules::WriteBufferCreationConfig; use data_types::sequence::Sequence; use entry::{Entry, SequencedEntry}; use time::{Time, TimeProvider}; +use trace::ctx::SpanContext; use crate::core::{ EntryStream, FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading, @@ -236,6 +237,7 @@ impl WriteBufferWriting for MockBufferForWriting { &self, entry: &Entry, sequencer_id: u32, + span_context: Option<&SpanContext>, ) -> Result<(Sequence, Time), WriteBufferError> { let mut guard = self.state.entries.lock(); let entries = guard.as_mut().unwrap(); @@ -248,10 +250,11 @@ impl WriteBufferWriting for MockBufferForWriting { number: sequence_number, }; let timestamp = self.time_provider.now(); - sequencer_entries.push(Ok(SequencedEntry::new_from_sequence( + sequencer_entries.push(Ok(SequencedEntry::new_from_sequence_and_span_context( sequence, timestamp, entry.clone(), + span_context.cloned(), ))); Ok((sequence, timestamp)) @@ -276,6 +279,7 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { &self, _entry: &Entry, _sequencer_id: u32, + _span_context: Option<&SpanContext>, ) -> Result<(Sequence, Time), WriteBufferError> { Err(String::from( "Something bad happened on the way to writing an entry in the write buffer", @@ -748,7 +752,11 @@ mod tests { let entry = lp_to_entry("upc user=1 100"); assert_eq!( - writer.store_entry(&entry, 0).await.unwrap_err().to_string(), + writer + .store_entry(&entry, 0, None) + .await + .unwrap_err() + .to_string(), "Something bad happened on the way to writing an entry in the write buffer" ); }
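
For context, a minimal, self-contained sketch (not part of the diff) of the round-trip that `KafkaBufferProducer` and `IoxHeaders::from_kafka` perform above: the producer encodes the span context into a Jaeger-style `uber-trace-id` header value (`trace_id:span_id:parent_span_id:flags` in hex, with the sampling flag hard-coded to `1`), and the consumer feeds that value through `TraceHeaderParser` to rebuild a `SpanContext`. It assumes only the `trace`/`trace_http` APIs used elsewhere in this diff; the function name `span_context_header_roundtrip` and the span name `"store entry"` are illustrative.

// Sketch only: serialize a SpanContext the way the Kafka producer does and
// parse it back via the Jaeger-aware header parser, as the consumer does.
use std::sync::Arc;

use http::{HeaderMap, HeaderValue};
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
use trace_http::ctx::TraceHeaderParser;

fn span_context_header_roundtrip() {
    let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));

    // Writer side: a child context, encoded like the `uber-trace-id` header above.
    let parent = SpanContext::new(Arc::clone(&collector));
    let ctx = parent.child("store entry").ctx;
    let encoded = format!(
        "{:x}:{:x}:{:x}:1",
        ctx.trace_id.get(),
        ctx.span_id.get(),
        ctx.parent_span_id
            .as_ref()
            .map(|span_id| span_id.get())
            .unwrap_or_default(),
    );

    // Reader side: wrap the value in an HTTP header map and let the parser
    // reconstruct the context, mirroring `IoxHeaders::from_kafka`.
    let mut headers = HeaderMap::new();
    headers.insert("uber-trace-id", HeaderValue::from_str(&encoded).unwrap());
    let parser = TraceHeaderParser::new()
        .with_jaeger_trace_context_header_name("uber-trace-id");
    let decoded = parser
        .parse(&collector, &headers)
        .ok()
        .flatten()
        .expect("context should survive the round-trip");

    assert_eq!(decoded.trace_id, ctx.trace_id);
    assert_eq!(decoded.span_id, ctx.span_id);
    assert_eq!(decoded.parent_span_id, ctx.parent_span_id);
}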