diff --git a/Cargo.lock b/Cargo.lock index 67093a6fc5..c717985b2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -521,7 +521,7 @@ dependencies = [ "num-integer", "num-traits", "serde", - "time 0.1.44", + "time", "wasm-bindgen", "winapi", ] @@ -3062,15 +3062,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ - "libc", -] - [[package]] name = "object" version = "0.29.0" @@ -4195,11 +4186,12 @@ dependencies = [ [[package]] name = "rskafka" version = "0.3.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=59295beeae2106c2536008065e171dd88fd1c64e#59295beeae2106c2536008065e171dd88fd1c64e" +source = "git+https://github.com/influxdata/rskafka.git?rev=3208e4742f08048bbab4e8fc4e0a775507fe3e66#3208e4742f08048bbab4e8fc4e0a775507fe3e66" dependencies = [ "async-socks5", "async-trait", "bytes", + "chrono", "crc32c", "futures", "integer-encoding 3.0.4", @@ -4208,7 +4200,6 @@ dependencies = [ "rand", "snap", "thiserror", - "time 0.3.14", "tokio", "tracing", ] @@ -5105,16 +5096,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "time" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c3f9a28b618c3a6b9251b6908e9c99e04b9e5c02e6581ccbb67d59c34ef7f9b" -dependencies = [ - "libc", - "num_threads", -] - [[package]] name = "tiny-keccak" version = "2.0.2" diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index 39a62375ab..a799657291 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -22,7 +22,7 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="59295beeae2106c2536008065e171dd88fd1c64e", default-features = false, features = ["compression-snappy", "transport-socks5"] } +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } tokio-util = "0.7.3" diff --git a/write_buffer/src/kafka/instrumentation.rs b/write_buffer/src/kafka/instrumentation.rs index 55547e4d9c..283dc281fd 100644 --- a/write_buffer/src/kafka/instrumentation.rs +++ b/write_buffer/src/kafka/instrumentation.rs @@ -152,7 +152,7 @@ mod tests { use iox_time::Time; use metric::Metric; use parking_lot::Mutex; - use rskafka::time::OffsetDateTime; + use rskafka::chrono::{self, Utc}; use super::*; @@ -207,7 +207,7 @@ mod tests { key: Some("bananas".into()), value: None, headers: Default::default(), - timestamp: OffsetDateTime::UNIX_EPOCH, + timestamp: chrono::DateTime::::MIN_UTC, }; wrapper diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 0d58eb9932..e5903f8905 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -25,7 +25,7 @@ use rskafka::{ client::{ consumer::{StartOffset, StreamConsumerBuilder}, error::{Error as RSKafkaError, ProtocolError, RequestContext, ServerErrorResponse}, - partition::{OffsetAt, PartitionClient}, + partition::{OffsetAt, PartitionClient, UnknownTopicHandling}, producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, }, @@ -189,10 +189,7 @@ async fn try_decode( sequence_number: SequenceNumber::new(record.offset), }; - let timestamp_nanos = i64::try_from(record.record.timestamp.unix_timestamp_nanos()) - .map_err(WriteBufferError::invalid_data)?; - - let timestamp = Time::from_timestamp_nanos(timestamp_nanos); + let timestamp = Time::from_date_time(record.record.timestamp); let value = record .record @@ -478,7 +475,9 @@ async fn setup_topic( let topic_name = topic_name.clone(); async move { let shard_index = ShardIndex::new(p); - let c = client_ref.partition_client(&topic_name, p).await?; + let c = client_ref + .partition_client(&topic_name, p, UnknownTopicHandling::Error) + .await?; Result::<_, WriteBufferError>::Ok((shard_index, c)) } }), @@ -689,7 +688,11 @@ mod tests { .build() .await .unwrap() - .partition_client(ctx.topic_name.clone(), shard_index.get()) + .partition_client( + ctx.topic_name.clone(), + shard_index.get(), + UnknownTopicHandling::Retry, + ) .await .unwrap() .produce( @@ -697,7 +700,7 @@ mod tests { key: None, value: None, headers: Default::default(), - timestamp: rskafka::time::OffsetDateTime::now_utc(), + timestamp: rskafka::chrono::Utc::now(), }], Compression::NoCompression, ) diff --git a/write_buffer/src/kafka/record_aggregator.rs b/write_buffer/src/kafka/record_aggregator.rs index 11d0264a3f..23cc68aefd 100644 --- a/write_buffer/src/kafka/record_aggregator.rs +++ b/write_buffer/src/kafka/record_aggregator.rs @@ -97,9 +97,7 @@ impl RecordAggregator { .headers() .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec())) .collect(), - timestamp: rskafka::time::OffsetDateTime::from_unix_timestamp_nanos( - now.date_time().timestamp_nanos() as i128, - )?, + timestamp: now.date_time(), }; Ok((record, now)) @@ -284,7 +282,7 @@ mod tests { Vec::::from(NAMESPACE), ); assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some()); - assert_eq!(record.timestamp.unix_timestamp(), 1659990497); + assert_eq!(record.timestamp.timestamp(), 1659990497); // Extract the DmlMeta from the de-aggregator let got = deagg