refactor: retry querier->ingester requests (#5695)
* refactor: retry querier->ingester requests Esp. for InfluxRPC requests that scan multiple tables, it may be that one ingester requests fails. We shall retry that request instead of failing the entire query. * refactor: improve docs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * fix: less foo * docs: remove outdated TODO * test: assert that panic happened Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>pull/24376/head
parent
8b9f4ac447
commit
5e7fd55a42
|
|
@ -48,13 +48,16 @@ impl Default for BackoffConfig {
|
|||
/// Error after giving up retrying.
|
||||
#[derive(Debug, Snafu, PartialEq, Eq)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
pub enum BackoffError {
|
||||
#[snafu(display("Deadline exceeded: {deadline:?}"))]
|
||||
DeadlineExceeded { deadline: Duration },
|
||||
pub enum BackoffError<E>
|
||||
where
|
||||
E: std::error::Error + 'static,
|
||||
{
|
||||
#[snafu(display("Retry did not exceed within {deadline:?}: {source}"))]
|
||||
DeadlineExceeded { deadline: Duration, source: E },
|
||||
}
|
||||
|
||||
/// Backoff result.
|
||||
pub type BackoffResult<T> = Result<T, BackoffError>;
|
||||
pub type BackoffResult<T, E> = Result<T, BackoffError<E>>;
|
||||
|
||||
/// [`Backoff`] can be created from a [`BackoffConfig`]
|
||||
///
|
||||
|
|
@ -108,8 +111,8 @@ impl Backoff {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns the next backoff duration to wait for
|
||||
fn next(&mut self) -> BackoffResult<Duration> {
|
||||
/// Returns the next backoff duration to wait for, if any
|
||||
fn next(&mut self) -> Option<Duration> {
|
||||
let range = self.init_backoff..(self.next_backoff_secs * self.base);
|
||||
|
||||
let rand_backoff = match self.rng.as_mut() {
|
||||
|
|
@ -121,29 +124,25 @@ impl Backoff {
|
|||
self.total += next_backoff;
|
||||
if let Some(deadline) = self.deadline {
|
||||
if self.total >= deadline {
|
||||
return Err(BackoffError::DeadlineExceeded {
|
||||
deadline: Duration::from_secs_f64(deadline),
|
||||
});
|
||||
return None;
|
||||
}
|
||||
}
|
||||
Ok(Duration::from_secs_f64(std::mem::replace(
|
||||
Some(Duration::from_secs_f64(std::mem::replace(
|
||||
&mut self.next_backoff_secs,
|
||||
next_backoff,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Perform an async operation that retries with a backoff
|
||||
// TODO: Currently, this can't fail, but there should be a global maximum timeout that
|
||||
// causes an error if the total time retrying exceeds that amount.
|
||||
pub async fn retry_with_backoff<F, F1, B, E>(
|
||||
&mut self,
|
||||
task_name: &str,
|
||||
mut do_stuff: F,
|
||||
) -> BackoffResult<B>
|
||||
) -> BackoffResult<B, E>
|
||||
where
|
||||
F: (FnMut() -> F1) + Send,
|
||||
F1: std::future::Future<Output = ControlFlow<B, E>> + Send,
|
||||
E: std::error::Error + Send,
|
||||
E: std::error::Error + Send + 'static,
|
||||
{
|
||||
loop {
|
||||
// first execute `F` and then use it, so we can avoid `F: Sync`.
|
||||
|
|
@ -154,7 +153,16 @@ impl Backoff {
|
|||
ControlFlow::Continue(e) => e,
|
||||
};
|
||||
|
||||
let backoff = self.next()?;
|
||||
let backoff = match self.next() {
|
||||
Some(backoff) => backoff,
|
||||
None => {
|
||||
return Err(BackoffError::DeadlineExceeded {
|
||||
deadline: Duration::from_secs_f64(self.deadline.expect("deadline")),
|
||||
source: e,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
e=%e,
|
||||
task_name,
|
||||
|
|
@ -170,11 +178,11 @@ impl Backoff {
|
|||
&mut self,
|
||||
task_name: &str,
|
||||
mut do_stuff: F,
|
||||
) -> BackoffResult<B>
|
||||
) -> BackoffResult<B, E>
|
||||
where
|
||||
F: (FnMut() -> F1) + Send,
|
||||
F1: std::future::Future<Output = Result<B, E>> + Send,
|
||||
E: std::error::Error + Send,
|
||||
E: std::error::Error + Send + 'static,
|
||||
{
|
||||
self.retry_with_backoff(task_name, move || {
|
||||
// first execute `F` and then use it, so we can avoid `F: Sync`.
|
||||
|
|
@ -249,7 +257,6 @@ mod tests {
|
|||
},
|
||||
Some(rng),
|
||||
);
|
||||
let err = backoff.next().unwrap_err();
|
||||
assert_eq!(err, BackoffError::DeadlineExceeded { deadline });
|
||||
assert_eq!(backoff.next(), None);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
pub(crate) mod influxrpc;
|
||||
mod multi_ingester;
|
||||
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use assert_cmd::Command;
|
||||
use futures::FutureExt;
|
||||
use predicates::prelude::*;
|
||||
use test_helpers::assert_contains;
|
||||
use test_helpers_end_to_end::{
|
||||
maybe_skip_integration, try_run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig,
|
||||
maybe_skip_integration, run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -228,8 +230,9 @@ async fn ingester_panic() {
|
|||
// Set up the cluster ====================================
|
||||
let router_config = TestConfig::new_router(&database_url);
|
||||
// can't use standard mini cluster here as we setup the querier to panic
|
||||
let ingester_config =
|
||||
TestConfig::new_ingester(&router_config).with_ingester_flight_do_get_panic(2);
|
||||
let ingester_config = TestConfig::new_ingester(&router_config)
|
||||
.with_ingester_flight_do_get_panic(2)
|
||||
.with_ingester_persist_memory_threshold(1_000_000);
|
||||
let querier_config = TestConfig::new_querier(&ingester_config).with_json_logs();
|
||||
let mut cluster = MiniCluster::new()
|
||||
.with_router(router_config)
|
||||
|
|
@ -251,20 +254,31 @@ async fn ingester_panic() {
|
|||
Step::AssertNotPersisted,
|
||||
Step::Custom(Box::new(move |state: &mut StepTestState| {
|
||||
async move {
|
||||
// SQL query fails, error is propagated
|
||||
// Ingester panics but querier will retry.
|
||||
let sql = format!("select * from {} where tag2='B'", table_name);
|
||||
let err = try_run_query(
|
||||
let batches = run_query(
|
||||
sql,
|
||||
state.cluster().namespace(),
|
||||
state.cluster().querier().querier_grpc_connection(),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
if let influxdb_iox_client::flight::Error::GrpcError(status) = err {
|
||||
assert_eq!(status.code(), tonic::Code::Internal);
|
||||
} else {
|
||||
panic!("wrong error type");
|
||||
}
|
||||
.await;
|
||||
let expected = [
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| tag1 | tag2 | time | val |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
"+------+------+--------------------------------+-----+",
|
||||
];
|
||||
assert_batches_sorted_eq!(&expected, &batches);
|
||||
|
||||
// verify that the ingester panicked
|
||||
let ingester_logs =
|
||||
std::fs::read_to_string(state.cluster().ingester().log_path().await)
|
||||
.unwrap();
|
||||
assert_contains!(
|
||||
ingester_logs,
|
||||
"thread 'tokio-runtime-worker' panicked at 'Panicking in `do_get` for testing purposes.'"
|
||||
);
|
||||
|
||||
// find relevant log line for debugging
|
||||
let querier_logs =
|
||||
|
|
@ -279,36 +293,27 @@ async fn ingester_panic() {
|
|||
let log_data = log_data["fields"].as_object().unwrap();
|
||||
|
||||
// query ingester using debug information
|
||||
for i in 0..2 {
|
||||
let assert = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(log_data["ingester_address"].as_str().unwrap())
|
||||
.arg("query-ingester")
|
||||
.arg(log_data["namespace"].as_str().unwrap())
|
||||
.arg(log_data["table"].as_str().unwrap())
|
||||
.arg("--columns")
|
||||
.arg(log_data["columns"].as_str().unwrap())
|
||||
.arg("--predicate-base64")
|
||||
.arg(log_data["predicate_binary"].as_str().unwrap())
|
||||
.assert();
|
||||
let assert = Command::cargo_bin("influxdb_iox")
|
||||
.unwrap()
|
||||
.arg("-h")
|
||||
.arg(log_data["ingester_address"].as_str().unwrap())
|
||||
.arg("query-ingester")
|
||||
.arg(log_data["namespace"].as_str().unwrap())
|
||||
.arg(log_data["table"].as_str().unwrap())
|
||||
.arg("--columns")
|
||||
.arg(log_data["columns"].as_str().unwrap())
|
||||
.arg("--predicate-base64")
|
||||
.arg(log_data["predicate_binary"].as_str().unwrap())
|
||||
.assert();
|
||||
|
||||
// The ingester is configured to fail 2 times, once for the original query and once during
|
||||
// debugging. The 2nd debug query should work and should only return data for `tag2=B` (not for
|
||||
// `tag2=C`).
|
||||
if i == 0 {
|
||||
assert.failure().stderr(predicate::str::contains(
|
||||
"Error querying: status: Internal, message: \"Panicking in `do_get` for testing purposes.\"",
|
||||
));
|
||||
} else {
|
||||
assert.success().stdout(
|
||||
predicate::str::contains(
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
)
|
||||
.and(predicate::str::contains("C").not()),
|
||||
);
|
||||
}
|
||||
}
|
||||
// The debug query should work and should only return data for `tag2=B` (not for
|
||||
// `tag2=C`).
|
||||
assert.success().stdout(
|
||||
predicate::str::contains(
|
||||
"| A | B | 1970-01-01T00:00:00.000123456Z | 42 |",
|
||||
)
|
||||
.and(predicate::str::contains("C").not()),
|
||||
);
|
||||
}
|
||||
.boxed()
|
||||
})),
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ use self::{
|
|||
use crate::{cache::CatalogCache, chunk::util::create_basic_summary};
|
||||
use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch};
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig, BackoffError};
|
||||
use client_util::connection;
|
||||
use data_types::{
|
||||
ChunkId, ChunkOrder, IngesterMapping, PartitionId, SequenceNumber, ShardId, ShardIndex,
|
||||
|
|
@ -307,6 +308,7 @@ pub struct IngesterConnectionImpl {
|
|||
flight_client: Arc<dyn FlightClient>,
|
||||
catalog_cache: Arc<CatalogCache>,
|
||||
metrics: Arc<IngesterConnectionMetrics>,
|
||||
backoff_config: BackoffConfig,
|
||||
}
|
||||
|
||||
impl IngesterConnectionImpl {
|
||||
|
|
@ -333,6 +335,7 @@ impl IngesterConnectionImpl {
|
|||
shard_to_ingesters,
|
||||
Arc::new(FlightClientImpl::new()),
|
||||
catalog_cache,
|
||||
BackoffConfig::default(),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
@ -344,6 +347,7 @@ impl IngesterConnectionImpl {
|
|||
shard_to_ingesters: HashMap<ShardIndex, IngesterMapping>,
|
||||
flight_client: Arc<dyn FlightClient>,
|
||||
catalog_cache: Arc<CatalogCache>,
|
||||
backoff_config: BackoffConfig,
|
||||
) -> Self {
|
||||
let unique_ingester_addresses: HashSet<_> = shard_to_ingesters
|
||||
.values()
|
||||
|
|
@ -363,6 +367,7 @@ impl IngesterConnectionImpl {
|
|||
flight_client,
|
||||
catalog_cache,
|
||||
metrics,
|
||||
backoff_config,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -698,25 +703,38 @@ impl IngesterConnection for IngesterConnectionImpl {
|
|||
|
||||
let metrics = Arc::clone(&self.metrics);
|
||||
|
||||
let measured_ingester_request = |ingester_address| {
|
||||
let measured_ingester_request = |ingester_address: Arc<str>| {
|
||||
let metrics = Arc::clone(&metrics);
|
||||
let request = GetPartitionForIngester {
|
||||
flight_client: Arc::clone(&self.flight_client),
|
||||
catalog_cache: Arc::clone(&self.catalog_cache),
|
||||
ingester_address,
|
||||
ingester_address: Arc::clone(&ingester_address),
|
||||
namespace_name: Arc::clone(&namespace_name),
|
||||
table_name: Arc::clone(&table_name),
|
||||
columns: columns.clone(),
|
||||
predicate,
|
||||
expected_schema: Arc::clone(&expected_schema),
|
||||
};
|
||||
let metrics = Arc::clone(&metrics);
|
||||
|
||||
let backoff_config = self.backoff_config.clone();
|
||||
|
||||
// wrap `execute` into an additional future so that we can measure the request time
|
||||
// INFO: create the measurement structure outside of the async block so cancellation is
|
||||
// always measured
|
||||
let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder);
|
||||
async move {
|
||||
let res = execute(request.clone(), measure_me.span_recorder()).await;
|
||||
let span_recorder = measure_me
|
||||
.span_recorder()
|
||||
.child("ingester request (retry block)");
|
||||
|
||||
let res = Backoff::new(&backoff_config)
|
||||
.retry_all_errors("ingester request", move || {
|
||||
let request = request.clone();
|
||||
let span_recorder = span_recorder.child("ingester request (single try)");
|
||||
|
||||
async move { execute(request, &span_recorder).await }
|
||||
})
|
||||
.await;
|
||||
|
||||
match &res {
|
||||
Ok(partitions) => {
|
||||
|
|
@ -774,7 +792,9 @@ impl IngesterConnection for IngesterConnectionImpl {
|
|||
.await
|
||||
.map_err(|e| {
|
||||
span_recorder.error("failed");
|
||||
e
|
||||
match e {
|
||||
BackoffError::DeadlineExceeded { source, .. } => source,
|
||||
}
|
||||
})?
|
||||
// We have a Vec<Vec<..>> flatten to Vec<_>
|
||||
.into_iter()
|
||||
|
|
@ -1194,7 +1214,10 @@ mod tests {
|
|||
use metric::Attributes;
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
use schema::{builder::SchemaBuilder, InfluxFieldType};
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::{
|
||||
collections::{BTreeSet, HashMap},
|
||||
time::Duration,
|
||||
};
|
||||
use test_helpers::assert_error;
|
||||
use tokio::{runtime::Handle, sync::Mutex};
|
||||
use trace::{span::SpanStatus, RingBufferTraceCollector};
|
||||
|
|
@ -1243,7 +1266,8 @@ mod tests {
|
|||
)])
|
||||
.await,
|
||||
);
|
||||
let ingester_conn = mock_flight_client.ingester_conn().await;
|
||||
let mut ingester_conn = mock_flight_client.ingester_conn().await;
|
||||
ingester_conn.backoff_config = BackoffConfig::default();
|
||||
let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap();
|
||||
assert!(partitions.is_empty());
|
||||
}
|
||||
|
|
@ -1872,6 +1896,12 @@ mod tests {
|
|||
self.catalog.object_store(),
|
||||
&Handle::current(),
|
||||
)),
|
||||
BackoffConfig {
|
||||
init_backoff: Duration::from_secs(1),
|
||||
max_backoff: Duration::from_secs(2),
|
||||
base: 1.1,
|
||||
deadline: Some(Duration::from_millis(500)),
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ use std::{
|
|||
fmt::Display,
|
||||
fmt::Write,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::runtime::Handle;
|
||||
|
||||
|
|
@ -944,6 +945,12 @@ impl MockIngester {
|
|||
shard_to_ingesters,
|
||||
Arc::new(self),
|
||||
Arc::clone(&catalog_cache),
|
||||
BackoffConfig {
|
||||
init_backoff: Duration::from_secs(1),
|
||||
max_backoff: Duration::from_secs(2),
|
||||
base: 1.1,
|
||||
deadline: Some(Duration::from_millis(500)),
|
||||
},
|
||||
);
|
||||
let ingester_connection = Arc::new(ingester_connection);
|
||||
let sharder = Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new)));
|
||||
|
|
|
|||
Loading…
Reference in New Issue