fix: Default kafka timeout to be shorter than gRPC timeout (60 sec --> 10 sec) (#3131)

* fix: Default kafka timeout to be shorter than gRPC timeout

* docs: fix link style
pull/24376/head
Andrew Lamb 2021-11-17 07:19:53 -05:00 committed by GitHub
parent a87a320eb3
commit d6c6e9a6c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 3 deletions

View File

@ -8,6 +8,7 @@ use generated_types::{
}; };
use influxdb_iox_client::{ use influxdb_iox_client::{
management::{Client, CreateDatabaseError}, management::{Client, CreateDatabaseError},
router::generated_types::{write_buffer_connection, WriteBufferConnection},
write::WriteError, write::WriteError,
}; };
use std::{fs::set_permissions, os::unix::fs::PermissionsExt}; use std::{fs::set_permissions, os::unix::fs::PermissionsExt};
@ -184,6 +185,37 @@ async fn test_create_database_invalid_name() {
assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_))); assert!(matches!(dbg!(err), CreateDatabaseError::InvalidArgument(_)));
} }
#[tokio::test]
async fn test_create_database_invalid_kafka() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut client = server_fixture.management_client();
let rules = DatabaseRules {
name: "db_with_bad_kafka_address".into(),
write_buffer_connection: Some(WriteBufferConnection {
direction: write_buffer_connection::Direction::Read.into(),
r#type: "kafka".into(),
connection: "i_am_not_a_kafka_server:1234".into(),
..Default::default()
}),
..Default::default()
};
let start = Instant::now();
let err = client
.create_database(rules)
.await
.expect_err("expected request to fail");
println!("Failed after {:?}", Instant::now() - start);
// expect that this error has a useful error related to kafka (not "timeout")
assert_contains!(
err.to_string(),
"error creating write buffer: Meta data fetch error: BrokerTransportFailure"
);
}
#[tokio::test] #[tokio::test]
async fn test_list_databases() { async fn test_list_databases() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await; let server_fixture = ServerFixture::create_shared(ServerType::Database).await;

View File

@ -37,6 +37,16 @@ use crate::{
}, },
}; };
/// Default timeout supplied to rdkafka client for kafka operations.
///
/// Chosen to be a value less than the default gRPC timeout (30
/// seconds) so we can detect kafka errors and return them prior to
/// the gRPC requests to IOx timing out.
///
/// More context in
/// <https://github.com/influxdata/influxdb_iox/issues/3029>
const KAFKA_OPERATION_TIMEOUT_MS: u64 = 10000;
impl From<&IoxHeaders> for OwnedHeaders { impl From<&IoxHeaders> for OwnedHeaders {
fn from(iox_headers: &IoxHeaders) -> Self { fn from(iox_headers: &IoxHeaders) -> Self {
let mut res = Self::new(); let mut res = Self::new();
@ -252,7 +262,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
consumer_cloned.fetch_watermarks( consumer_cloned.fetch_watermarks(
&database_name, &database_name,
sequencer_id as i32, sequencer_id as i32,
Duration::from_secs(60), Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS),
) )
}) })
.await .await
@ -299,7 +309,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
&database_name, &database_name,
sequencer_id as i32, sequencer_id as i32,
offset, offset,
Duration::from_secs(60), Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS),
) )
}) })
.await .await
@ -404,7 +414,10 @@ async fn get_partitions(
let metadata = tokio::task::spawn_blocking(move || { let metadata = tokio::task::spawn_blocking(move || {
let probe_consumer: BaseConsumer = cfg.create()?; let probe_consumer: BaseConsumer = cfg.create()?;
probe_consumer.fetch_metadata(Some(&database_name), Duration::from_secs(60)) probe_consumer.fetch_metadata(
Some(&database_name),
Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS),
)
}) })
.await .await
.expect("subtask failed")?; .expect("subtask failed")?;