* chore: upgrade rskafka
* refactor: less cloning
* fix: defined behaviour when seeking to an unknown sequence number
The new, defined behavior is: "return an error once and then end the
stream".
Co-authored-by: Edd Robinson <me@edd.io>
For sparse data the PB-encoded data (our Kafka wire format) is way
smaller than the MutableBatch (up to a factor of 20). So let's use the
encoded size to estimate the size during batching.
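As a minimal sketch of the idea (assuming a prost-generated payload type; the helper name is illustrative, not actual IOx code):
```rust
// Sketch only: estimate the batch size from the protobuf wire size rather
// than the in-memory MutableBatch size, since the wire size is what counts
// against the Kafka message limit.
use prost::Message;

/// Hypothetical helper: the size the payload will occupy on the Kafka wire.
fn estimated_wire_size<M: Message>(payload: &M) -> usize {
    payload.encoded_len()
}
```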
Prod has a larger max message size for Kafka (10MB instead of 1MB), but
currently we're unable to wire all the write buffer configs through. As
a quick fix, let's hard-code the config. However, this breaks the write
buffer when running under default Kafka (1MB), so we should revert this
(tracked under #3723).
When creating a new aggregation span, you MUST NOT just create a new
random span context and put its child span into a span recorder, because
then only the child will be reported to the trace collector. Instead,
create a new root span without any parent directly.
This makes Jaeger slightly happier and it won't complain about broken
spans anymore.
* refactor: improve writer buffer consumer interface
The change looks huge but is actually rather simple. To
understand the interface change, let me first explain what we want:
- be able to fetch watermarks for any sequencer
- have streams:
- each stream tracks a sequencer and has an offset state (no read
multiplexing)
- we can seek a stream
- seeking and streaming cannot be done at the same time (that would be
weird and would likely lead to many bugs both in the write buffer and in
the user code)
- ideally we don't need to create streams of all sequencers but can
choose a subset
Before this change we had one mutable consumer struct from which you
could get all streams and watermark functions (this mutably borrows the
consumer) or seek a single stream (this also mutably borrows the
consumer). This is a bit weird for multiple reasons:
- you cannot seek a single stream without dropping all of them
- the mutable-borrow construct makes it really difficult to pass the
streams into separate threads
- the consumer is boxed (because it's mutable) which makes it more
difficult to handle in a large-scale application
What this change does is the following:
- you have an immutable consumer (similar to the producer)
- the consumer offers the following methods:
- get the set of sequencer IDs
- get watermark for any sequencer
- get a stream handler (see next point) for any sequencer
- the stream handler captures the stream state (offset) and provides you
with a standard `Stream<_>` interface as well as a seek function.
Mutable borrows ensure that you cannot use both at the same time.
The stream handler provides you the stream via `handler.stream()`. It
doesn't implement `Stream<_>` itself because of the way boxing, dynamic
dispatch, and pinning interact (i.e. I couldn't get it to work without
the indirection).
As a bonus (which we don't use, however), you can now create multiple
streams for the same sequencer, and each has its own offset.
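A rough sketch of the shape of that interface (names and signatures are approximations based on the description above, via the `async_trait` and `futures` crates, not the exact trait definitions in the crate):
```rust
use std::collections::BTreeSet;

use futures::stream::BoxStream;

// Stand-ins for the real crate types, just to keep the sketch self-contained.
type Error = Box<dyn std::error::Error + Send + Sync>;
struct DmlOperation;

#[async_trait::async_trait]
trait WriteBufferReading: Send + Sync {
    /// The sequencer IDs this consumer knows about.
    fn sequencer_ids(&self) -> BTreeSet<u32>;

    /// Watermark for any sequencer, callable at any time (immutable borrow).
    async fn fetch_high_watermark(&self, sequencer_id: u32) -> Result<u64, Error>;

    /// Create an independent stream handler for one sequencer; each handler
    /// owns its own offset state, so multiple handlers per sequencer work.
    async fn stream_handler(
        &self,
        sequencer_id: u32,
    ) -> Result<Box<dyn WriteBufferStreamHandler>, Error>;
}

#[async_trait::async_trait]
trait WriteBufferStreamHandler: Send {
    /// Mutable borrow: while you hold the stream you cannot seek.
    fn stream(&mut self) -> BoxStream<'_, Result<DmlOperation, Error>>;

    /// Also a mutable borrow, for the same reason.
    async fn seek(&mut self, sequence_number: u64) -> Result<(), Error>;
}
```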
* fix: review comments
Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
All features are now covered by rskafka. This also removes the need to
specify a server ID for write buffer consumers. That was only used for
rdkafka, since we had to specify a consumer group there even though we
did not use any transactions.
* chore: upgrade rskafka + enable snappy support
* test: ensure that rskafka and rdkafka work together
Before removing rdkafka ensure that:
- rskafka can consume existing messages produced by rdkafka so we do not
need to drain existing topics
- rdkafka can consume new messages produced by rskafka so we can roll
back
I ran the whole `write_buffer` test suite (including the newly added
tests) using Apache Kafka as well as Redpanda.
* test: ensure we handle consumer offset in error case correctly
* docs: explain test setup
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: Add db_name/namespace to DmlWrite and DmlDelete
This is required for the new ingester to be able to work with the write buffer. The protobuf that gets serialized over Kafka already includes the database name; it just wasn't getting carried through to the marshaled DML operation.
* fix: database != namespace, propagation through write buffer
Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
- Use a more standard way to set up the tracing subsystem (as described
in the tracing-subscriber docs)
- Also capture content from the `log` crate
- Play nice w/ Rust's libtest message capture
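A hedged example of that kind of setup (not the exact code from this change), using the standard tracing-subscriber builder plus the `log` bridge:
```rust
use tracing_subscriber::{fmt, EnvFilter};

fn init_test_logging() {
    // Forward records emitted via the `log` crate into `tracing`.
    let _ = tracing_log::LogTracer::init();

    // Standard tracing-subscriber setup; the test writer cooperates with
    // libtest's output capture (output is shown only for failing tests or
    // when running with --nocapture).
    let _ = fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .with_test_writer()
        .try_init();
}
```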
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
* feat: Sequencer wrapper
This type wraps an underlying WriteBufferWriter implementation, tagging
it with a sequencer ID it should use when enqueuing operations to the
buffer.
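Roughly, the wrapper looks like this (an illustrative sketch; the struct and method names here are approximations, `WriteBufferWriting`, `DmlOperation`, and `DmlMeta` refer to the existing crate types, and the error type name is assumed):
```rust
use std::sync::Arc;

/// Sketch: pins every operation issued through this handle to one
/// sequencer (shard). `WriteBufferError` stands in for the write buffer's
/// actual error type.
struct Sequencer {
    id: u32,
    inner: Arc<dyn WriteBufferWriting>,
}

impl Sequencer {
    fn new(id: u32, inner: Arc<dyn WriteBufferWriting>) -> Self {
        Self { id, inner }
    }

    /// Enqueue an operation to the sequencer this wrapper was built for.
    async fn enqueue(&self, op: DmlOperation) -> Result<DmlMeta, WriteBufferError> {
        self.inner.store_operation(self.id, &op).await
    }
}
```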
* feat: mock sharder
Implements a mock Sharder impl that returns pre-configured responses to
shard(), and captures the input to the call.
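As a sketch of the pattern (a generic mock shape, not the exact type in the codebase): pre-configured responses are handed out in order and every input is recorded for later assertions.
```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Generic sketch of a call-capturing mock.
struct MockSharder {
    /// Responses returned in order, one per `shard()` call.
    responses: Mutex<VecDeque<u32>>,
    /// Captured inputs, inspectable from the test afterwards.
    calls: Mutex<Vec<String>>,
}

impl MockSharder {
    fn shard(&self, table: &str) -> u32 {
        self.calls.lock().unwrap().push(table.to_string());
        self.responses
            .lock()
            .unwrap()
            .pop_front()
            .expect("no pre-configured shard response left")
    }
}
```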
* feat: sharded write buffer
Implements sharding of ops into an underlying WriteBuffer.
Writes are sharded by some abstract Sharder impl, collated per shard to
maximise the size of each op (and therefore compression efficiency),
converted into a DML operation and then enqueued in parallel to the
underlying WriteBuffer implementation.
Deletes are modelled as being mapped to a single write buffer shard,
which is the case while we support sharding based on the table &
namespace only. Deletes will be extended to support (potentially)
multiple shards when column overrides are implemented.
* refactor: runtime write buffers
Switch from using static dispatch, to using a runtime specified
WriteBufferWriting implementation.
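In sketch form (assuming the crate's `WriteBufferWriting` trait; the struct names here are illustrative), the difference is simply a trait object instead of a generic parameter:
```rust
use std::sync::Arc;

// Before: the write buffer implementation is fixed at compile time.
struct ShardedWriteBufferStatic<W: WriteBufferWriting> {
    shards: Vec<W>,
}

// After: the implementation is chosen at runtime (e.g. Kafka in prod,
// a mock or file-based buffer in tests).
struct ShardedWriteBuffer {
    shards: Vec<Arc<dyn WriteBufferWriting>>,
}
```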
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Some crates listed features they don't use; other crates were relying on
feature flags enabled by something else. I tested these changes by
disabling the workspace hack crate and testing each crate.
- for file-based write buffers: Use headers + payload
- for Kafka-based write buffers: Use the estimation that we also use for
other metrics
- as a side effect we can now just use `PartialEq` for more types
Fixes #3186.
With the new rust-rdkafka release (merged in #3234) managing multiple
consumer streams becomes a bit easier. Also, we can just reuse consumer
clients for multiple metadata requests. In total this provides:
- use only a single client connection for consumers (we had multiple
connection attempts during startup and one client per stream)
- use only two clients for producers (sadly we need a consumer client to
probe the partitions during startup)
- consumers no longer need to poll the stream to receive statistics
The direction flag was required when a database could read or write
from/to a write buffer. Now it is clear from the usage context of a
write buffer which of the two applications is meant (databases read,
routers write), so the direction flag is no longer required.
The existing channel construction could lead to cases where streams
would consume messages and put them into the channel, but when the
stream got dropped the messages would be gone forever. So let's move
from a channel-based implementation to directly invoking the generator
future, so this buffering doesn't occur.
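A small sketch of the idea using futures' `unfold` (illustrative only; the real stream items and fetch logic belong to the write buffer): each item is produced on demand by the fetch future itself, so nothing sits buffered in a channel where it could be lost when the stream is dropped.
```rust
use std::future::Future;

use futures::stream::{self, Stream};

/// Build a stream that fetches each message lazily; `fetch_next` is a
/// hypothetical closure returning (message, next offset).
fn message_stream<F, Fut, M>(fetch_next: F) -> impl Stream<Item = M>
where
    F: FnMut(u64) -> Fut,
    Fut: Future<Output = (M, u64)>,
{
    stream::unfold((0u64, fetch_next), |(offset, mut fetch_next)| async move {
        // The message only exists once the consumer actually polls for it.
        let (msg, next_offset) = fetch_next(offset).await;
        Some((msg, (next_offset, fetch_next)))
    })
}
```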
Fixes #3179.
When a Kafka broker pod is recreated (for whatever reason) and gets a
new IP while doing so, the following happened:
1. Old broker pod gets terminated, but is still reachable via DNS and
TCP.
2. rdkafka loses its connection, re-creates it using the old IP. The
TCP connection can be established (this heavily depends on the K8s
network setup), but won't be able to send any messages because the
old broker is already shutting down / dead.
3. New broker gets created w/ new IP (but same DNS name).
4. Somewhat in parallel to step 3: rdkafka gets informed by other
brokers that the topic lost its leader and then that the topic has
the new leader (which has the same identity as the old one). Since
leader changes in Kafka can also happen when brokers are totally
healthy, it doesn't conclude that its TCP connection might be broken
and tries to send messages to the new broker via the old TCP
connection.
5. It takes very long (~130s on my test setup) for the old
rdkafka->broker TCP connection to break. Since
`message.send.max.retries` has a default of `2147483647` rdkafka will
not give up on the application level.
6. rdkafka re-connects; while doing so it resolves the new broker IP via
DNS and is happy.
An alternative fix that was tried: Use the `connect` rdkafka callback to
hook into the place where it would issue the UNIX `connect` call. There
we can manipulate the socket. Setting `TCP_USER_TIMEOUT` to 5000ms also
solves the issue somewhat, but might have different implications (also
it then takes around 5s to kill the connection). Since this is a more
hackish implementation and somewhat an unofficial way to configure
rdkafka, I decided against it.
Test Setup
==========
```rust
#[tokio::test]
async fn write_forever() {
    maybe_start_logging();
    let conn = maybe_skip_kafka_integration!();
    let adapter = KafkaTestAdapter::new(conn);
    let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;

    let writer = ctx.writing(true).await.unwrap();
    let lp = "upc user=1 100";
    let sequencer_id = set_pop_first(&mut writer.sequencer_ids()).unwrap();

    for i in 1.. {
        println!("{}", i);

        let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
        let write = DmlWrite::new(tables, DmlMeta::unsequenced(None));
        let operation = DmlOperation::Write(write);

        let res = writer.store_operation(sequencer_id, &operation).await;
        dbg!(res);

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```
Make sure to set the rdkafka `log` config to `all`. Then use KinD, set
up a 3-node Strimzi cluster, and start the test binary within the K8s
cluster. You need to start a debug container that is close enough to
your developer system (e.g. an old Debian DOES NOT work if you run
bleeding-edge Arch):
```console
$(host) kubectl run -i --tty --rm debug --image=archlinux --restart=Never -n kafka -- bash
```
Then you copy the test binary over to the container using [cargo-with](https://github.com/cbourjau/cargo-with):
```console
$(host) cargo with 'kubectl cp {bin} kafka/debug:/foo' -- test -p write_buffer
```
Within the container shell that you've just created, start the
forever-running test (make sure to set `KAFKA_CONNECT` according to your
Strimzi setup!):
```console
$(container) TEST_INTEGRATION=1 KAFKA_CONNECT=my-cluster-kafka-bootstrap:9092 RUST_BACKTRACE=1 RUST_LOG=debug ./foo write_forever --nocapture
```
The test should run and tell you that it is delivering messages. It also
tells you within the debug logs which broker it sends the messages to.
Now you need to kill the broker (in my example it was `my-cluster-kafka-1`):
```console
$(host) kubectl -n kafka delete pod my-cluster-kafka-1
```
The test should now stop delivering messages and should error. Without
this patch it might take over 100s for it to recover even after the
deleted pod was re-created. With this patch it is quickly able to
deliver data again after the broker comes back online.
Fixes #3030.
* fix: Add tokio rt-multi-thread feature so cargo test -p client_util compiles
* fix: Alphabetize dependencies
* fix: Add the data_types_conversions feature to get tests passing
* fix: Remove dev dependencies already listed under normal dependencies
* fix: Make sure the workspace is using the new resolver