Merge pull request #6689 from influxdata/dom/ingester-rediscovery

fix(router): force rediscovery of nodes
pull/24376/head
Dom 2023-01-24 19:21:17 +00:00 committed by GitHub
commit 39dd455297
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 52 additions and 15 deletions

View File

@ -1,6 +1,12 @@
//! A lazy connector for Tonic gRPC [`Channel`] instances.
use std::{sync::Arc, time::Duration};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use async_trait::async_trait;
use generated_types::influxdata::iox::ingester::v1::{
@ -17,6 +23,10 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(1);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
/// How many consecutive errors must be observed before opening a new connection
/// (at most once per [`RETRY_INTERVAL`]).
const RECONNECT_ERROR_COUNT: usize = 10;
/// Lazy [`Channel`] connector.
///
/// Connections are attempted in a background task every [`RETRY_INTERVAL`].
@ -29,6 +39,11 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
pub struct LazyConnector {
/// The upstream ingester endpoint that the background connection task
/// dials.
addr: Endpoint,
/// The most recently established [`Channel`] to `addr`.
///
/// Starts out as `None` and is overwritten by the background connection
/// task each time a connection attempt succeeds.
connection: Arc<Mutex<Option<Channel>>>,
/// The number of request errors observed without a single success.
///
/// Incremented for every failed write, reset to 0 by a successful write or
/// a successful (re)connection. Initialised above
/// [`RECONNECT_ERROR_COUNT`] so that the first connection attempt is made
/// immediately.
consecutive_errors: Arc<AtomicUsize>,
/// A task that periodically opens a new connection to `addr` when
/// `consecutive_errors` is more than [`RECONNECT_ERROR_COUNT`].
connection_task: JoinHandle<()>,
}
@ -39,10 +54,18 @@ impl LazyConnector {
.connect_timeout(CONNECT_TIMEOUT)
.timeout(REQUEST_TIMEOUT);
let connection = Default::default();
// Drive first connection by setting it above the connection limit.
let consecutive_errors = Arc::new(AtomicUsize::new(RECONNECT_ERROR_COUNT + 1));
Self {
addr: addr.clone(),
connection: Arc::clone(&connection),
connection_task: tokio::spawn(try_connect(addr, connection)),
connection_task: tokio::spawn(try_connect(
addr,
connection,
Arc::clone(&consecutive_errors),
)),
consecutive_errors,
}
}
@ -63,8 +86,16 @@ impl WriteClient for LazyConnector {
let conn =
conn.ok_or_else(|| RpcWriteError::UpstreamNotConnected(self.addr.uri().to_string()))?;
WriteServiceClient::new(conn).write(op).await?;
Ok(())
match WriteServiceClient::new(conn).write(op).await {
Err(e) => {
self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
return Err(e);
}
Ok(_) => {
self.consecutive_errors.store(0, Ordering::Relaxed);
Ok(())
}
}
}
}
@ -74,19 +105,25 @@ impl Drop for LazyConnector {
}
}
async fn try_connect(addr: Endpoint, connection: Arc<Mutex<Option<Channel>>>) {
async fn try_connect(
addr: Endpoint,
connection: Arc<Mutex<Option<Channel>>>,
consecutive_errors: Arc<AtomicUsize>,
) {
loop {
match addr.connect().await {
Ok(v) => {
info!(endpoint = %addr.uri(), "connected to upstream ingester");
*connection.lock() = Some(v);
return;
if consecutive_errors.load(Ordering::Relaxed) > RECONNECT_ERROR_COUNT {
match addr.connect().await {
Ok(v) => {
info!(endpoint = %addr.uri(), "connected to upstream ingester");
*connection.lock() = Some(v);
consecutive_errors.store(0, Ordering::Relaxed);
}
Err(e) => warn!(
endpoint = %addr.uri(),
error=%e,
"failed to connect to upstream ingester"
),
}
Err(e) => warn!(
endpoint = %addr.uri(),
error=%e,
"failed to connect to upstream ingester"
),
}
tokio::time::sleep(RETRY_INTERVAL).await;
}