From aa4eec9939599c6ba1f3fe49d4389cecdd306ad4 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Nov 2022 14:19:32 +0100 Subject: [PATCH 1/2] chore: update rskafka Mostly upstream dependencies updates8678dfe049de05415929ffec7c1be8921bb057f7. --- Cargo.lock | 2 +- write_buffer/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9493cd0b3e..907383279a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4271,7 +4271,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.3.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=4f05f3b42d144f75ddb684a10c746f8afddbf338#4f05f3b42d144f75ddb684a10c746f8afddbf338" +source = "git+https://github.com/influxdata/rskafka.git?rev=8678dfe049de05415929ffec7c1be8921bb057f7#8678dfe049de05415929ffec7c1be8921bb057f7" dependencies = [ "async-socks5", "async-trait", diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index 645cd10386..04aa760252 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -24,7 +24,7 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="4f05f3b42d144f75ddb684a10c746f8afddbf338", default-features = false, features = ["compression-snappy", "transport-socks5"] } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="8678dfe049de05415929ffec7c1be8921bb057f7", default-features = false, features = ["compression-snappy", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.7.4" From 254be59856fc1adcb3f2b1441d1757321d37cb7c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Nov 2022 14:22:33 +0100 Subject: [PATCH 2/2] feat: enable ZSTD compression for write buffer payload Closes #5981. --- Cargo.lock | 2 +- write_buffer/Cargo.toml | 2 +- write_buffer/src/kafka/instrumentation.rs | 4 ++-- write_buffer/src/kafka/mod.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 907383279a..4e3652a148 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4283,10 +4283,10 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "rand", - "snap", "thiserror", "tokio", "tracing", + "zstd", ] [[package]] diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index 04aa760252..0b7c97f2ff 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -24,7 +24,7 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="8678dfe049de05415929ffec7c1be8921bb057f7", default-features = false, features = ["compression-snappy", "transport-socks5"] } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="8678dfe049de05415929ffec7c1be8921bb057f7", default-features = false, features = ["compression-zstd", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.7.4" diff --git a/write_buffer/src/kafka/instrumentation.rs b/write_buffer/src/kafka/instrumentation.rs index 283dc281fd..73c44785bc 100644 --- a/write_buffer/src/kafka/instrumentation.rs +++ b/write_buffer/src/kafka/instrumentation.rs @@ -211,7 +211,7 @@ mod tests { }; wrapper - .produce(vec![record.clone()], Compression::Snappy) + .produce(vec![record.clone()], Compression::Zstd) .await .expect("produce call should succeed"); @@ -261,7 +261,7 @@ mod tests { .with_time_provider(Arc::clone(&clock)); wrapper - .produce(Vec::new(), Compression::Snappy) + .produce(Vec::new(), Compression::Zstd) .await .expect_err("produce call should fail"); diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 21b5500df8..1d470f2221 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -691,7 +691,7 @@ mod tests { headers: Default::default(), timestamp: rskafka::chrono::Utc::now(), }], - Compression::NoCompression, + Compression::Zstd, ) .await .unwrap();