feat: buffer writes when writing to RSKafka (#3520)

Marco Neumann 2022-02-01 10:07:52 +00:00 committed by GitHub
parent bf6691e56f
commit b326b62b44
14 changed files with 744 additions and 118 deletions

Cargo.lock (generated)

@@ -3880,12 +3880,13 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.1.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=63cf0a541b864d5376b2f96537a5bbb610d0e4bc#63cf0a541b864d5376b2f96537a5bbb610d0e4bc"
source = "git+https://github.com/influxdata/rskafka.git?rev=8a2ae6b372aaf831a4586c5732a7881bda612674#8a2ae6b372aaf831a4586c5732a7881bda612674"
dependencies = [
"async-trait",
"bytes",
"crc",
"futures",
"integer-encoding 3.0.2",
"parking_lot",
"pin-project-lite",
"rand",
@@ -3895,7 +3896,6 @@ dependencies = [
"tokio",
"tokio-rustls",
"tracing",
"varint-rs",
]
[[package]]
@@ -5295,12 +5295,6 @@ dependencies = [
"getrandom",
]
[[package]]
name = "varint-rs"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vcpkg"
version = "0.2.15"
@@ -5620,9 +5614,11 @@ dependencies = [
"dotenv",
"futures",
"generated_types",
"hashbrown 0.12.0",
"http",
"httparse",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
@@ -5631,6 +5627,7 @@ dependencies = [
"prost",
"rdkafka",
"rskafka",
"schema",
"tempfile",
"test_helpers",
"time 0.1.0",


@@ -22,6 +22,7 @@ use router2::{
};
use thiserror::Error;
use time::SystemProvider;
use trace::TraceCollector;
use write_buffer::{config::WriteBufferConfigFactory, core::WriteBufferError};
#[derive(Debug, Error)]
@@ -80,7 +81,12 @@ pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let metrics = Arc::new(metric::Registry::default());
let write_buffer = init_write_buffer(&config, Arc::clone(&metrics)).await?;
let write_buffer = init_write_buffer(
&config,
Arc::clone(&metrics),
common_state.trace_collector(),
)
.await?;
let http = HttpDelegate::new(config.run_config.max_http_request_size, write_buffer);
let router_server = RouterServer::new(
@@ -102,6 +108,7 @@ pub async fn command(config: Config) -> Result<()> {
async fn init_write_buffer(
config: &Config,
metrics: Arc<metric::Registry>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<ShardedWriteBuffer<TableNamespaceSharder<Arc<Sequencer>>>> {
let write_buffer_config = WriteBufferConnection {
type_: config.write_buffer_type.clone(),
@@ -113,7 +120,11 @@ async fn init_write_buffer(
let write_buffer = WriteBufferConfigFactory::new(Arc::new(SystemProvider::default()), metrics);
let write_buffer = Arc::new(
write_buffer
.new_config_write(&config.write_buffer_topic, &write_buffer_config)
.new_config_write(
&config.write_buffer_topic,
trace_collector.as_ref(),
&write_buffer_config,
)
.await?,
);


@@ -3,6 +3,7 @@ use std::sync::Arc;
use cache_loader_async::cache_api::LoadingCache;
use data_types::write_buffer::WriteBufferConnection;
use observability_deps::tracing::debug;
use trace::TraceCollector;
use write_buffer::{
config::WriteBufferConfigFactory,
core::{WriteBufferError, WriteBufferWriting},
@@ -46,7 +47,11 @@ impl ConnectionPool {
/// Create new connection pool.
///
/// If `use_mock_grpc` is set only mock gRPC clients are created.
pub async fn new(use_mock_grpc: bool, wb_factory: Arc<WriteBufferConfigFactory>) -> Self {
pub async fn new(
use_mock_grpc: bool,
wb_factory: Arc<WriteBufferConfigFactory>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Self {
// Note: this function is async even though it does not contain any `.await` calls because `LoadingCache::new`
// requires tokio to be running and, even if documented, people will forget about this.
@@ -71,9 +76,10 @@ impl ConnectionPool {
let write_buffer_producers = LoadingCache::new(move |key: KeyWriteBufferProducer| {
let wb_factory = Arc::clone(&wb_factory);
let trace_collector = trace_collector.clone();
async move {
wb_factory
.new_config_write(&key.0, &key.1)
.new_config_write(&key.0, trace_collector.as_ref(), &key.1)
.await
.map_err(|e| Arc::new(EWrapper(e)) as ConnectionError)
}
@@ -98,6 +104,7 @@ impl ConnectionPool {
time_provider,
metric_registry,
)),
None,
)
.await
}
@@ -151,6 +158,7 @@ mod tests {
Arc::clone(&time_provider),
Arc::clone(&metric_registry),
)),
None,
)
.await;
// connection will fail
@@ -162,6 +170,7 @@ mod tests {
time_provider,
metric_registry,
)),
None,
)
.await;
let client2 = pool2.grpc_client("foo").await.unwrap();


@@ -71,7 +71,8 @@ impl RouterServer {
Arc::clone(&metric_registry),
))
});
let connection_pool = Arc::new(ConnectionPool::new(use_mock_grpc, wb_factory).await);
let connection_pool =
Arc::new(ConnectionPool::new(use_mock_grpc, wb_factory, trace_collector.clone()).await);
Self {
server_id: RwLock::new(None),


@@ -227,7 +227,7 @@ mod tests {
metric_registry,
));
wb_factory.register_always_fail_mock(String::from("failing_wb"));
let connection_pool = Arc::new(ConnectionPool::new(true, wb_factory).await);
let connection_pool = Arc::new(ConnectionPool::new(true, wb_factory, None).await);
let client_grpc = connection_pool.grpc_client("1.2.3.4").await.unwrap();
let client_grpc = client_grpc.as_any().downcast_ref::<MockClient>().unwrap();


@@ -10,9 +10,11 @@ dml = { path = "../dml" }
dotenv = "0.15.0"
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown = "0.12"
http = "0.2"
httparse = "1.5"
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
@@ -20,7 +22,8 @@ parking_lot = "0.11.2"
pin-project = "1.0"
prost = "0.9"
rdkafka = { version = "0.28.0", optional = true }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="63cf0a541b864d5376b2f96537a5bbb610d0e4bc" }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="8a2ae6b372aaf831a4586c5732a7881bda612674" }
schema = { path = "../schema" }
time = { path = "../time" }
tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.6.9"


@@ -227,7 +227,7 @@ pub fn encode_operation(
mod tests {
use trace::RingBufferTraceCollector;
use crate::core::test_utils::assert_span_context_eq;
use crate::core::test_utils::assert_span_context_eq_or_linked;
use super::*;
@@ -251,9 +251,10 @@ mod tests {
let iox_headers2 = IoxHeaders::from_headers(encoded, Some(&collector)).unwrap();
assert_eq!(iox_headers1.content_type, iox_headers2.content_type);
assert_span_context_eq(
assert_span_context_eq_or_linked(
iox_headers1.span_context.as_ref().unwrap(),
iox_headers2.span_context.as_ref().unwrap(),
vec![],
);
assert_eq!(iox_headers1.namespace, iox_headers2.namespace);
}


@@ -87,6 +87,7 @@ impl WriteBufferConfigFactory {
pub async fn new_config_write(
&self,
db_name: &str,
trace_collector: Option<&Arc<dyn TraceCollector>>,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
let writer = match &cfg.type_[..] {
@@ -122,6 +123,7 @@ impl WriteBufferConfigFactory {
db_name.to_owned(),
cfg.creation_config.as_ref(),
Arc::clone(&self.time_provider),
trace_collector.map(Arc::clone),
)
.await?;
Arc::new(rskafa_buffer) as _
@@ -281,7 +283,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "file");
@@ -324,7 +326,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock");
@@ -336,7 +338,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
@@ -393,7 +395,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
@@ -405,7 +407,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().starts_with("Unknown mock ID:"));
@@ -477,7 +479,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "rskafka");
@@ -521,7 +523,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "kafka");
@@ -564,7 +566,7 @@ mod tests {
};
let err = factory
.new_config_write(db_name.as_str(), &cfg)
.new_config_write(db_name.as_str(), None, &cfg)
.await
.unwrap_err();
assert_eq!(


@@ -111,7 +111,7 @@ pub mod test_utils {
time::Duration,
};
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
use trace::{ctx::SpanContext, span::Span, RingBufferTraceCollector};
use uuid::Uuid;
/// Generated random topic name for testing.
@@ -157,6 +157,9 @@ pub mod test_utils {
/// Create new reader.
async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError>;
/// Trace collector that is used in this context.
fn trace_collector(&self) -> Arc<RingBufferTraceCollector>;
}
/// Generic test suite that must be passed by all proper write buffer implementations.
@@ -629,8 +632,8 @@ pub mod test_utils {
write("namespace", &writer, entry, sequencer_id, None).await;
// 2: some context
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span_context_1 = SpanContext::new(Arc::clone(&collector));
let collector = context.trace_collector();
let span_context_1 = SpanContext::new(Arc::clone(&collector) as Arc<_>);
write(
"namespace",
&writer,
@@ -641,7 +644,7 @@ pub mod test_utils {
.await;
// 2: another context
let span_context_parent = SpanContext::new(collector);
let span_context_parent = SpanContext::new(Arc::clone(&collector) as Arc<_>);
let span_context_2 = span_context_parent.child("foo").ctx;
write(
"namespace",
@@ -659,12 +662,12 @@ pub mod test_utils {
// check write 2
let write_2 = stream.stream.next().await.unwrap().unwrap();
let actual_context_1 = write_2.meta().span_context().unwrap();
assert_span_context_eq(actual_context_1, &span_context_1);
assert_span_context_eq_or_linked(&span_context_1, actual_context_1, collector.spans());
// check write 3
let write_3 = stream.stream.next().await.unwrap().unwrap();
let actual_context_2 = write_3.meta().span_context().unwrap();
assert_span_context_eq(actual_context_2, &span_context_2);
assert_span_context_eq_or_linked(&span_context_2, actual_context_2, collector.spans());
}
/// Test that writing to an unknown sequencer produces an error
@@ -770,16 +773,35 @@ pub mod test_utils {
}
}
/// Asserts that given span context are the same.
/// Asserts that the given span contexts are the same or that `second` links back to `first`.
///
/// "Same" means:
/// - identical trace ID
/// - identical span ID
/// - identical parent span ID
pub(crate) fn assert_span_context_eq(lhs: &SpanContext, rhs: &SpanContext) {
assert_eq!(lhs.trace_id, rhs.trace_id);
assert_eq!(lhs.span_id, rhs.span_id);
assert_eq!(lhs.parent_span_id, rhs.parent_span_id);
pub(crate) fn assert_span_context_eq_or_linked(
first: &SpanContext,
second: &SpanContext,
spans: Vec<Span>,
) {
// search for links
for span in spans {
if (span.ctx.trace_id == second.trace_id) && (span.ctx.span_id == second.span_id) {
// second context was emitted as span
// check if it links to first context
for (trace_id, span_id) in span.ctx.links {
if (trace_id == first.trace_id) && (span_id == first.span_id) {
return;
}
}
}
}
// no link found
assert_eq!(first.trace_id, second.trace_id);
assert_eq!(first.span_id, second.span_id);
assert_eq!(first.parent_span_id, second.parent_span_id);
}
/// Pops first entry from map.


@@ -699,6 +699,7 @@ mod tests {
database_name: format!("test_db_{}", Uuid::new_v4()),
n_sequencers,
time_provider,
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
}
}
}
@@ -708,6 +709,7 @@ mod tests {
database_name: String,
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Arc<RingBufferTraceCollector>,
}
impl FileTestContext {
@@ -735,17 +737,18 @@ mod tests {
}
async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError> {
let trace_collector: Arc<dyn TraceCollector> =
Arc::new(RingBufferTraceCollector::new(5));
FileBufferConsumer::new(
&self.path,
&self.database_name,
self.creation_config(creation_config).as_ref(),
Some(&trace_collector),
Some(&(self.trace_collector() as Arc<_>)),
)
.await
}
fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
Arc::clone(&self.trace_collector)
}
}
#[tokio::test]


@@ -755,7 +755,7 @@ mod tests {
};
use test_helpers::maybe_start_logging;
use time::TimeProvider;
use trace::{RingBufferTraceCollector, TraceCollector};
use trace::RingBufferTraceCollector;
struct KafkaTestAdapter {
conn: String,
@ -783,6 +783,7 @@ mod tests {
n_sequencers,
time_provider,
metric_registry: metric::Registry::new(),
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
}
}
}
@@ -794,6 +795,7 @@ mod tests {
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
metric_registry: metric::Registry,
trace_collector: Arc<RingBufferTraceCollector>,
}
impl KafkaTestContext {
@@ -836,19 +838,21 @@ mod tests {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
KafkaBufferConsumer::new(
&self.conn,
server_id,
&self.database_name,
&self.connection_config(),
self.creation_config(creation_config).as_ref(),
Some(&collector),
Some(&(self.trace_collector() as Arc<_>)),
&self.metric_registry,
)
.await
}
fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
Arc::clone(&self.trace_collector)
}
}
#[tokio::test]


@@ -546,6 +546,7 @@ mod tests {
use mutable_batch_lp::lines_to_batches;
use time::TimeProvider;
use trace::RingBufferTraceCollector;
use crate::core::test_utils::{map_pop_first, perform_generic_tests, TestAdapter, TestContext};
@@ -566,6 +567,7 @@ mod tests {
state: MockBufferSharedState::uninitialized(),
n_sequencers,
time_provider,
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
}
}
}
@@ -574,6 +576,7 @@ mod tests {
state: MockBufferSharedState,
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Arc<RingBufferTraceCollector>,
}
impl MockTestContext {
@@ -605,6 +608,10 @@ mod tests {
self.creation_config(creation_config).as_ref(),
)
}
fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
Arc::clone(&self.trace_collector)
}
}
#[tokio::test]

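The new aggregator module below plugs into rskafka's batch producer via its producer-side aggregation traits. For orientation, here is a simplified sketch of that interface as implied by the impl blocks in this diff (bounds and error details omitted; the pinned rskafka rev is authoritative):

// Sketch only, reconstructed from the impls below; not rskafka's verbatim definitions.
pub enum TryPush<Input, Tag> {
    /// The batch is full; the input is handed back to the caller.
    NoCapacity(Input),
    /// The input was buffered; the tag resolves its status after a flush.
    Aggregated(Tag),
}

pub trait Aggregator {
    type Input;
    type Tag;
    type StatusDeaggregator;

    /// Buffer one input, or hand it back if there is no capacity left.
    fn try_push(&mut self, input: Self::Input) -> Result<TryPush<Self::Input, Self::Tag>, Error>;

    /// Drain the buffer into Kafka records plus a deaggregator that later maps
    /// per-record broker offsets back to per-input statuses.
    fn flush(&mut self) -> Result<(Vec<Record>, Self::StatusDeaggregator), Error>;
}

pub trait StatusDeaggregator {
    type Status;
    type Tag;

    /// Given the broker offsets (one `i64` per flushed record), recover the
    /// status of the input identified by `tag`.
    fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result<Self::Status, Error>;
}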

@@ -0,0 +1,429 @@
use std::sync::Arc;
use data_types::sequence::Sequence;
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use mutable_batch::MutableBatch;
use observability_deps::tracing::debug;
use rskafka::{
client::producer::aggregator::{self, Aggregator, StatusDeaggregator, TryPush},
record::Record,
};
use schema::selection::Selection;
use time::{Time, TimeProvider};
use trace::{ctx::SpanContext, span::SpanRecorder, TraceCollector};
use crate::codec::{ContentType, IoxHeaders};
/// Newtype wrapper for tags given back to the aggregator framework.
///
/// We cannot just use a simple `usize` to get the offsets from the produced records because we can have writes for
/// different namespaces open at the same time, so the actual record offset is not yet known when the tag is handed
/// out. Internally we use a simple lookup table "tag -> record" to solve this issue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Tag(usize);
/// Aggregate writes of a single namespace.
#[derive(Debug)]
struct WriteAggregator {
/// Namespace.
namespace: String,
/// Data for every table.
tables: HashMap<String, MutableBatch>,
/// Span recorder to link spans from incoming writes to aggregated write.
span_recorder: SpanRecorder,
/// Tag, so we can later find the offset of the produced record.
tag: Tag,
}
impl WriteAggregator {
fn new(write: DmlWrite, collector: Option<Arc<dyn TraceCollector>>, tag: Tag) -> Self {
// TODO: `.into_tables()` could be helpful
let tables = write
.tables()
.map(|(name, batch)| (name.to_owned(), batch.clone()))
.collect();
let mut span_recorder = SpanRecorder::new(
collector.map(|collector| SpanContext::new(collector).child("write buffer aggregator")),
);
if let Some(ctx) = write.meta().span_context() {
span_recorder.link(ctx);
}
Self {
namespace: write.namespace().to_owned(),
tables,
span_recorder,
tag,
}
}
/// Check if we can push the given write to this aggregator (mostly if the schemas match).
fn can_push(&self, write: &DmlWrite) -> bool {
assert_eq!(write.namespace(), self.namespace);
for (table, batch) in write.tables() {
if let Some(existing) = self.tables.get(table) {
match (
existing.schema(Selection::All),
batch.schema(Selection::All),
) {
(Ok(schema_a), Ok(schema_b)) if schema_a == schema_b => {}
_ => {
return false;
}
}
}
}
true
}
/// Push write to this aggregator.
///
/// The caller MUST call [`can_push`](Self::can_push) beforehand to check if the schemas match.
fn push(&mut self, write: DmlWrite) {
assert_eq!(write.namespace(), self.namespace);
// TODO: `.into_tables()` could be helpful
for (table, batch) in write.tables() {
self.tables
.entry(table.to_owned())
.and_modify(|existing| {
existing
.extend_from(batch)
.expect("Caller should have checked if schemas are compatible")
})
.or_insert_with(|| batch.clone());
}
if let Some(ctx) = write.meta().span_context() {
self.span_recorder.link(ctx);
}
}
/// Flush aggregator to a ready-to-use DML write.
fn flush(mut self) -> DmlWrite {
self.span_recorder.ok("aggregated");
// attach a span if there is at least 1 active link
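// (i.e. the aggregated write only carries a span context if at least one
// incoming write contributed a span to link back to)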
let meta = DmlMeta::unsequenced(
self.span_recorder
.span()
.map(|span| (!span.ctx.links.is_empty()).then(|| span.ctx.clone()))
.flatten(),
);
DmlWrite::new(self.namespace, self.tables, meta)
}
/// Current estimated size of the aggregated data.
fn size(&self) -> usize {
self.namespace.len()
+ self
.tables
.iter()
.map(|(k, v)| std::mem::size_of_val(k) + k.capacity() + v.size())
.sum::<usize>()
}
}
/// Inner state of [`DmlAggregator`].
// TODO: in theory we could also fold the deletes into the writes already at this point.
#[derive(Debug, Default)]
struct DmlAggregatorState {
/// Completed (i.e. aggregated or flushed) operations in correct order.
completed_ops: Vec<DmlOperation>,
/// Sum of all sizes of `completed_ops` for fast access.
completed_size: usize,
/// Current writes per namespace.
current_writes: HashMap<String, WriteAggregator>,
/// Maps tags to records.
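///
/// For example: if writes arrive as W(ns2) then W(ns1), they are tagged 0 and 1 in arrival order, but
/// [`flush_writes`](Self::flush_writes) emits records sorted by namespace, so the flushed records are
/// [ns1, ns2] and `tag_to_record == [1, 0]`.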
tag_to_record: Vec<usize>,
}
impl DmlAggregatorState {
/// Current estimated size of all aggregated data.
fn size(&self) -> usize {
self.completed_size
+ self
.current_writes
.values()
.map(|agg| agg.size())
.sum::<usize>()
}
/// Reserve a tag so it can be used with [`push_op`](Self::push_op).
///
/// This method takes `tag_to_offset` instead of `self` so we can use it while holding a reference to other struct
/// members.
fn reserve_tag(tag_to_offset: &mut Vec<usize>) -> Tag {
let tag = Tag(tag_to_offset.len());
tag_to_offset.push(usize::MAX);
tag
}
/// Push given DML operation to completed operations.
///
/// Takes an optional pre-calculated size.
fn push_op(&mut self, op: DmlOperation, size: Option<usize>, tag: Tag) {
self.completed_size += size.unwrap_or_else(|| op.size());
let offset = self.completed_ops.len();
self.tag_to_record[tag.0] = offset;
self.completed_ops.push(op);
}
/// Flushes write for given namespace to completed operations.
///
/// This is a no-op if no active write exists.
fn flush_write(&mut self, namespace: &str) {
if let Some(agg) = self.current_writes.remove(namespace) {
let tag = agg.tag;
self.push_op(DmlOperation::Write(agg.flush()), None, tag);
}
}
/// Flushes writes for all namespaces to completed operations in sorted order (by namespace).
fn flush_writes(&mut self) {
let mut writes: Vec<_> = self.current_writes.drain().collect();
writes.sort_by_key(|(k, _v)| k.clone());
for (_k, agg) in writes {
let tag = agg.tag;
self.push_op(DmlOperation::Write(agg.flush()), None, tag);
}
}
}
/// Aggregator of [`DmlOperation`].
#[derive(Debug)]
pub struct DmlAggregator {
/// Optional trace collector.
collector: Option<Arc<dyn TraceCollector>>,
/// Database name.
database_name: String,
/// Maximum batch size in bytes.
max_size: usize,
/// Sequencer ID.
sequencer_id: u32,
/// Inner state that will be modified via `try_push` and reset via `flush`.
state: DmlAggregatorState,
/// Time provider.
time_provider: Arc<dyn TimeProvider>,
}
impl DmlAggregator {
pub fn new(
collector: Option<Arc<dyn TraceCollector>>,
database_name: String,
max_size: usize,
sequencer_id: u32,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
Self {
collector,
database_name,
max_size,
sequencer_id,
state: DmlAggregatorState::default(),
time_provider,
}
}
}
impl Aggregator for DmlAggregator {
type Input = DmlOperation;
type Tag = Tag;
type StatusDeaggregator = Deaggregator;
fn try_push(
&mut self,
op: Self::Input,
) -> Result<TryPush<Self::Input, Self::Tag>, aggregator::Error> {
let op_size = op.size();
if self.state.size() + op_size > self.max_size {
return Ok(TryPush::NoCapacity(op));
}
match op {
DmlOperation::Write(write) => {
let tag = match self
.state
.current_writes
.entry(write.namespace().to_string())
{
Entry::Occupied(mut o) => {
// Open write aggregator => check if we can push to it.
let agg = o.get_mut();
if agg.can_push(&write) {
// Schemas match => use this aggregator.
agg.push(write);
agg.tag
} else {
// Schemas don't match => use new aggregator (the write will likely fail on the ingester
// side though).
let new_tag =
DmlAggregatorState::reserve_tag(&mut self.state.tag_to_record);
let mut agg2 = WriteAggregator::new(
write,
self.collector.as_ref().map(Arc::clone),
new_tag,
);
std::mem::swap(agg, &mut agg2);
let flushed_tag = agg2.tag;
self.state.push_op(
DmlOperation::Write(agg2.flush()),
None,
flushed_tag,
);
new_tag
}
}
Entry::Vacant(v) => {
// No open write aggregator yet => create one.
let tag = DmlAggregatorState::reserve_tag(&mut self.state.tag_to_record);
v.insert(WriteAggregator::new(
write,
self.collector.as_ref().map(Arc::clone),
tag,
));
tag
}
};
Ok(TryPush::Aggregated(tag))
}
DmlOperation::Delete(_) => {
// must flush write aggregate to prevent deletes from "bypassing" writes
self.state.flush_write(op.namespace());
let tag = DmlAggregatorState::reserve_tag(&mut self.state.tag_to_record);
self.state.push_op(op, Some(op_size), tag);
Ok(TryPush::Aggregated(tag))
}
}
}
fn flush(&mut self) -> Result<(Vec<Record>, Self::StatusDeaggregator), aggregator::Error> {
let mut state = std::mem::take(&mut self.state);
state.flush_writes();
let mut records = Vec::with_capacity(state.completed_ops.len());
let mut metadata = Vec::with_capacity(state.completed_ops.len());
for op in state.completed_ops {
// truncate timestamps to millisecond precision because that's what Kafka supports
let now = op
.meta()
.producer_ts()
.unwrap_or_else(|| self.time_provider.now());
let timestamp_millis = now.date_time().timestamp_millis();
let timestamp = Time::from_timestamp_millis(timestamp_millis);
let headers = IoxHeaders::new(
ContentType::Protobuf,
op.meta().span_context().cloned(),
op.namespace().to_owned(),
);
let mut buf = Vec::new();
crate::codec::encode_operation(&self.database_name, &op, &mut buf)?;
let buf_len = buf.len();
let record = Record {
key: Default::default(),
value: buf,
headers: headers
.headers()
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
.collect(),
timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos(
timestamp_millis as i128 * 1_000_000,
)?,
};
debug!(db_name=%self.database_name, partition=self.sequencer_id, size=buf_len, "writing to kafka");
let kafka_write_size = record.approximate_size();
metadata.push(Metadata {
timestamp,
span_ctx: op.meta().span_context().cloned(),
kafka_write_size,
});
records.push(record);
}
Ok((
records,
Deaggregator {
sequencer_id: self.sequencer_id,
metadata,
tag_to_record: state.tag_to_record,
},
))
}
}
/// Metadata that we carry over for each pushed [`DmlOperation`] so we can return a proper [`DmlMeta`].
#[derive(Debug)]
struct Metadata {
timestamp: Time,
span_ctx: Option<SpanContext>,
kafka_write_size: usize,
}
#[derive(Debug)]
pub struct Deaggregator {
/// Sequencer ID.
sequencer_id: u32,
/// Metadata for every record.
///
/// This is NOT per-tag, use `tag_to_record` to map tags to records first.
metadata: Vec<Metadata>,
/// Maps tags to records.
tag_to_record: Vec<usize>,
}
impl StatusDeaggregator for Deaggregator {
type Status = DmlMeta;
type Tag = Tag;
fn deaggregate(
&self,
input: &[i64],
tag: Self::Tag,
) -> Result<Self::Status, aggregator::Error> {
let record = self.tag_to_record[tag.0];
let offset = input[record];
let md = &self.metadata[record];
Ok(DmlMeta::sequenced(
Sequence::new(self.sequencer_id, offset.try_into()?),
md.timestamp,
md.span_ctx.clone(),
md.kafka_write_size,
))
}
}
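A minimal sketch of the round trip through these two impls (hypothetical broker offset 42; `lines_to_batches` and `SystemProvider` are the same helpers used elsewhere in this diff):

use std::sync::Arc;

use dml::{DmlMeta, DmlOperation, DmlWrite};
use rskafka::client::producer::aggregator::{Aggregator, StatusDeaggregator, TryPush};
use time::SystemProvider;

fn round_trip() {
    let mut agg = DmlAggregator::new(
        None,                                // no trace collector
        "my_db".to_string(),                 // database name
        1024 * 500,                          // max batch size in bytes
        0,                                   // sequencer ID
        Arc::new(SystemProvider::default()), // time provider
    );

    // buffer one write; a real producer would flush and retry on `NoCapacity`
    let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
    let write = DmlWrite::new("ns", tables, DmlMeta::unsequenced(None));
    let tag = match agg.try_push(DmlOperation::Write(write)).unwrap() {
        TryPush::Aggregated(tag) => tag,
        TryPush::NoCapacity(_) => unreachable!("aggregator was empty"),
    };

    // drain the buffered ops into Kafka records (here: one) plus the deaggregator
    let (records, deagg) = agg.flush().unwrap();
    assert_eq!(records.len(), 1);

    // once the broker acks, its per-record offsets are mapped back to a
    // sequenced `DmlMeta` for each buffered operation
    let meta = deagg.deaggregate(&[42], tag).unwrap();
    assert!(meta.sequence().is_some());
}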


@@ -4,41 +4,40 @@ use std::{
atomic::{AtomicI64, Ordering},
Arc,
},
time::Duration,
};
use async_trait::async_trait;
use data_types::{sequence::Sequence, write_buffer::WriteBufferCreationConfig};
use dml::{DmlMeta, DmlOperation};
use futures::{FutureExt, StreamExt};
use observability_deps::tracing::debug;
use rskafka::{
client::{
consumer::StreamConsumerBuilder,
error::{Error as RSKafkaError, ProtocolError},
partition::PartitionClient,
ClientBuilder,
},
record::Record,
use rskafka::client::{
consumer::StreamConsumerBuilder,
error::{Error as RSKafkaError, ProtocolError},
partition::PartitionClient,
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
};
use time::{Time, TimeProvider};
use trace::TraceCollector;
use crate::{
codec::{ContentType, IoxHeaders},
codec::IoxHeaders,
core::{
FetchHighWatermark, FetchHighWatermarkFut, WriteBufferError, WriteBufferReading,
WriteBufferWriting, WriteStream,
},
};
use self::aggregator::DmlAggregator;
mod aggregator;
type Result<T, E = WriteBufferError> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct RSKafkaProducer {
database_name: String,
time_provider: Arc<dyn TimeProvider>,
// TODO: batched writes
partition_clients: BTreeMap<u32, PartitionClient>,
producers: BTreeMap<u32, BatchProducer<DmlAggregator>>,
}
impl RSKafkaProducer {
@@ -47,21 +46,34 @@ impl RSKafkaProducer {
database_name: String,
creation_config: Option<&WriteBufferCreationConfig>,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Self> {
let partition_clients = setup_topic(conn, database_name.clone(), creation_config).await?;
let producers = partition_clients
.into_iter()
.map(|(sequencer_id, partition_client)| {
let producer = BatchProducerBuilder::new(Arc::new(partition_client))
.with_linger(Duration::from_millis(100))
.build(DmlAggregator::new(
trace_collector.clone(),
database_name.clone(),
1024 * 500,
sequencer_id,
Arc::clone(&time_provider),
));
Ok(Self {
database_name,
time_provider,
partition_clients,
})
(sequencer_id, producer)
})
.collect();
Ok(Self { producers })
}
}
#[async_trait]
impl WriteBufferWriting for RSKafkaProducer {
fn sequencer_ids(&self) -> BTreeSet<u32> {
self.partition_clients.keys().copied().collect()
self.producers.keys().copied().collect()
}
async fn store_operation(
@@ -69,59 +81,15 @@ impl WriteBufferWriting for RSKafkaProducer {
sequencer_id: u32,
operation: &DmlOperation,
) -> Result<DmlMeta, WriteBufferError> {
let partition_client = self
.partition_clients
let producer = self
.producers
.get(&sequencer_id)
.ok_or_else::<WriteBufferError, _>(|| {
format!("Unknown partition: {}", sequencer_id).into()
})?;
// truncate timestamps to millisecond precision because that's what Kafka supports
let now = operation
.meta()
.producer_ts()
.unwrap_or_else(|| self.time_provider.now());
let timestamp_millis = now.date_time().timestamp_millis();
let timestamp = Time::from_timestamp_millis(timestamp_millis);
let headers = IoxHeaders::new(
ContentType::Protobuf,
operation.meta().span_context().cloned(),
operation.namespace().to_string(),
);
let mut buf = Vec::new();
crate::codec::encode_operation(&self.database_name, operation, &mut buf)?;
let buf_len = buf.len();
let record = Record {
key: Default::default(),
value: buf,
headers: headers
.headers()
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
.collect(),
timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos(
timestamp_millis as i128 * 1_000_000,
)?,
};
let kafka_write_size = record.approximate_size();
debug!(db_name=%self.database_name, partition=sequencer_id, size=buf_len, "writing to kafka");
let offsets = partition_client.produce(vec![record]).await?;
let offset = offsets[0];
debug!(db_name=%self.database_name, %offset, partition=sequencer_id, size=buf_len, "wrote to kafka");
Ok(DmlMeta::sequenced(
Sequence::new(sequencer_id, offset.try_into()?),
timestamp,
operation.meta().span_context().cloned(),
kafka_write_size,
))
// TODO: don't clone!
Ok(producer.produce(operation.clone()).await?)
}
fn type_name(&self) -> &'static str {
@@ -294,7 +262,12 @@ async fn setup_topic(
// create topic
if let Some(creation_config) = creation_config {
match controller_client
.create_topic(&database_name, creation_config.n_sequencers.get() as i32, 1)
.create_topic(
&database_name,
creation_config.n_sequencers.get() as i32,
1,
5_000,
)
.await
{
Ok(_) => {}
@@ -316,11 +289,16 @@
mod tests {
use std::num::NonZeroU32;
use data_types::{delete_predicate::DeletePredicate, timestamp::TimestampRange};
use dml::{DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use trace::RingBufferTraceCollector;
use trace::{ctx::SpanContext, RingBufferTraceCollector};
use crate::{
core::test_utils::{perform_generic_tests, random_topic_name, TestAdapter, TestContext},
core::test_utils::{
assert_span_context_eq_or_linked, map_pop_first, perform_generic_tests,
random_topic_name, set_pop_first, TestAdapter, TestContext,
},
maybe_skip_kafka_integration,
};
@@ -350,6 +328,7 @@ mod tests {
database_name: random_topic_name(),
n_sequencers,
time_provider,
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
}
}
}
@@ -359,6 +338,7 @@ mod tests {
database_name: String,
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Arc<RingBufferTraceCollector>,
}
impl RSKafkaTestContext {
@@ -382,21 +362,24 @@ mod tests {
self.database_name.clone(),
self.creation_config(creation_config).as_ref(),
Arc::clone(&self.time_provider),
Some(self.trace_collector() as Arc<_>),
)
.await
}
async fn reading(&self, creation_config: bool) -> Result<Self::Reading, WriteBufferError> {
let collector: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
RSKafkaConsumer::new(
self.conn.clone(),
self.database_name.clone(),
self.creation_config(creation_config).as_ref(),
Some(collector),
Some(self.trace_collector() as Arc<_>),
)
.await
}
fn trace_collector(&self) -> Arc<RingBufferTraceCollector> {
Arc::clone(&self.trace_collector)
}
}
#[tokio::test]
@@ -434,4 +417,158 @@ mod tests {
while jobs.try_next().await.unwrap().is_some() {}
}
#[tokio::test]
async fn test_batching() {
let conn = maybe_skip_kafka_integration!();
let adapter = RSKafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let trace_collector = ctx.trace_collector();
let producer = ctx.writing(true).await.unwrap();
let sequencer_id = set_pop_first(&mut producer.sequencer_ids()).unwrap();
let (w1_1, w1_2, w2_1, d1_1, d1_2, w1_3, w1_4, w2_2) = tokio::join!(
// ns1: batch 1
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
// ns2: batch 1, part A
write("ns2", &producer, &trace_collector, sequencer_id),
// ns1: batch 2
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 3
delete("ns1", &producer, &trace_collector, sequencer_id),
// ns1: batch 4
write("ns1", &producer, &trace_collector, sequencer_id),
write("ns1", &producer, &trace_collector, sequencer_id),
// ns2: batch 1, part B
write("ns2", &producer, &trace_collector, sequencer_id),
);
// ensure that write operations were fused
assert_eq!(w1_1.sequence().unwrap(), w1_2.sequence().unwrap());
assert_ne!(w1_2.sequence().unwrap(), d1_1.sequence().unwrap());
assert_ne!(d1_1.sequence().unwrap(), d1_2.sequence().unwrap());
assert_ne!(d1_2.sequence().unwrap(), w1_3.sequence().unwrap());
assert_eq!(w1_3.sequence().unwrap(), w1_4.sequence().unwrap());
assert_ne!(w1_4.sequence().unwrap(), w1_1.sequence().unwrap());
assert_ne!(w2_1.sequence().unwrap(), w1_1.sequence().unwrap());
assert_eq!(w2_1.sequence().unwrap(), w2_2.sequence().unwrap());
let mut consumer = ctx.reading(true).await.unwrap();
let mut streams = consumer.streams();
assert_eq!(streams.len(), 1);
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
// get output, note that the write operations were fused
let op_w1_12 = stream.stream.next().await.unwrap().unwrap();
let op_d1_1 = stream.stream.next().await.unwrap().unwrap();
let op_d1_2 = stream.stream.next().await.unwrap().unwrap();
let op_w1_34 = stream.stream.next().await.unwrap().unwrap();
let op_w2_12 = stream.stream.next().await.unwrap().unwrap();
// ensure that sequence numbers map as expected
assert_eq!(
op_w1_12.meta().sequence().unwrap(),
w1_1.sequence().unwrap(),
);
assert_eq!(
op_w1_12.meta().sequence().unwrap(),
w1_2.sequence().unwrap(),
);
assert_eq!(op_d1_1.meta().sequence().unwrap(), d1_1.sequence().unwrap(),);
assert_eq!(op_d1_2.meta().sequence().unwrap(), d1_2.sequence().unwrap(),);
assert_eq!(
op_w1_34.meta().sequence().unwrap(),
w1_3.sequence().unwrap(),
);
assert_eq!(
op_w1_34.meta().sequence().unwrap(),
w1_4.sequence().unwrap(),
);
assert_eq!(
op_w2_12.meta().sequence().unwrap(),
w2_1.sequence().unwrap(),
);
assert_eq!(
op_w2_12.meta().sequence().unwrap(),
w2_2.sequence().unwrap(),
);
// check tracing span links
assert_span_context_eq_or_linked(
w1_1.span_context().unwrap(),
op_w1_12.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_2.span_context().unwrap(),
op_w1_12.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
d1_1.span_context().unwrap(),
op_d1_1.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
d1_2.span_context().unwrap(),
op_d1_2.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_3.span_context().unwrap(),
op_w1_34.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w1_4.span_context().unwrap(),
op_w1_34.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w2_1.span_context().unwrap(),
op_w2_12.meta().span_context().unwrap(),
trace_collector.spans(),
);
assert_span_context_eq_or_linked(
w2_2.span_context().unwrap(),
op_w2_12.meta().span_context().unwrap(),
trace_collector.spans(),
);
}
async fn write(
namespace: &str,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
sequencer_id: u32,
) -> DmlMeta {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let tables = mutable_batch_lp::lines_to_batches("table foo=1", 0).unwrap();
let write = DmlWrite::new(namespace, tables, DmlMeta::unsequenced(Some(span_ctx)));
let op = DmlOperation::Write(write);
producer.store_operation(sequencer_id, &op).await.unwrap()
}
async fn delete(
namespace: &str,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
sequencer_id: u32,
) -> DmlMeta {
let span_ctx = SpanContext::new(Arc::clone(trace_collector) as Arc<_>);
let op = DmlOperation::Delete(DmlDelete::new(
namespace,
DeletePredicate {
range: TimestampRange::new(0, 1),
exprs: vec![],
},
None,
DmlMeta::unsequenced(Some(span_ctx)),
));
producer.store_operation(sequencer_id, &op).await.unwrap()
}
}