Some crates listed features they don't use; other crates were relying on
feature flags enabled by something else. I tested these changes by
disabling the workspace hack crate and testing each crate.
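The per-crate check was roughly of this shape (the crate name here is a placeholder, not the exact list that was verified):

```console
$ cargo check -p some_crate --all-targets
$ cargo test -p some_crate
```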
- for file-based write buffers: use headers + payload (see the sketch below)
- for Kafka-based write buffers: use the estimation that we also use for
other metrics
- as a side effect we can now just use `PartialEq` for more types
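A rough sketch of the file-based variant (the type and field names here are illustrative, not the actual IOx types): the size estimate is simply the byte length of the headers plus the payload.

```rust
/// Illustrative only: a file-backed write buffer entry whose size estimate
/// is derived from its headers plus its payload bytes.
struct FileBufferEntry {
    /// Serialized header key/value pairs.
    headers: Vec<(String, Vec<u8>)>,
    /// Serialized operation payload.
    payload: Vec<u8>,
}

impl FileBufferEntry {
    /// Size estimate used for metrics: header keys + header values + payload.
    fn size(&self) -> usize {
        self.headers
            .iter()
            .map(|(key, value)| key.len() + value.len())
            .sum::<usize>()
            + self.payload.len()
    }
}
```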
Fixes #3186.
With the new rust-rdkafka release (merged in #3234), managing multiple
consumer streams becomes a bit easier. We can also reuse consumer
clients for multiple metadata requests (see the sketch after the list
below). In total this provides:
- use only a single client connection for consumers (we had multiple
connection attempts during startup and one client per stream)
- use only two clients for producers (sadly we need a consumer client to
probe the partitions during startup)
- consumers no longer need to poll the stream to receive statistics
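A rough sketch of the client reuse (illustrative only, not the actual IOx code; it assumes rust-rdkafka's `StreamConsumer`, `Consumer::fetch_metadata`, and `StreamConsumer::stream`):

```rust
use std::time::Duration;

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::KafkaResult;

/// Illustrative only: one consumer client serves both metadata probing and
/// the actual message stream instead of creating a fresh client per use.
fn probe_then_stream(consumer: &StreamConsumer, topic: &str) -> KafkaResult<()> {
    // Metadata requests reuse the existing client connection...
    let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
    for t in metadata.topics() {
        println!("topic {} has {} partitions", t.name(), t.partitions().len());
    }

    // ...and so does the message stream handed out to callers.
    let _stream = consumer.stream();
    Ok(())
}
```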
The direction was required when a database could read or write from/to a
write buffer. Now it is clear from the usage context of a write buffer
which of the two applications is meant (databases read, routers write),
so the direction flag is no longer required.
The existing channel construction could lead to cases where streams
would consume messages and put them into the channel, but when the
stream got dropped the message would be gone forever. So let's move from
a channel-based implementation to directly invoking the generator
future, so this buffering doesn't occur.
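A minimal sketch of the new shape (illustrative, not the actual write buffer code): the stream wraps the generator future directly, e.g. via `futures::stream::unfold`, so an item is only produced when the consumer polls for it and nothing sits in an intermediate channel when the stream is dropped.

```rust
use std::time::Duration;

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // The "generator" future is invoked directly by the stream: each item is
    // produced only when the consumer polls, so dropping the stream cannot
    // strand an already-fetched message in an intermediate channel.
    let stream = futures::stream::unfold(0u64, |offset| async move {
        // Hypothetical fetch of the next entry at `offset`.
        tokio::time::sleep(Duration::from_millis(10)).await;
        Some((format!("entry {}", offset), offset + 1))
    });
    futures::pin_mut!(stream);

    for _ in 0..3 {
        if let Some(entry) = stream.next().await {
            println!("{}", entry);
        }
    }
    // Dropping `stream` here loses nothing: no message was buffered ahead of
    // the consumer.
}
```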
Fixes #3179.
When a Kafka broker pod is recreated (for whatever reason) and gets a
new IP while doing so, the following happened:
1. Old broker pod gets terminated, but is still reachable via DNS and
TCP.
2. rdkafka loses its connection and re-creates it using the old IP. The
TCP connection can be established (this heavily depends on the K8s
network setup), but won't be able to send any messages because the
old broker is already shutting down / dead.
3. New broker gets created w/ new IP (but same DNS name).
4. Somewhat in parallel to step 3: rdkafka gets informed by other
brokers that the topic lost its leader and then that the topic has
the new leader (which has the same identity as the old one). Since
leader changes in Kafka can also happen when brokers are totally
healthy, it doesn't conclude that its TCP connection might be broken
and tries to send messages to the new broker via the old TCP
connection.
5. It takes a very long time (~130s on my test setup) for the old
rdkafka->broker TCP connection to break. Since
`message.send.max.retries` has a default of `2147483647`, rdkafka will
not give up at the application level.
6. rdkafka re-connects; while doing so it resolves the new broker IP via
DNS and is happy.
An alternative fix that was tried: use the `connect` rdkafka callback to
hook into the place where it would issue the UNIX `connect` call. There
we can manipulate the socket. Setting `TCP_USER_TIMEOUT` to 5000ms also
mostly solves the issue, but might have different implications (it also
takes around 5s to kill the connection). Since this is a more hackish
implementation and somewhat an unofficial way to configure rdkafka, I
decided against it.
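For reference, setting `TCP_USER_TIMEOUT` on a socket boils down to something like the following on Linux (an illustrative helper using `libc`, not the rejected patch; the hook into rdkafka's connect path is not shown):

```rust
#[cfg(target_os = "linux")]
fn set_tcp_user_timeout(fd: std::os::unix::io::RawFd, timeout_ms: u32) -> std::io::Result<()> {
    // TCP_USER_TIMEOUT: maximum time (ms) that transmitted data may remain
    // unacknowledged before the kernel forcibly closes the connection.
    let ret = unsafe {
        libc::setsockopt(
            fd,
            libc::IPPROTO_TCP,
            libc::TCP_USER_TIMEOUT,
            &timeout_ms as *const u32 as *const libc::c_void,
            std::mem::size_of::<u32>() as libc::socklen_t,
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
```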
Test Setup
==========
```rust
#[tokio::test]
async fn write_forever() {
    maybe_start_logging();
    let conn = maybe_skip_kafka_integration!();
    let adapter = KafkaTestAdapter::new(conn);
    let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
    let writer = ctx.writing(true).await.unwrap();
    let lp = "upc user=1 100";
    let sequencer_id = set_pop_first(&mut writer.sequencer_ids()).unwrap();

    for i in 1.. {
        println!("{}", i);

        let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
        let write = DmlWrite::new(tables, DmlMeta::unsequenced(None));
        let operation = DmlOperation::Write(write);
        let res = writer.store_operation(sequencer_id, &operation).await;
        dbg!(res);

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```
Make sure to set the rdkafka `log` config to `all`. Then use KinD, set
up a 3-node Strimzi cluster, and start the test binary within the K8s
cluster. You need to start a debug container that is close enough to
your developer system (e.g. an old Debian DOES NOT work if you run
bleeding edge Arch):
```console
$(host) kubectl run -i --tty --rm debug --image=archlinux --restart=Never -n kafka -- bash
```
Then you copy the test binary over to the container using [cargo-with](https://github.com/cbourjau/cargo-with):
```console
$(host) cargo with 'kubectl cp {bin} kafka/debug:/foo' -- test -p write_buffer
```
Within the container shell that you've just created, start the
forever-running test (make sure to set `KAFKA_CONNECT` according to your
Strimzi setup!):
```console
$(container) TEST_INTEGRATION=1 KAFKA_CONNECT=my-cluster-kafka-bootstrap:9092 RUST_BACKTRACE=1 RUST_LOG=debug ./foo write_forever --nocapture
```
The test should run and tell you that it is delivering messages. It also
tells you within the debug logs which broker it sends the messages to.
Now you need to kill the broker (in my example it was `my-cluster-kafka-1`):
```console
$(host) kubectl -n kafka delete pod my-cluster-kafka-1
```
The test should now stop delivering messages and start erroring. Without
this patch it might take over 100s for it to recover even after the
deleted pod was re-created. With this patch it is quickly able to
deliver data again after the broker comes back online.
Fixes #3030.
* fix: Add tokio rt-multi-thread feature so cargo test -p client_util compiles
* fix: Alphabetize dependencies
* fix: Add the data_types_conversions feature to get tests passing
* fix: Remove dev dependencies already listed under normal dependencies
* fix: Make sure the workspace is using the new resolver
* feat: migrate server to DbWrite (#2724)
* chore: print perf log output
* fix: don't suppress CI status code
* chore: review feedback
* fix: don't error on empty line protocol write payloads
* fix: test
* fix: test