feat: Swap Kafka Producer implementation back to rdkafka as diagnosis of latency problem (#5800)
* feat: Add back rdkafka dependency * feat: Remove RSKafkaProducer * feat: Remove write buffer RecordAggregator * feat: Add back rdkafka producer Using code from 58a2a0b9c8311303c796495db4f167c99a2ea3aa then getting it to compile with the latest * feat: Add a metric around enqueue * fix: Remove unused imports * fix: Increase Kafka timeout to 20s * docs: Clarify that Kafka topics should only be created in test/dev envs * fix: Remove metrics that aren't needed for this experiment Co-authored-by: Dom <dom@itsallbroken.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
2b5ca28374
commit
33391af973
|
|
@ -2686,6 +2686,18 @@ version = "0.2.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565"
|
||||
|
||||
[[package]]
|
||||
name = "libz-sys"
|
||||
version = "1.1.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
"pkg-config",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "linux-raw-sys"
|
||||
version = "0.0.46"
|
||||
|
|
@ -3104,6 +3116,27 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum"
|
||||
version = "0.5.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
|
||||
dependencies = [
|
||||
"num_enum_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num_enum_derive"
|
||||
version = "0.5.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
|
||||
dependencies = [
|
||||
"proc-macro-crate",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object"
|
||||
version = "0.29.0"
|
||||
|
|
@ -3520,6 +3553,12 @@ version = "0.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
||||
|
||||
[[package]]
|
||||
name = "pkg-config"
|
||||
version = "0.3.25"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"
|
||||
|
||||
[[package]]
|
||||
name = "pprof"
|
||||
version = "0.10.1"
|
||||
|
|
@ -3623,6 +3662,17 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-crate"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"thiserror",
|
||||
"toml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
|
|
@ -4017,6 +4067,35 @@ dependencies = [
|
|||
"num_cpus",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rdkafka"
|
||||
version = "0.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libc",
|
||||
"log",
|
||||
"rdkafka-sys",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"slab",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rdkafka-sys"
|
||||
version = "4.2.0+1.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"libz-sys",
|
||||
"num_enum",
|
||||
"pkg-config",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "read_buffer"
|
||||
version = "0.1.0"
|
||||
|
|
@ -5632,6 +5711,12 @@ version = "0.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
|
||||
|
||||
[[package]]
|
||||
name = "version_check"
|
||||
version = "0.9.4"
|
||||
|
|
@ -5981,6 +6066,7 @@ dependencies = [
|
|||
"parking_lot 0.12.1",
|
||||
"pin-project",
|
||||
"prost 0.11.0",
|
||||
"rdkafka",
|
||||
"rskafka",
|
||||
"schema",
|
||||
"tempfile",
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ observability_deps = { path = "../observability_deps" }
|
|||
parking_lot = "0.12"
|
||||
pin-project = "1.0"
|
||||
prost = "0.11"
|
||||
rdkafka = "0.28.0"
|
||||
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] }
|
||||
schema = { path = "../schema" }
|
||||
tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{
|
||||
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
|
||||
file::{FileBufferConsumer, FileBufferProducer},
|
||||
kafka::{RSKafkaConsumer, RSKafkaProducer},
|
||||
kafka::RSKafkaConsumer,
|
||||
mock::{
|
||||
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
|
||||
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
|
||||
|
|
@ -152,8 +152,8 @@ impl WriteBufferConfigFactory {
|
|||
pub async fn new_config_write(
|
||||
&self,
|
||||
db_name: &str,
|
||||
partitions: Option<Range<i32>>,
|
||||
trace_collector: Option<&Arc<dyn TraceCollector>>,
|
||||
_partitions: Option<Range<i32>>,
|
||||
_trace_collector: Option<&Arc<dyn TraceCollector>>,
|
||||
cfg: &WriteBufferConnection,
|
||||
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
|
||||
let writer = match &cfg.type_[..] {
|
||||
|
|
@ -168,20 +168,7 @@ impl WriteBufferConfigFactory {
|
|||
.await?;
|
||||
Arc::new(file_buffer) as _
|
||||
}
|
||||
"kafka" => {
|
||||
let rskafa_buffer = RSKafkaProducer::new(
|
||||
cfg.connection.clone(),
|
||||
db_name.to_owned(),
|
||||
&cfg.connection_config,
|
||||
Arc::clone(&self.time_provider),
|
||||
cfg.creation_config.as_ref(),
|
||||
partitions,
|
||||
trace_collector.map(Arc::clone),
|
||||
&*self.metric_registry,
|
||||
)
|
||||
.await?;
|
||||
Arc::new(rskafa_buffer) as _
|
||||
}
|
||||
"kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
|
||||
"mock" => match self.get_mock(&cfg.connection)? {
|
||||
Mock::Normal(state) => {
|
||||
let mock_buffer = MockBufferForWriting::new(
|
||||
|
|
@ -204,6 +191,24 @@ impl WriteBufferConfigFactory {
|
|||
Ok(writer)
|
||||
}
|
||||
|
||||
async fn kafka_buffer_producer(
|
||||
&self,
|
||||
db_name: &str,
|
||||
cfg: &WriteBufferConnection,
|
||||
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
|
||||
let kafka_buffer = crate::kafka::rdkafka::KafkaBufferProducer::new(
|
||||
&cfg.connection,
|
||||
db_name,
|
||||
&cfg.connection_config,
|
||||
cfg.creation_config.as_ref(),
|
||||
Arc::clone(&self.time_provider),
|
||||
&self.metric_registry,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Arc::new(kafka_buffer) as _)
|
||||
}
|
||||
|
||||
/// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
|
||||
pub async fn new_config_read(
|
||||
&self,
|
||||
|
|
|
|||
|
|
@ -80,6 +80,15 @@ impl From<std::io::Error> for WriteBufferError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<rdkafka::error::KafkaError> for WriteBufferError {
|
||||
fn from(e: rdkafka::error::KafkaError) -> Self {
|
||||
Self {
|
||||
inner: Box::new(e),
|
||||
kind: WriteBufferErrorKind::IO,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<rskafka::client::error::Error> for WriteBufferError {
|
||||
fn from(e: rskafka::client::error::Error) -> Self {
|
||||
Self {
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ pub struct KafkaProducerMetrics<P = SystemProvider> {
|
|||
impl KafkaProducerMetrics {
|
||||
/// Decorate the specified [`ProducerClient`] implementation with an
|
||||
/// instrumentation layer.
|
||||
#[allow(dead_code)]
|
||||
pub fn new(
|
||||
client: Box<dyn ProducerClient>,
|
||||
kafka_topic_name: String,
|
||||
|
|
|
|||
|
|
@ -1,24 +1,17 @@
|
|||
use self::{
|
||||
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
|
||||
instrumentation::KafkaProducerMetrics,
|
||||
record_aggregator::RecordAggregator,
|
||||
};
|
||||
use self::config::{ClientConfig, ConsumerConfig, TopicCreationConfig};
|
||||
use crate::{
|
||||
codec::IoxHeaders,
|
||||
config::WriteBufferCreationConfig,
|
||||
core::{
|
||||
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
|
||||
WriteBufferWriting,
|
||||
},
|
||||
core::{WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{Sequence, SequenceNumber, ShardIndex};
|
||||
use dml::{DmlMeta, DmlOperation};
|
||||
use dml::DmlOperation;
|
||||
use futures::{
|
||||
stream::{self, BoxStream},
|
||||
StreamExt, TryStreamExt,
|
||||
};
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use iox_time::Time;
|
||||
use observability_deps::tracing::warn;
|
||||
use parking_lot::Mutex;
|
||||
use rskafka::{
|
||||
|
|
@ -26,7 +19,6 @@ use rskafka::{
|
|||
consumer::{StartOffset, StreamConsumerBuilder},
|
||||
error::{Error as RSKafkaError, ProtocolError},
|
||||
partition::{OffsetAt, PartitionClient, UnknownTopicHandling},
|
||||
producer::{BatchProducer, BatchProducerBuilder},
|
||||
ClientBuilder,
|
||||
},
|
||||
record::RecordAndOffset,
|
||||
|
|
@ -43,112 +35,13 @@ use trace::TraceCollector;
|
|||
|
||||
mod config;
|
||||
mod instrumentation;
|
||||
mod record_aggregator;
|
||||
pub(crate) mod rdkafka;
|
||||
|
||||
/// Maximum number of jobs buffered and decoded concurrently.
|
||||
const CONCURRENT_DECODE_JOBS: usize = 10;
|
||||
|
||||
type Result<T, E = WriteBufferError> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RSKafkaProducer {
|
||||
producers: BTreeMap<ShardIndex, BatchProducer<RecordAggregator>>,
|
||||
}
|
||||
|
||||
impl RSKafkaProducer {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new<'a>(
|
||||
conn: String,
|
||||
topic_name: String,
|
||||
connection_config: &'a BTreeMap<String, String>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
creation_config: Option<&'a WriteBufferCreationConfig>,
|
||||
partitions: Option<Range<i32>>,
|
||||
_trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||
metric_registry: &'a metric::Registry,
|
||||
) -> Result<Self> {
|
||||
let partition_clients = setup_topic(
|
||||
conn,
|
||||
topic_name.clone(),
|
||||
connection_config,
|
||||
creation_config,
|
||||
partitions,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let producer_config = ProducerConfig::try_from(connection_config)?;
|
||||
|
||||
let producers = partition_clients
|
||||
.into_iter()
|
||||
.map(|(shard_index, partition_client)| {
|
||||
// Instrument this kafka partition client.
|
||||
let partition_client = KafkaProducerMetrics::new(
|
||||
Box::new(partition_client),
|
||||
topic_name.clone(),
|
||||
shard_index,
|
||||
metric_registry,
|
||||
);
|
||||
|
||||
let mut producer_builder =
|
||||
BatchProducerBuilder::new_with_client(Arc::new(partition_client));
|
||||
if let Some(linger) = producer_config.linger {
|
||||
producer_builder = producer_builder.with_linger(linger);
|
||||
}
|
||||
let producer = producer_builder.build(RecordAggregator::new(
|
||||
shard_index,
|
||||
producer_config.max_batch_size,
|
||||
Arc::clone(&time_provider),
|
||||
));
|
||||
|
||||
(shard_index, producer)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Self { producers })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WriteBufferWriting for RSKafkaProducer {
|
||||
fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
|
||||
self.producers.keys().copied().collect()
|
||||
}
|
||||
|
||||
async fn store_operation(
|
||||
&self,
|
||||
shard_index: ShardIndex,
|
||||
operation: DmlOperation,
|
||||
) -> Result<DmlMeta, WriteBufferError> {
|
||||
// Sanity check to ensure only partitioned writes are pushed into Kafka.
|
||||
if let DmlOperation::Write(w) = &operation {
|
||||
assert!(
|
||||
w.partition_key().is_some(),
|
||||
"enqueuing unpartitioned write into kafka"
|
||||
)
|
||||
}
|
||||
|
||||
let producer = self
|
||||
.producers
|
||||
.get(&shard_index)
|
||||
.ok_or_else::<WriteBufferError, _>(|| {
|
||||
format!("Unknown shard index: {}", shard_index).into()
|
||||
})?;
|
||||
|
||||
Ok(producer.produce(operation).await?)
|
||||
}
|
||||
|
||||
async fn flush(&self) -> Result<(), WriteBufferError> {
|
||||
for producer in self.producers.values() {
|
||||
producer.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn type_name(&self) -> &'static str {
|
||||
"kafka"
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RSKafkaStreamHandler {
|
||||
partition_client: Arc<PartitionClient>,
|
||||
|
|
@ -525,14 +418,17 @@ async fn setup_topic(
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::{
|
||||
core::test_utils::{
|
||||
assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name,
|
||||
set_pop_first, TestAdapter, TestContext,
|
||||
core::{
|
||||
test_utils::{
|
||||
assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name,
|
||||
set_pop_first, TestAdapter, TestContext,
|
||||
},
|
||||
WriteBufferWriting,
|
||||
},
|
||||
maybe_skip_kafka_integration,
|
||||
};
|
||||
use data_types::{DeletePredicate, PartitionKey, TimestampRange};
|
||||
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
|
||||
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlMeta, DmlWrite};
|
||||
use futures::{stream::FuturesUnordered, TryStreamExt};
|
||||
use iox_time::TimeProvider;
|
||||
use rskafka::{client::partition::Compression, record::Record};
|
||||
|
|
@ -595,19 +491,17 @@ mod tests {
|
|||
|
||||
#[async_trait]
|
||||
impl TestContext for RSKafkaTestContext {
|
||||
type Writing = RSKafkaProducer;
|
||||
type Writing = rdkafka::KafkaBufferProducer;
|
||||
|
||||
type Reading = RSKafkaConsumer;
|
||||
|
||||
async fn writing(&self, creation_config: bool) -> Result<Self::Writing, WriteBufferError> {
|
||||
RSKafkaProducer::new(
|
||||
rdkafka::KafkaBufferProducer::new(
|
||||
self.conn.clone(),
|
||||
self.topic_name.clone(),
|
||||
&BTreeMap::default(),
|
||||
Arc::clone(&self.time_provider),
|
||||
self.creation_config(creation_config).as_ref(),
|
||||
None,
|
||||
Some(self.trace_collector() as Arc<_>),
|
||||
Arc::clone(&self.time_provider),
|
||||
&self.metrics,
|
||||
)
|
||||
.await
|
||||
|
|
@ -850,9 +744,9 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
async fn write(
|
||||
async fn write<T: WriteBufferWriting>(
|
||||
namespace: &str,
|
||||
producer: &RSKafkaProducer,
|
||||
producer: &T,
|
||||
trace_collector: &Arc<RingBufferTraceCollector>,
|
||||
shard_index: ShardIndex,
|
||||
partition_key: impl Into<PartitionKey> + Send,
|
||||
|
|
@ -869,9 +763,9 @@ mod tests {
|
|||
producer.store_operation(shard_index, op).await.unwrap()
|
||||
}
|
||||
|
||||
async fn delete(
|
||||
async fn delete<T: WriteBufferWriting>(
|
||||
namespace: &str,
|
||||
producer: &RSKafkaProducer,
|
||||
producer: &T,
|
||||
trace_collector: &Arc<RingBufferTraceCollector>,
|
||||
shard_index: ShardIndex,
|
||||
) -> DmlMeta {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,404 @@
|
|||
use crate::{
|
||||
codec::{ContentType, IoxHeaders},
|
||||
core::{WriteBufferError, WriteBufferWriting},
|
||||
kafka::WriteBufferCreationConfig,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use data_types::{Sequence, SequenceNumber, ShardIndex};
|
||||
use dml::{DmlMeta, DmlOperation};
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use metric::{Attributes, DurationHistogram, Metric};
|
||||
use observability_deps::tracing::{debug, info};
|
||||
use rdkafka::{
|
||||
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
|
||||
client::DefaultClientContext,
|
||||
consumer::{BaseConsumer, Consumer},
|
||||
error::KafkaError,
|
||||
message::{Headers, OwnedHeaders},
|
||||
producer::{FutureProducer, FutureRecord, Producer},
|
||||
types::RDKafkaErrorCode,
|
||||
util::Timeout,
|
||||
ClientConfig,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
num::NonZeroU32,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Default timeout supplied to rdkafka client for kafka operations.
|
||||
///
|
||||
/// Chosen to be a value less than the default gRPC timeout (30
|
||||
/// seconds) so we can detect kafka errors and return them prior to
|
||||
/// the gRPC requests to IOx timing out.
|
||||
///
|
||||
/// More context in
|
||||
/// <https://github.com/influxdata/influxdb_iox/issues/3029>
|
||||
const KAFKA_OPERATION_TIMEOUT_MS: u64 = 20_000;
|
||||
|
||||
impl From<&IoxHeaders> for OwnedHeaders {
|
||||
fn from(iox_headers: &IoxHeaders) -> Self {
|
||||
let mut res = Self::new();
|
||||
|
||||
for (header, value) in iox_headers.headers() {
|
||||
res = res.add(header, value.as_ref());
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
pub struct KafkaBufferProducer {
|
||||
conn: String,
|
||||
database_name: String,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
producer: Arc<FutureProducer<DefaultClientContext>>,
|
||||
partitions: BTreeSet<ShardIndex>,
|
||||
enqueue: Metric<DurationHistogram>,
|
||||
}
|
||||
|
||||
// Needed because rdkafka's FutureProducer doesn't impl Debug
|
||||
impl std::fmt::Debug for KafkaBufferProducer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("KafkaBufferProducer")
|
||||
.field("conn", &self.conn)
|
||||
.field("database_name", &self.database_name)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl WriteBufferWriting for KafkaBufferProducer {
|
||||
fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
|
||||
self.partitions.clone()
|
||||
}
|
||||
|
||||
/// Send a [`DmlOperation`] to the write buffer using the specified shard index.
|
||||
async fn store_operation(
|
||||
&self,
|
||||
shard_index: ShardIndex,
|
||||
operation: DmlOperation,
|
||||
) -> Result<DmlMeta, WriteBufferError> {
|
||||
// Sanity check to ensure only partitioned writes are pushed into Kafka.
|
||||
if let DmlOperation::Write(w) = &operation {
|
||||
assert!(
|
||||
w.partition_key().is_some(),
|
||||
"enqueuing unpartitioned write into kafka"
|
||||
)
|
||||
}
|
||||
|
||||
// Only send writes with known shard indexes to Kafka.
|
||||
if !self.partitions.contains(&shard_index) {
|
||||
return Err(format!("Unknown shard index: {}", shard_index).into());
|
||||
}
|
||||
|
||||
let kafka_partition_id = shard_index.get();
|
||||
|
||||
let enqueue_start = self.time_provider.now();
|
||||
|
||||
// truncate milliseconds from timestamps because that's what Kafka supports
|
||||
let now = operation
|
||||
.meta()
|
||||
.producer_ts()
|
||||
.unwrap_or_else(|| self.time_provider.now());
|
||||
|
||||
let timestamp_millis = now.date_time().timestamp_millis();
|
||||
let timestamp = Time::from_timestamp_millis(timestamp_millis);
|
||||
|
||||
let headers = IoxHeaders::new(
|
||||
ContentType::Protobuf,
|
||||
operation.meta().span_context().cloned(),
|
||||
operation.namespace().to_string(),
|
||||
);
|
||||
|
||||
let mut buf = Vec::new();
|
||||
crate::codec::encode_operation(&self.database_name, &operation, &mut buf)?;
|
||||
|
||||
// This type annotation is necessary because `FutureRecord` is generic over key type, but
|
||||
// key is optional and we're not setting a key. `String` is arbitrary.
|
||||
let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name)
|
||||
.payload(&buf)
|
||||
.partition(kafka_partition_id)
|
||||
.timestamp(timestamp_millis)
|
||||
.headers((&headers).into());
|
||||
let kafka_write_size = estimate_message_size(
|
||||
record.payload.map(|v| v.as_ref()),
|
||||
record.key.map(|s| s.as_bytes()),
|
||||
record.headers.as_ref(),
|
||||
);
|
||||
|
||||
debug!(db_name=%self.database_name, kafka_partition_id, size=buf.len(), "writing to kafka");
|
||||
|
||||
let res = self.producer.send(record, Timeout::Never).await;
|
||||
|
||||
if let Some(delta) = self
|
||||
.time_provider
|
||||
.now()
|
||||
.checked_duration_since(enqueue_start)
|
||||
{
|
||||
let result_attr = match &res {
|
||||
Ok(_) => "success",
|
||||
Err(_) => "error",
|
||||
};
|
||||
|
||||
let attr = Attributes::from([
|
||||
("kafka_partition", shard_index.to_string().into()),
|
||||
("kafka_topic", self.database_name.clone().into()),
|
||||
("result", result_attr.into()),
|
||||
]);
|
||||
|
||||
let recorder = self.enqueue.recorder(attr);
|
||||
recorder.record(delta);
|
||||
}
|
||||
|
||||
let (partition, offset) = res.map_err(|(e, _owned_message)| e)?;
|
||||
|
||||
debug!(db_name=%self.database_name, %offset, %partition, size=buf.len(), "wrote to kafka");
|
||||
|
||||
Ok(DmlMeta::sequenced(
|
||||
Sequence::new(shard_index, SequenceNumber::new(offset)),
|
||||
timestamp,
|
||||
operation.meta().span_context().cloned(),
|
||||
kafka_write_size,
|
||||
))
|
||||
}
|
||||
|
||||
async fn flush(&self) -> Result<(), WriteBufferError> {
|
||||
let producer = Arc::clone(&self.producer);
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
producer.flush(Timeout::Never);
|
||||
})
|
||||
.await
|
||||
.expect("subtask failed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn type_name(&self) -> &'static str {
|
||||
"kafka"
|
||||
}
|
||||
}
|
||||
|
||||
impl KafkaBufferProducer {
|
||||
pub async fn new(
|
||||
conn: impl Into<String> + Send,
|
||||
database_name: impl Into<String> + Send,
|
||||
connection_config: &BTreeMap<String, String>,
|
||||
creation_config: Option<&WriteBufferCreationConfig>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
metric_registry: &metric::Registry,
|
||||
) -> Result<Self, WriteBufferError> {
|
||||
let conn = conn.into();
|
||||
let database_name = database_name.into();
|
||||
|
||||
let mut cfg = ClientConfig::new();
|
||||
|
||||
// these configs can be overwritten
|
||||
cfg.set("message.timeout.ms", "5000");
|
||||
cfg.set("message.max.bytes", "31457280");
|
||||
cfg.set("message.send.max.retries", "10");
|
||||
cfg.set("queue.buffering.max.kbytes", "31457280");
|
||||
cfg.set("request.required.acks", "all"); // equivalent to acks=-1
|
||||
cfg.set("compression.type", "snappy");
|
||||
cfg.set("statistics.interval.ms", "15000");
|
||||
|
||||
// user overrides
|
||||
for (k, v) in connection_config {
|
||||
cfg.set(k, v);
|
||||
}
|
||||
|
||||
// these configs are set in stone
|
||||
cfg.set("bootstrap.servers", &conn);
|
||||
cfg.set("allow.auto.create.topics", "false");
|
||||
|
||||
// handle auto-creation
|
||||
let partitions =
|
||||
maybe_auto_create_topics(&conn, &database_name, creation_config, &cfg).await?;
|
||||
|
||||
let producer = cfg.create()?;
|
||||
|
||||
let enqueue = metric_registry.register_metric::<DurationHistogram>(
|
||||
"write_buffer_client_produce_duration",
|
||||
"duration of time taken to push a set of records to kafka \
|
||||
- includes codec, protocol, and network overhead",
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
conn,
|
||||
database_name,
|
||||
time_provider,
|
||||
producer: Arc::new(producer),
|
||||
partitions,
|
||||
enqueue,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate over the kafka messages
|
||||
fn header_iter<H>(headers: Option<&H>) -> impl Iterator<Item = (&str, &[u8])>
|
||||
where
|
||||
H: Headers,
|
||||
{
|
||||
headers
|
||||
.into_iter()
|
||||
.flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap()))
|
||||
}
|
||||
|
||||
/// Estimate size of data read from kafka as payload len + key len + headers
|
||||
fn estimate_message_size<H>(
|
||||
payload: Option<&[u8]>,
|
||||
key: Option<&[u8]>,
|
||||
headers: Option<&H>,
|
||||
) -> usize
|
||||
where
|
||||
H: Headers,
|
||||
{
|
||||
payload.map(|payload| payload.len()).unwrap_or_default()
|
||||
+ key.map(|key| key.len()).unwrap_or_default()
|
||||
+ header_iter(headers)
|
||||
.map(|(key, value)| key.len() + value.len())
|
||||
.sum::<usize>()
|
||||
}
|
||||
|
||||
/// Get Kafka partition IDs (IOx ShardIndexes) for the database-specific Kafka topic.
|
||||
///
|
||||
/// Will return `None` if the topic is unknown and has to be created.
|
||||
///
|
||||
/// This will check that the partition is is non-empty.
|
||||
async fn get_partitions(
|
||||
database_name: &str,
|
||||
cfg: &ClientConfig,
|
||||
) -> Result<Option<BTreeSet<ShardIndex>>, WriteBufferError> {
|
||||
let database_name = database_name.to_string();
|
||||
let cfg = cfg.clone();
|
||||
|
||||
let metadata = tokio::task::spawn_blocking(move || {
|
||||
let probe_consumer: BaseConsumer = cfg.create()?;
|
||||
|
||||
probe_consumer.fetch_metadata(
|
||||
Some(&database_name),
|
||||
Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS),
|
||||
)
|
||||
})
|
||||
.await
|
||||
.expect("subtask failed")?;
|
||||
|
||||
let topic_metadata = metadata.topics().get(0).expect("requested a single topic");
|
||||
|
||||
match topic_metadata.error() {
|
||||
None => {
|
||||
let partitions: BTreeSet<_> = topic_metadata
|
||||
.partitions()
|
||||
.iter()
|
||||
.map(|partition_metdata| ShardIndex::new(partition_metdata.id()))
|
||||
.collect();
|
||||
|
||||
if partitions.is_empty() {
|
||||
Err("Topic exists but has no partitions".to_string().into())
|
||||
} else {
|
||||
Ok(Some(partitions))
|
||||
}
|
||||
}
|
||||
Some(error_code) => {
|
||||
let error_code: RDKafkaErrorCode = error_code.into();
|
||||
match error_code {
|
||||
RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => {
|
||||
// The caller is responsible for creating the topic, so this is somewhat OK.
|
||||
Ok(None)
|
||||
}
|
||||
_ => Err(KafkaError::MetadataFetch(error_code).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn admin_client(kafka_connection: &str) -> Result<AdminClient<DefaultClientContext>, KafkaError> {
|
||||
let mut cfg = ClientConfig::new();
|
||||
cfg.set("bootstrap.servers", kafka_connection);
|
||||
cfg.set("message.timeout.ms", "5000");
|
||||
cfg.create()
|
||||
}
|
||||
|
||||
/// Create Kafka topic based on the provided configs.
|
||||
///
|
||||
/// This will create a topic with `n_sequencers` Kafka partitions.
|
||||
///
|
||||
/// This will NOT fail if the topic already exists! `maybe_auto_create_topics` will only call this
|
||||
/// if there are no partitions. Production should always have partitions already created, so
|
||||
/// `create_kafka_topic` shouldn't run in production and is only for test/dev environments.
|
||||
async fn create_kafka_topic(
|
||||
kafka_connection: &str,
|
||||
database_name: &str,
|
||||
n_sequencers: NonZeroU32,
|
||||
cfg: &BTreeMap<String, String>,
|
||||
) -> Result<(), WriteBufferError> {
|
||||
let admin = admin_client(kafka_connection)?;
|
||||
|
||||
let mut topic = NewTopic::new(
|
||||
database_name,
|
||||
n_sequencers.get() as i32,
|
||||
TopicReplication::Fixed(1),
|
||||
);
|
||||
for (k, v) in cfg {
|
||||
topic = topic.set(k, v);
|
||||
}
|
||||
|
||||
let opts = AdminOptions::default();
|
||||
let mut results = admin.create_topics([&topic], &opts).await?;
|
||||
assert_eq!(results.len(), 1, "created exactly one topic");
|
||||
let result = results.pop().expect("just checked the vector length");
|
||||
match result {
|
||||
Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => {
|
||||
assert_eq!(topic, database_name);
|
||||
Ok(())
|
||||
}
|
||||
Err((topic, code)) => {
|
||||
assert_eq!(topic, database_name);
|
||||
Err(format!("Cannot create topic '{}': {}", topic, code).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If there are no Kafka partitions, then create a topic. Production should have Kafka partitions
|
||||
/// created already, so this should only create a topic in test/dev environments.
|
||||
async fn maybe_auto_create_topics(
|
||||
kafka_connection: &str,
|
||||
database_name: &str,
|
||||
creation_config: Option<&WriteBufferCreationConfig>,
|
||||
cfg: &ClientConfig,
|
||||
) -> Result<BTreeSet<ShardIndex>, WriteBufferError> {
|
||||
const N_TRIES: usize = 10;
|
||||
|
||||
for i in 0..N_TRIES {
|
||||
if let Some(partitions) = get_partitions(database_name, cfg).await? {
|
||||
return Ok(partitions);
|
||||
}
|
||||
|
||||
// debounce after first round
|
||||
if i > 0 {
|
||||
info!(
|
||||
topic=%database_name,
|
||||
"Topic does not have partitions after creating it, wait a bit and try again."
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
}
|
||||
|
||||
if let Some(creation_config) = creation_config {
|
||||
create_kafka_topic(
|
||||
kafka_connection,
|
||||
database_name,
|
||||
creation_config.n_shards,
|
||||
&creation_config.options,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
return Err("no partitions found and auto-creation not requested"
|
||||
.to_string()
|
||||
.into());
|
||||
}
|
||||
}
|
||||
|
||||
Err(format!("Could not auto-create topic after {} tries.", N_TRIES).into())
|
||||
}
|
||||
|
|
@ -1,324 +0,0 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use data_types::{Sequence, SequenceNumber, ShardIndex};
|
||||
use dml::{DmlMeta, DmlOperation};
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use observability_deps::tracing::warn;
|
||||
use rskafka::{
|
||||
client::producer::aggregator::{
|
||||
Aggregator, Error, RecordAggregator as RecordAggregatorDelegate,
|
||||
RecordAggregatorStatusDeaggregator, StatusDeaggregator, TryPush,
|
||||
},
|
||||
record::Record,
|
||||
};
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
use crate::codec::{ContentType, IoxHeaders};
|
||||
|
||||
/// The [`Tag`] is a data-carrying token identifier used to de-aggregate
|
||||
/// responses from a batch aggregated of requests using the
|
||||
/// [`DmlMetaDeaggregator`].
|
||||
#[derive(Debug)]
|
||||
pub struct Tag {
|
||||
/// The tag into the batch returned by the
|
||||
/// [`RecordAggregatorDelegate::try_push()`] call.
|
||||
idx: usize,
|
||||
|
||||
/// The timestamp assigned to the resulting Kafka [`Record`].
|
||||
timestamp: Time,
|
||||
/// A span extracted from the original [`DmlOperation`].
|
||||
span_ctx: Option<SpanContext>,
|
||||
/// The approximate byte size of the serialised [`Record`], as calculated by
|
||||
/// [`Record::approximate_size()`].
|
||||
approx_kafka_write_size: usize,
|
||||
}
|
||||
|
||||
/// A [`RecordAggregator`] implements [rskafka]'s abstract [`Aggregator`]
|
||||
/// behaviour to provide batching of requests for a single Kafka partition.
|
||||
///
|
||||
/// Specifically the [`RecordAggregator`] maps [`DmlOperation`] instances to
|
||||
/// Kafka [`Record`] instances, and delegates the batching to the
|
||||
/// [`RecordAggregatorDelegate`] implementation maintained within [rskafka]
|
||||
/// itself.
|
||||
///
|
||||
/// [rskafka]: https://github.com/influxdata/rskafka
|
||||
#[derive(Debug)]
|
||||
pub struct RecordAggregator {
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
|
||||
/// The shard index (Kafka partition number) this aggregator batches ops for (from Kafka,
|
||||
/// not the catalog).
|
||||
shard_index: ShardIndex,
|
||||
|
||||
/// The underlying record aggregator the non-IOx-specific batching is
|
||||
/// delegated to.
|
||||
aggregator: RecordAggregatorDelegate,
|
||||
}
|
||||
|
||||
impl RecordAggregator {
|
||||
/// Initialise a new [`RecordAggregator`] to aggregate up to
|
||||
/// `max_batch_size` number of bytes per message.
|
||||
pub fn new(
|
||||
shard_index: ShardIndex,
|
||||
max_batch_size: usize,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
) -> Self {
|
||||
Self {
|
||||
shard_index,
|
||||
aggregator: RecordAggregatorDelegate::new(max_batch_size),
|
||||
time_provider,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RecordAggregator {
|
||||
/// Serialise the [`DmlOperation`] destined for the specified `db_name` into a
|
||||
/// [`Record`], returning the producer timestamp assigned to it.
|
||||
fn to_record(&self, op: &DmlOperation) -> Result<(Record, Time), Error> {
|
||||
let now = op
|
||||
.meta()
|
||||
.producer_ts()
|
||||
.unwrap_or_else(|| self.time_provider.now());
|
||||
|
||||
let headers = IoxHeaders::new(
|
||||
ContentType::Protobuf,
|
||||
op.meta().span_context().cloned(),
|
||||
op.namespace().to_owned(),
|
||||
);
|
||||
|
||||
let mut buf = Vec::new();
|
||||
crate::codec::encode_operation(op.namespace(), op, &mut buf)?;
|
||||
buf.shrink_to_fit();
|
||||
|
||||
let record = Record {
|
||||
key: None,
|
||||
value: Some(buf),
|
||||
headers: headers
|
||||
.headers()
|
||||
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
|
||||
.collect(),
|
||||
timestamp: now.date_time(),
|
||||
};
|
||||
|
||||
Ok((record, now))
|
||||
}
|
||||
}
|
||||
|
||||
impl Aggregator for RecordAggregator {
|
||||
type Input = DmlOperation;
|
||||
type Tag = <DmlMetaDeaggregator as StatusDeaggregator>::Tag;
|
||||
type StatusDeaggregator = DmlMetaDeaggregator;
|
||||
|
||||
/// Callers should retain the returned [`Tag`] in order to de-aggregate the
|
||||
/// [`DmlMeta`] from the request response.
|
||||
fn try_push(&mut self, op: Self::Input) -> Result<TryPush<Self::Input, Self::Tag>, Error> {
|
||||
// Encode the DML op to a Record
|
||||
let (record, timestamp) = self.to_record(&op)?;
|
||||
|
||||
// Capture various metadata necessary to construct the Tag/DmlMeta for
|
||||
// the caller once a batch has been flushed.
|
||||
let span_ctx = op.meta().span_context().cloned();
|
||||
let approx_kafka_write_size = record.approximate_size();
|
||||
|
||||
// And delegate batching to rskafka's RecordAggregator implementation
|
||||
Ok(match self.aggregator.try_push(record)? {
|
||||
// NoCapacity returns the original input to the caller when the
|
||||
// batching fails.
|
||||
//
|
||||
// The RecordBatcher delegate is returning the Record encoded from
|
||||
// op above, but the caller of this fn is expecting the original op.
|
||||
//
|
||||
// Map to the original input op this fn was called with, discarding
|
||||
// the encoded Record.
|
||||
TryPush::NoCapacity(_) => {
|
||||
// Log a warning if this occurs - this allows an operator to
|
||||
// increase the maximum Kafka message size, or lower the linger
|
||||
// time to minimise latency while still producing large enough
|
||||
// batches for it to be worth while.
|
||||
warn!("aggregated batch reached maximum capacity");
|
||||
TryPush::NoCapacity(op)
|
||||
}
|
||||
|
||||
// A successful delegate aggregation returns the tag for offset
|
||||
// de-aggregation later. For simplicity, the tag this layer returns
|
||||
// also carries the various (small) metadata elements needed to
|
||||
// construct the DmlMeta at the point of de-aggregation.
|
||||
TryPush::Aggregated(idx) => TryPush::Aggregated(Tag {
|
||||
idx,
|
||||
timestamp,
|
||||
span_ctx,
|
||||
approx_kafka_write_size,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> Result<(Vec<Record>, Self::StatusDeaggregator), Error> {
|
||||
let records = self.aggregator.flush()?.0;
|
||||
Ok((records, DmlMetaDeaggregator::new(self.shard_index)))
|
||||
}
|
||||
}
|
||||
|
||||
/// The de-aggregation half of the [`RecordAggregator`], this type consumes the
|
||||
/// caller's [`Tag`] obtained from the aggregator to return the corresponding
|
||||
/// [`DmlMeta`] from the batched response.
|
||||
///
|
||||
/// The [`DmlMetaDeaggregator`] is a stateless wrapper over the (also stateless)
|
||||
/// [`RecordAggregatorStatusDeaggregator`] delegate, with most of the metadata
|
||||
/// elements carried in the [`Tag`] itself.
|
||||
#[derive(Debug)]
|
||||
pub struct DmlMetaDeaggregator {
|
||||
shard_index: ShardIndex,
|
||||
}
|
||||
|
||||
impl DmlMetaDeaggregator {
    /// Initialise a [`DmlMetaDeaggregator`] that stamps the given
    /// `shard_index` into each [`DmlMeta`] it returns.
    pub fn new(shard_index: ShardIndex) -> Self {
        Self { shard_index }
    }
}
|
||||
|
||||
impl StatusDeaggregator for DmlMetaDeaggregator {
|
||||
type Status = DmlMeta;
|
||||
type Tag = Tag;
|
||||
|
||||
fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result<Self::Status, Error> {
|
||||
// Delegate de-aggregation to the (stateless) record batch
|
||||
// de-aggregator for forwards compatibility.
|
||||
let offset = RecordAggregatorStatusDeaggregator::default()
|
||||
.deaggregate(input, tag.idx)
|
||||
.expect("invalid de-aggregation index");
|
||||
|
||||
Ok(DmlMeta::sequenced(
|
||||
Sequence::new(self.shard_index, SequenceNumber::new(offset)),
|
||||
tag.timestamp,
|
||||
tag.span_ctx,
|
||||
tag.approx_kafka_write_size,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
// Unit tests covering the aggregate -> flush -> de-aggregate round-trip.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use dml::DmlWrite;
    use hashbrown::HashMap;
    use iox_time::MockProvider;
    use mutable_batch::{writer::Writer, MutableBatch};
    use trace::LogTraceCollector;

    use crate::codec::{
        CONTENT_TYPE_PROTOBUF, HEADER_CONTENT_TYPE, HEADER_NAMESPACE, HEADER_TRACE_CONTEXT,
    };

    use super::*;

    // Fixed inputs shared by the tests below.
    const NAMESPACE: &str = "bananas";
    const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
    const TIMESTAMP_MILLIS: i64 = 1659990497000;

    /// Build a minimal [`DmlOperation::Write`] containing one table with a
    /// single row, carrying a span context but no sequence or producer
    /// timestamp (see `DmlMeta::unsequenced`).
    fn test_op() -> DmlOperation {
        let mut batch = MutableBatch::new();
        let mut writer = Writer::new(&mut batch, 1);
        writer
            // Date: "1970-01-01"
            .write_time("time", [42].into_iter())
            .unwrap();
        writer
            .write_i64("A", Some(&[0b00000001]), [1].into_iter())
            .unwrap();
        writer.commit();

        let mut m = HashMap::default();
        m.insert("table".to_string(), batch);

        let span = SpanContext::new(Arc::new(LogTraceCollector::new()));

        DmlOperation::Write(DmlWrite::new(
            NAMESPACE.to_string(),
            m,
            Some("1970-01-01".into()),
            DmlMeta::unsequenced(Some(span)),
        ))
    }

    /// Happy path: one pushed op yields exactly one record with the expected
    /// headers and timestamp, and the Tag de-aggregates into a DmlMeta
    /// reflecting the offset supplied at de-aggregation time.
    #[test]
    fn test_record_aggregate() {
        // Mock clock pinned to TIMESTAMP_MILLIS supplies the producer
        // timestamp, since test_op() carries none of its own.
        let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis(
            TIMESTAMP_MILLIS,
        )));
        let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MAX, clock);
        let write = test_op();

        let res = agg.try_push(write).expect("aggregate call should succeed");
        let tag = match res {
            TryPush::NoCapacity(_) => panic!("unexpected no capacity"),
            TryPush::Aggregated(tag) => tag,
        };

        // Flush the aggregator to acquire the records
        let (records, deagg) = agg.flush().expect("should flush");
        assert_eq!(records.len(), 1);

        // Another flush should not yield the same records
        let (records2, _) = agg.flush().expect("should flush");
        assert!(records2.is_empty());

        // Assert properties of the resulting record
        let record = records[0].clone();
        assert_eq!(record.key, None);
        assert!(record.value.is_some());
        assert_eq!(
            *record
                .headers
                .get(HEADER_CONTENT_TYPE)
                .expect("no content type"),
            Vec::<u8>::from(CONTENT_TYPE_PROTOBUF),
        );
        assert_eq!(
            *record
                .headers
                .get(HEADER_NAMESPACE)
                .expect("no namespace header"),
            Vec::<u8>::from(NAMESPACE),
        );
        assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some());
        // Record timestamp compared at second precision (1659990497000 ms).
        assert_eq!(record.timestamp.timestamp(), 1659990497);

        // Extract the DmlMeta from the de-aggregator, supplying 4242 as the
        // write offset for the single record in the batch.
        let got = deagg
            .deaggregate(&[4242], tag)
            .expect("de-aggregate should succeed");

        // Assert the metadata properties
        assert!(got.span_context().is_some());
        assert_eq!(
            *got.sequence().expect("should be sequenced"),
            Sequence::new(SHARD_INDEX, SequenceNumber::new(4242))
        );
        assert_eq!(
            got.producer_ts().expect("no producer timestamp"),
            Time::from_timestamp_millis(TIMESTAMP_MILLIS)
        );
        assert_eq!(
            got.bytes_read().expect("no approx size"),
            record.approximate_size()
        );
    }

    /// With a zero-byte batch budget (usize::MIN == 0), every push must be
    /// rejected with NoCapacity, handing the caller back the original op.
    #[test]
    fn test_record_aggregate_no_capacity() {
        let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis(
            TIMESTAMP_MILLIS,
        )));
        let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MIN, clock);
        let write = test_op();

        let res = agg
            .try_push(write.clone())
            .expect("aggregate call should succeed");
        match res {
            TryPush::NoCapacity(res) => assert_eq!(res.namespace(), write.namespace()),
            TryPush::Aggregated(_) => panic!("expected no capacity"),
        };
    }
}
|
||||
Loading…
Reference in New Issue