Merge pull request #5311 from influxdata/dom/instrument-kafka-produce

build: bump rskafka to latest
pull/24376/head
kodiakhq[bot] 2022-08-04 15:20:45 +00:00 committed by GitHub
commit 96419b78e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 12 additions and 8 deletions

View File

@ -196,7 +196,7 @@ jobs:
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
docker:
- image: quay.io/influxdb/rust:ci
- image: vectorized/redpanda:v21.9.2
- image: vectorized/redpanda:v22.1.5
command: redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M
- image: postgres
environment:

4
Cargo.lock generated
View File

@ -4246,8 +4246,8 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.2.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=bb5c9c753b9595ea85dad65d9488859ae89c8456#bb5c9c753b9595ea85dad65d9488859ae89c8456"
version = "0.3.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=e34f6f485db9256a7614220cabea86f7c44f5eb6#e34f6f485db9256a7614220cabea86f7c44f5eb6"
dependencies = [
"async-socks5",
"async-trait",

View File

@ -22,7 +22,7 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.10"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="bb5c9c753b9595ea85dad65d9488859ae89c8456", default-features = false, features = ["compression-snappy", "transport-socks5"] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="e34f6f485db9256a7614220cabea86f7c44f5eb6", default-features = false, features = ["compression-snappy", "transport-socks5"] }
schema = { path = "../schema" }
tokio = { version = "1.20", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.7.3"

View File

@ -257,9 +257,10 @@ impl WriteBufferStreamHandler for RSKafkaStreamHandler {
Err(e) => {
terminated.store(true, Ordering::SeqCst);
let kind = match e {
RSKafkaError::ServerError(ProtocolError::OffsetOutOfRange, _) => {
WriteBufferErrorKind::UnknownSequenceNumber
}
RSKafkaError::ServerError {
protocol_error: ProtocolError::OffsetOutOfRange,
..
} => WriteBufferErrorKind::UnknownSequenceNumber,
_ => WriteBufferErrorKind::Unknown,
};
return Err(WriteBufferError::new(kind, e));
@ -439,7 +440,10 @@ async fn setup_topic(
{
Ok(_) => {}
// race condition between check and creation action, that's OK
Err(RSKafkaError::ServerError(ProtocolError::TopicAlreadyExists, _)) => {}
Err(RSKafkaError::ServerError {
protocol_error: ProtocolError::TopicAlreadyExists,
..
}) => {}
Err(e) => {
return Err(e.into());
}