Merge pull request #5576 from influxdata/dom/rskafka-bump

perf: use rskafka producer optimisations
pull/24376/head
kodiakhq[bot] 2022-09-07 12:10:34 +00:00 committed by GitHub
commit 2f2d09a4b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 19 additions and 37 deletions

25
Cargo.lock generated
View File

@ -521,7 +521,7 @@ dependencies = [
"num-integer",
"num-traits",
"serde",
"time 0.1.44",
"time",
"wasm-bindgen",
"winapi",
]
@ -3062,15 +3062,6 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.29.0"
@ -4195,11 +4186,12 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.3.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=59295beeae2106c2536008065e171dd88fd1c64e#59295beeae2106c2536008065e171dd88fd1c64e"
source = "git+https://github.com/influxdata/rskafka.git?rev=3208e4742f08048bbab4e8fc4e0a775507fe3e66#3208e4742f08048bbab4e8fc4e0a775507fe3e66"
dependencies = [
"async-socks5",
"async-trait",
"bytes",
"chrono",
"crc32c",
"futures",
"integer-encoding 3.0.4",
@ -4208,7 +4200,6 @@ dependencies = [
"rand",
"snap",
"thiserror",
"time 0.3.14",
"tokio",
"tracing",
]
@ -5105,16 +5096,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b"
dependencies = [
"libc",
"num_threads",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"

View File

@ -22,7 +22,7 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.11"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="59295beeae2106c2536008065e171dd88fd1c64e", default-features = false, features = ["compression-snappy", "transport-socks5"] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] }
schema = { path = "../schema" }
tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.7.3"

View File

@ -152,7 +152,7 @@ mod tests {
use iox_time::Time;
use metric::Metric;
use parking_lot::Mutex;
use rskafka::time::OffsetDateTime;
use rskafka::chrono::{self, Utc};
use super::*;
@ -207,7 +207,7 @@ mod tests {
key: Some("bananas".into()),
value: None,
headers: Default::default(),
timestamp: OffsetDateTime::UNIX_EPOCH,
timestamp: chrono::DateTime::<Utc>::MIN_UTC,
};
wrapper

View File

@ -25,7 +25,7 @@ use rskafka::{
client::{
consumer::{StartOffset, StreamConsumerBuilder},
error::{Error as RSKafkaError, ProtocolError, RequestContext, ServerErrorResponse},
partition::{OffsetAt, PartitionClient},
partition::{OffsetAt, PartitionClient, UnknownTopicHandling},
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
},
@ -189,10 +189,7 @@ async fn try_decode(
sequence_number: SequenceNumber::new(record.offset),
};
let timestamp_nanos = i64::try_from(record.record.timestamp.unix_timestamp_nanos())
.map_err(WriteBufferError::invalid_data)?;
let timestamp = Time::from_timestamp_nanos(timestamp_nanos);
let timestamp = Time::from_date_time(record.record.timestamp);
let value = record
.record
@ -478,7 +475,9 @@ async fn setup_topic(
let topic_name = topic_name.clone();
async move {
let shard_index = ShardIndex::new(p);
let c = client_ref.partition_client(&topic_name, p).await?;
let c = client_ref
.partition_client(&topic_name, p, UnknownTopicHandling::Error)
.await?;
Result::<_, WriteBufferError>::Ok((shard_index, c))
}
}),
@ -689,7 +688,11 @@ mod tests {
.build()
.await
.unwrap()
.partition_client(ctx.topic_name.clone(), shard_index.get())
.partition_client(
ctx.topic_name.clone(),
shard_index.get(),
UnknownTopicHandling::Retry,
)
.await
.unwrap()
.produce(
@ -697,7 +700,7 @@ mod tests {
key: None,
value: None,
headers: Default::default(),
timestamp: rskafka::time::OffsetDateTime::now_utc(),
timestamp: rskafka::chrono::Utc::now(),
}],
Compression::NoCompression,
)

View File

@ -97,9 +97,7 @@ impl RecordAggregator {
.headers()
.map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
.collect(),
timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos(
now.date_time().timestamp_nanos() as i128,
)?,
timestamp: now.date_time(),
};
Ok((record, now))
@ -284,7 +282,7 @@ mod tests {
Vec::<u8>::from(NAMESPACE),
);
assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some());
assert_eq!(record.timestamp.unix_timestamp(), 1659990497);
assert_eq!(record.timestamp.timestamp(), 1659990497);
// Extract the DmlMeta from the de-aggregator
let got = deagg