diff --git a/Cargo.lock b/Cargo.lock index b545cd515a..ba9d2dfb32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4211,7 +4211,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.3.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=4f13b67b5e4b4ee8ad25c0c8370f1e63a06243e0#4f13b67b5e4b4ee8ad25c0c8370f1e63a06243e0" +source = "git+https://github.com/influxdata/rskafka.git?rev=cb9195a58fdf87be886deee961de08e119d4ca8f#cb9195a58fdf87be886deee961de08e119d4ca8f" dependencies = [ "async-socks5", "async-trait", diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index fb62d38349..e4b72a4ce0 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -22,7 +22,7 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="4f13b67b5e4b4ee8ad25c0c8370f1e63a06243e0", default-features = false, features = ["compression-snappy", "transport-socks5"] } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="cb9195a58fdf87be886deee961de08e119d4ca8f", default-features = false, features = ["compression-snappy", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.20", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.7.3" diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index c9a51b4ebd..bebb68f3c8 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -437,7 +437,7 @@ async fn setup_topic( if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) { let mut partition_clients = BTreeMap::new(); for partition in topic.partitions { - let c = client.partition_client(&topic_name, partition)?; + let c = client.partition_client(&topic_name, partition).await?; let partition = u32::try_from(partition).map_err(WriteBufferError::invalid_data)?; partition_clients.insert(partition, c); } @@ -633,6 +633,7 @@ mod tests { .await .unwrap() .partition_client(ctx.topic_name.clone(), sequencer_id as i32) + .await .unwrap() .produce( vec![Record {