Some crates listed features they don't use; other crates were relying on
feature flags enabled by something else. I tested these changes by
disabling the workspace hack crate and testing each crate.
- for file-based write buffers: Use headers + payload
- for Kafka-based write buffers: Use the estimation that we also use for
other metrics
- as a side effect we can now just use `PartialEq` for more types
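For illustration, a minimal sketch of the file-based accounting (names are hypothetical; the Kafka path instead reuses the shared estimation helper):
```rust
// Hypothetical sketch only; not the actual write buffer types.
fn file_message_size(headers: &[(String, String)], payload: &[u8]) -> usize {
    // File-based write buffers know the exact layout, so we can account for
    // headers + payload directly instead of estimating.
    let header_bytes: usize = headers.iter().map(|(k, v)| k.len() + v.len()).sum();
    header_bytes + payload.len()
}
```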
Fixes #3186.
With the new rust-rdkafka release (merged in #3234) managing multiple
consumer streams becomes a bit easier. We can also reuse consumer
clients for multiple metadata requests. In total this provides:
- use only a single client connection for consumers (we had multiple
connection attempts during startup and one client per stream)
- use only two clients for producers (sadly we need a consumer client to
probe the partitions during startup)
- consumers no longer need to poll the stream to receive statistics
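A rough sketch of the client-reuse pattern with rust-rdkafka (broker address, group id, and topic are placeholders, not the actual IOx code):
```rust
use std::time::Duration;

use rdkafka::{
    config::ClientConfig,
    consumer::{Consumer, StreamConsumer},
};

fn single_consumer_client() -> Result<(), rdkafka::error::KafkaError> {
    // One consumer client, created once...
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "my-cluster-kafka-bootstrap:9092")
        .set("group.id", "iox-example")
        .create()?;

    // ...reused for metadata requests (e.g. probing the partition count)...
    let metadata = consumer.fetch_metadata(Some("my_topic"), Duration::from_secs(10))?;
    println!("partitions: {}", metadata.topics()[0].partitions().len());

    // ...and for the actual message stream, without opening further connections.
    let _stream = consumer.stream();

    Ok(())
}
```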
The direction flag was required when a database could both read from and
write to a write buffer. Now the usage context of a write buffer makes
clear which of the two applications is meant (databases read, routers
write), so the direction flag is no longer required.
The existing channel construction could lead to cases where a stream
would consume messages and put them into the channel, but when the
stream got dropped those messages were gone forever. So let's move from
a channel-based implementation to directly invoking the generator
future, so that this buffering doesn't occur.
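A minimal sketch of the pattern (not the actual code): produce each item on demand by polling the generator future directly, e.g. via `futures::stream::unfold`, instead of buffering through a channel:
```rust
use futures::stream::{self, Stream};

/// Hypothetical fetch function standing in for "the generator future".
async fn fetch_next(offset: u64) -> (String, u64) {
    (format!("message at offset {}", offset), offset + 1)
}

/// Items are generated only when the consumer polls the stream, so dropping
/// the stream cannot discard messages that were already pulled into a buffer.
fn message_stream(start_offset: u64) -> impl Stream<Item = String> {
    stream::unfold(start_offset, |offset| async move {
        let (msg, next_offset) = fetch_next(offset).await;
        Some((msg, next_offset))
    })
}
```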
Fixes #3179.
When a Kafka broker pod is recreated (for whatever reason) and gets a
new IP while doing so, the following happened:
1. Old broker pod gets terminated, but is still reachable via DNS and
TCP.
2. rdkafka loses its connection, re-creates it using the old IP. The
TCP connection can be established (this heavily depends on the K8s
network setup), but won't be able to send any messages because the
old broker is already shutting down / dead.
3. New broker gets created w/ new IP (but same DNS name).
4. Somewhat in parallel to step 3: rdkafka gets informed by other
brokers that the topic lost its leader and then that the topic has
the new leader (which has the same identity as the old one). Since
leader changes in Kafka can also happen when brokers are totally
healthy, it doesn't conclude that its TCP connection might be broken
and tries to send messages to the new broker via the old TCP
connection.
5. It takes a very long time (~130s on my test setup) for the old
rdkafka->broker TCP connection to break. Since
`message.send.max.retries` has a default of `2147483647`, rdkafka will
not give up on the application level.
6. rdkafka re-connects; while doing so it resolves the new broker IP via
DNS and is happy.
An alternative fix that was tried: Use the `connect` rdkafka callback to
hook into the place where it would issue the UNIX `connect` call. There
we can manipulate the socket. Setting `TCP_USER_TIMEOUT` to 5000ms also
solves the issue somewhat, but might have different implications (also
it then takes around 5s to kill the connection). Since this is a more
hackish implementation and somewhat an unofficial way to configure
rdkafka, I decided against it.
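For reference, this is roughly what the rejected alternative would do at the socket level (a sketch using the `libc` crate on Linux; not part of this change):
```rust
use std::os::unix::io::RawFd;

/// Sketch only: set TCP_USER_TIMEOUT (in milliseconds) on an existing socket.
/// On Linux this bounds how long unacknowledged data may sit on the connection
/// before the kernel declares it dead.
fn set_tcp_user_timeout(fd: RawFd, timeout_ms: u32) -> std::io::Result<()> {
    let ret = unsafe {
        libc::setsockopt(
            fd,
            libc::IPPROTO_TCP,
            libc::TCP_USER_TIMEOUT,
            &timeout_ms as *const u32 as *const libc::c_void,
            std::mem::size_of::<u32>() as libc::socklen_t,
        )
    };
    if ret == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
```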
Test Setup
==========
```rust
#[tokio::test]
async fn write_forever() {
    maybe_start_logging();
    let conn = maybe_skip_kafka_integration!();
    let adapter = KafkaTestAdapter::new(conn);
    let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;

    let writer = ctx.writing(true).await.unwrap();
    let lp = "upc user=1 100";
    let sequencer_id = set_pop_first(&mut writer.sequencer_ids()).unwrap();

    for i in 1.. {
        println!("{}", i);

        let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
        let write = DmlWrite::new(tables, DmlMeta::unsequenced(None));
        let operation = DmlOperation::Write(write);
        let res = writer.store_operation(sequencer_id, &operation).await;
        dbg!(res);

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```
Make sure to set the rdkafka `log` config to `all`. Then use KinD, set
up a 3-node Strimzi cluster and start the test binary within the K8s
cluster. You need to start a debug container that is close enough to
your development system (e.g. an old Debian DOES NOT work if you run
bleeding edge Arch):
```console
$(host) kubectl run -i --tty --rm debug --image=archlinux --restart=Never -n kafka -- bash
```
Then copy the test binary over to the container using [cargo-with](https://github.com/cbourjau/cargo-with):
```console
$(host) cargo with 'kubectl cp {bin} kafka/debug:/foo' -- test -p write_buffer
```
Within the container shell that you've just created, start the
forever-running test (make sure to set `KAFKA_CONNECT` according to your
Strimzi setup!):
```console
$(container) TEST_INTEGRATION=1 KAFKA_CONNECT=my-cluster-kafka-bootstrap:9092 RUST_BACKTRACE=1 RUST_LOG=debug ./foo write_forever --nocapture
```
The test should run and tell you that it is delivering messages. It also
tells you within the debug logs which broker it sends the messages to.
Now you need to kill the broker (in my example it was `my-cluster-kafka-1`):
```console
$(host) kubectl -n kafka delete pod my-cluster-kafka-1
```
The test should now stop delivering messages and should error. Without
this patch it might take over 100s to recover even after the deleted pod
was re-created. With this patch it is quickly able to deliver data again
after the broker comes back online.
Fixes #3030.
* fix: Add tokio rt-multi-thread feature so cargo test -p client_util compiles
* fix: Alphabetize dependencies
* fix: Add the data_types_conversions feature to get tests passing
* fix: Remove dev dependencies already listed under normal dependencies
* fix: Make sure the workspace is using the new resolver
* feat: migrate server to DbWrite (#2724)
* chore: print perf log output
* fix: don't suppress CI status code
* chore: review feedback
* fix: don't error on empty line protocol write payloads
* fix: test
* fix: test
- entries should already be sorted by the stream, so there is no need to
sort the results
- ensure that there are no leftover entries in the stream by asserting
that it is "pending"
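A small sketch of how "assert that the stream is pending" can look with the `futures` crate (helper name is made up):
```rust
use futures::{FutureExt, Stream, StreamExt};

/// Assert that the stream currently has no ready entry, i.e. it is "pending".
fn assert_stream_pending<S>(stream: &mut S)
where
    S: Stream + Unpin,
{
    // `now_or_never` polls the next-item future exactly once with a no-op
    // waker; `None` means it is still pending, so there are no leftover
    // entries buffered in the stream.
    assert!(stream.next().now_or_never().is_none());
}
```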
This allows:
- different types (instead of guessing through the connection URL)
- sequencer counts (not used yet but will be by #2455)
- extensible configs (e.g. to configure Kafka in a more granular way,
not wired up yet)
- future extensions (since we use a message now instead of a single
string)
**BREAKING: This requires changes for deployed systems / existing DBs!**
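A hedged sketch of the shape such a config message can take (names are illustrative, not the actual types):
```rust
use std::{collections::BTreeMap, num::NonZeroU32};

/// Illustrative only: an explicit config instead of a single connection string.
pub struct WriteBufferConnection {
    /// Explicit write buffer type instead of guessing from a URL scheme.
    pub type_: WriteBufferType,
    /// Connection string, e.g. the Kafka bootstrap servers.
    pub connection: String,
    /// Number of sequencers/partitions (not used yet).
    pub n_sequencers: Option<NonZeroU32>,
    /// Free-form, type-specific options for future granular configuration.
    pub connection_config: BTreeMap<String, String>,
}

pub enum WriteBufferType {
    Kafka,
    Mock,
}
```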
* fix: nocache feature code rot
The MBChunk::snapshot code when using the "nocache" option no longer
compiles - this commit updates it to match the not(nocache) code.
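This kind of rot is easy to hit because the two variants are never compiled together; a generic illustration (not the actual MBChunk code):
```rust
// Illustration only: with feature-gated twins, the rarely-built variant
// silently rots because a default `cargo build` never type-checks it.
#[cfg(not(feature = "nocache"))]
fn snapshot() -> &'static str {
    "cached path"
}

#[cfg(feature = "nocache")]
fn snapshot() -> &'static str {
    // Only compiled with `--features nocache`, so drift goes unnoticed until
    // someone builds with the feature enabled.
    "uncached path"
}
```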
* build: use updated broken_intra_doc_links name
The `broken_intra_doc_links` lint was renamed to
`rustdoc::broken_intra_doc_links`:
https://doc.rust-lang.org/rustdoc/lints.html
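In practice the rename means updating the crate-level lint attribute, e.g.:
```rust
// Before (pre-rename name, now triggers a "lint has been renamed" warning):
// #![deny(broken_intra_doc_links)]

// After: the lint lives in the `rustdoc` tool namespace.
#![deny(rustdoc::broken_intra_doc_links)]
```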
Advantages are:
- for large DBs w/ many partitions we can ingest data in-parallel
- on top of this change we can implement per-sequencer seeking, which is
required for replay
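A hedged sketch of what a per-sequencer reading interface can look like (trait and type names are assumptions; see the list above for the intent):
```rust
use std::collections::BTreeMap;

use async_trait::async_trait;
use futures::stream::BoxStream;

// Placeholder types for the sketch.
type SequencerId = u32;
type Entry = Vec<u8>;
type WriteBufferError = Box<dyn std::error::Error + Send + Sync>;

#[async_trait]
pub trait WriteBufferReading: Send + Sync {
    /// One independent stream per sequencer, so partitions can be ingested
    /// in parallel.
    fn streams(&mut self) -> BTreeMap<SequencerId, BoxStream<'_, Result<Entry, WriteBufferError>>>;

    /// Seek a single sequencer to a known sequence number, as needed for replay.
    async fn seek(
        &mut self,
        sequencer_id: SequencerId,
        sequence_number: u64,
    ) -> Result<(), WriteBufferError>;
}
```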
Given that we've increased the max message size by a factor of 10, also
increase the internal producer queue max size by a factor of 10 to
reduce the number of retries needed to successfully enqueue messages to
Kafka.
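With rdkafka this kind of producer tuning goes through `ClientConfig`; a sketch with the standard librdkafka parameters (values are illustrative, not the exact numbers used by this change):
```rust
use rdkafka::{config::ClientConfig, producer::FutureProducer};

fn producer_with_larger_queue(bootstrap: &str) -> Result<FutureProducer, rdkafka::error::KafkaError> {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap)
        // Allow larger individual messages...
        .set("message.max.bytes", "10485760")
        // ...and grow the internal producer queue accordingly so enqueueing
        // rarely has to be retried because the queue is full.
        .set("queue.buffering.max.kbytes", "1048576")
        .create()
}
```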
Connects to #2007.
This refactors the write buffer a bit for:
- **Testing:** Add generic tests for the Kafka and the mocking
implementations. The same interface can be used to easily add new
implementations (e.g. via Redis, filesystem, ...).
- **Partition on Write:** The caller of the write operation must now
specify the partition/sequencer ID (see the sketch after this list). The
implicit partitioning of the Kafka writer would have led to broken data
since we must never spill entries w/ the same primary key over multiple
partitions. At the moment we will only use partition 0 but we can easily
implement better logic in the future.
- **Improved Mocking:** The mocked implementation now simulates a system
that feels more real. Especially the handling around multiple streams
and "write while read" has been improved. This will be helpful for
testing and for new features like seeking (during replay). A solid
realistic mock also helps us to ensure that the tests using the mock
do not rely on unrealistic behavior too much.
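A hedged sketch of the writing interface shape implied by the test snippet further up (the real trait and error types may differ):
```rust
use std::collections::BTreeSet;

use async_trait::async_trait;
// `DmlOperation`/`DmlMeta` as used in the test snippet further up; the crate
// path is an assumption here.
use dml::{DmlMeta, DmlOperation};

type SequencerId = u32;
type WriteBufferError = Box<dyn std::error::Error + Send + Sync>;

#[async_trait]
pub trait WriteBufferWriting: Send + Sync {
    /// Sequencer (partition) IDs the caller can address explicitly.
    fn sequencer_ids(&self) -> BTreeSet<SequencerId>;

    /// Store one operation in the explicitly chosen sequencer. The
    /// implementation must not pick the partition itself, so entries with the
    /// same primary key never spill over multiple partitions.
    async fn store_operation(
        &self,
        sequencer_id: SequencerId,
        operation: &DmlOperation,
    ) -> Result<DmlMeta, WriteBufferError>;
}
```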
* feat: Implement write buffer to Parquet snapshotting
This introduces snapshot support to the server package to manage snapshotting. It also introduces a new trait for representing a Partition. A very crude API is wired up in http_routes for testing purposes. Follow-on work will bring the server package into http_routes and rework the snapshot API.