From d6c6e9a6c7836357e15276ba50bbe8e3593b3432 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 17 Nov 2021 07:19:53 -0500 Subject: [PATCH] fix: Default kafka timeout to be shorter than gRPC timeout (60 sec --> 10 sec) (#3131) * fix: Default kafka timeout to be shorter than gRPC timeout * docs: fix link style --- .../tests/end_to_end_cases/management_api.rs | 32 +++++++++++++++++++ write_buffer/src/kafka.rs | 19 +++++++++-- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs index 997b8537df..2af0d86fac 100644 --- a/influxdb_iox/tests/end_to_end_cases/management_api.rs +++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs @@ -8,6 +8,7 @@ use generated_types::{ }; use influxdb_iox_client::{ management::{Client, CreateDatabaseError}, + router::generated_types::{write_buffer_connection, WriteBufferConnection}, write::WriteError, }; use std::{fs::set_permissions, os::unix::fs::PermissionsExt}; @@ -184,6 +185,37 @@ async fn test_create_database_invalid_name() { assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_))); } +#[tokio::test] +async fn test_create_database_invalid_kafka() { + let server_fixture = ServerFixture::create_shared(ServerType::Database).await; + let mut client = server_fixture.management_client(); + + let rules = DatabaseRules { + name: "db_with_bad_kafka_address".into(), + write_buffer_connection: Some(WriteBufferConnection { + direction: write_buffer_connection::Direction::Read.into(), + r#type: "kafka".into(), + connection: "i_am_not_a_kafka_server:1234".into(), + ..Default::default() + }), + ..Default::default() + }; + + let start = Instant::now(); + let err = client + .create_database(rules) + .await + .expect_err("expected request to fail"); + + println!("Failed after {:?}", Instant::now() - start); + + // expect that this error has a useful error related to kafka (not "timeout") + assert_contains!( + err.to_string(), + "error creating write buffer: Meta data fetch error: BrokerTransportFailure" + ); +} + #[tokio::test] async fn test_list_databases() { let server_fixture = ServerFixture::create_shared(ServerType::Database).await; diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 148a52aab9..cb2403149a 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -37,6 +37,16 @@ use crate::{ }, }; +/// Default timeout supplied to rdkafka client for kafka operations. +/// +/// Chosen to be a value less than the default gRPC timeout (30 +/// seconds) so we can detect kafka errors and return them prior to +/// the gRPC requests to IOx timing out. +/// +/// More context in +/// +const KAFKA_OPERATION_TIMEOUT_MS: u64 = 10000; + impl From<&IoxHeaders> for OwnedHeaders { fn from(iox_headers: &IoxHeaders) -> Self { let mut res = Self::new(); @@ -252,7 +262,7 @@ impl WriteBufferReading for KafkaBufferConsumer { consumer_cloned.fetch_watermarks( &database_name, sequencer_id as i32, - Duration::from_secs(60), + Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS), ) }) .await @@ -299,7 +309,7 @@ impl WriteBufferReading for KafkaBufferConsumer { &database_name, sequencer_id as i32, offset, - Duration::from_secs(60), + Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS), ) }) .await @@ -404,7 +414,10 @@ async fn get_partitions( let metadata = tokio::task::spawn_blocking(move || { let probe_consumer: BaseConsumer = cfg.create()?; - probe_consumer.fetch_metadata(Some(&database_name), Duration::from_secs(60)) + probe_consumer.fetch_metadata( + Some(&database_name), + Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS), + ) }) .await .expect("subtask failed")?;