diff --git a/backoff/src/lib.rs b/backoff/src/lib.rs index c6f31048af..92b0f0387c 100644 --- a/backoff/src/lib.rs +++ b/backoff/src/lib.rs @@ -48,13 +48,16 @@ impl Default for BackoffConfig { /// Error after giving up retrying. #[derive(Debug, Snafu, PartialEq, Eq)] #[allow(missing_copy_implementations, missing_docs)] -pub enum BackoffError { - #[snafu(display("Deadline exceeded: {deadline:?}"))] - DeadlineExceeded { deadline: Duration }, +pub enum BackoffError<E> +where + E: std::error::Error + 'static, +{ + #[snafu(display("Retry did not succeed within {deadline:?}: {source}"))] + DeadlineExceeded { deadline: Duration, source: E }, } /// Backoff result. -pub type BackoffResult<T> = Result<T, BackoffError>; +pub type BackoffResult<T, E> = Result<T, BackoffError<E>>; /// [`Backoff`] can be created from a [`BackoffConfig`] /// @@ -108,8 +111,8 @@ impl Backoff { } } - /// Returns the next backoff duration to wait for - fn next(&mut self) -> BackoffResult<Duration> { + /// Returns the next backoff duration to wait for, if any + fn next(&mut self) -> Option<Duration> { let range = self.init_backoff..(self.next_backoff_secs * self.base); let rand_backoff = match self.rng.as_mut() { @@ -121,29 +124,25 @@ impl Backoff { self.total += next_backoff; if let Some(deadline) = self.deadline { if self.total >= deadline { - return Err(BackoffError::DeadlineExceeded { - deadline: Duration::from_secs_f64(deadline), - }); + return None; } } - Ok(Duration::from_secs_f64(std::mem::replace( + Some(Duration::from_secs_f64(std::mem::replace( &mut self.next_backoff_secs, next_backoff, ))) } /// Perform an async operation that retries with a backoff - // TODO: Currently, this can't fail, but there should be a global maximum timeout that - // causes an error if the total time retrying exceeds that amount.
pub async fn retry_with_backoff<F, F1, B, E>( &mut self, task_name: &str, mut do_stuff: F, - ) -> BackoffResult<B> + ) -> BackoffResult<B, E> where F: (FnMut() -> F1) + Send, F1: std::future::Future<Output = ControlFlow<B, E>> + Send, - E: std::error::Error + Send, + E: std::error::Error + Send + 'static, { loop { // first execute `F` and then use it, so we can avoid `F: Sync`. @@ -154,7 +153,16 @@ impl Backoff { ControlFlow::Continue(e) => e, }; - let backoff = self.next()?; + let backoff = match self.next() { + Some(backoff) => backoff, + None => { + return Err(BackoffError::DeadlineExceeded { + deadline: Duration::from_secs_f64(self.deadline.expect("deadline")), + source: e, + }); + } + }; + info!( e=%e, task_name, @@ -170,11 +178,11 @@ impl Backoff { &mut self, task_name: &str, mut do_stuff: F, - ) -> BackoffResult<B> + ) -> BackoffResult<B, E> where F: (FnMut() -> F1) + Send, F1: std::future::Future<Output = Result<B, E>> + Send, - E: std::error::Error + Send, + E: std::error::Error + Send + 'static, { self.retry_with_backoff(task_name, move || { // first execute `F` and then use it, so we can avoid `F: Sync`.
@@ -249,7 +257,6 @@ mod tests { }, Some(rng), ); - let err = backoff.next().unwrap_err(); - assert_eq!(err, BackoffError::DeadlineExceeded { deadline }); + assert_eq!(backoff.next(), None); } } diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index 3f05da6814..e0ea4cb026 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -1,11 +1,13 @@ pub(crate) mod influxrpc; mod multi_ingester; +use arrow_util::assert_batches_sorted_eq; use assert_cmd::Command; use futures::FutureExt; use predicates::prelude::*; +use test_helpers::assert_contains; use test_helpers_end_to_end::{ - maybe_skip_integration, try_run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig, + maybe_skip_integration, run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig, }; #[tokio::test] @@ -228,8 +230,9 @@ async fn ingester_panic() { // Set up the cluster ==================================== let router_config = TestConfig::new_router(&database_url); // can't use standard mini cluster here as we setup the querier to panic - let ingester_config = - TestConfig::new_ingester(&router_config).with_ingester_flight_do_get_panic(2); + let ingester_config = TestConfig::new_ingester(&router_config) + .with_ingester_flight_do_get_panic(2) + .with_ingester_persist_memory_threshold(1_000_000); let querier_config = TestConfig::new_querier(&ingester_config).with_json_logs(); let mut cluster = MiniCluster::new() .with_router(router_config) @@ -251,20 +254,31 @@ async fn ingester_panic() { Step::AssertNotPersisted, Step::Custom(Box::new(move |state: &mut StepTestState| { async move { - // SQL query fails, error is propagated + // Ingester panics but querier will retry. 
let sql = format!("select * from {} where tag2='B'", table_name); - let err = try_run_query( + let batches = run_query( sql, state.cluster().namespace(), state.cluster().querier().querier_grpc_connection(), ) - .await - .unwrap_err(); - if let influxdb_iox_client::flight::Error::GrpcError(status) = err { - assert_eq!(status.code(), tonic::Code::Internal); - } else { - panic!("wrong error type"); - } + .await; + let expected = [ + "+------+------+--------------------------------+-----+", + "| tag1 | tag2 | time | val |", + "+------+------+--------------------------------+-----+", + "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", + "+------+------+--------------------------------+-----+", + ]; + assert_batches_sorted_eq!(&expected, &batches); + + // verify that the ingester panicked + let ingester_logs = + std::fs::read_to_string(state.cluster().ingester().log_path().await) + .unwrap(); + assert_contains!( + ingester_logs, + "thread 'tokio-runtime-worker' panicked at 'Panicking in `do_get` for testing purposes.'" + ); // find relevant log line for debugging let querier_logs = @@ -279,36 +293,27 @@ async fn ingester_panic() { let log_data = log_data["fields"].as_object().unwrap(); // query ingester using debug information - for i in 0..2 { - let assert = Command::cargo_bin("influxdb_iox") - .unwrap() - .arg("-h") - .arg(log_data["ingester_address"].as_str().unwrap()) - .arg("query-ingester") - .arg(log_data["namespace"].as_str().unwrap()) - .arg(log_data["table"].as_str().unwrap()) - .arg("--columns") - .arg(log_data["columns"].as_str().unwrap()) - .arg("--predicate-base64") - .arg(log_data["predicate_binary"].as_str().unwrap()) - .assert(); + let assert = Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("-h") + .arg(log_data["ingester_address"].as_str().unwrap()) + .arg("query-ingester") + .arg(log_data["namespace"].as_str().unwrap()) + .arg(log_data["table"].as_str().unwrap()) + .arg("--columns") + .arg(log_data["columns"].as_str().unwrap()) + 
.arg("--predicate-base64") + .arg(log_data["predicate_binary"].as_str().unwrap()) + .assert(); - // The ingester is configured to fail 2 times, once for the original query and once during - // debugging. The 2nd debug query should work and should only return data for `tag2=B` (not for - // `tag2=C`). - if i == 0 { - assert.failure().stderr(predicate::str::contains( - "Error querying: status: Internal, message: \"Panicking in `do_get` for testing purposes.\"", - )); - } else { - assert.success().stdout( - predicate::str::contains( - "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", - ) - .and(predicate::str::contains("C").not()), - ); - } - } + // The debug query should work and should only return data for `tag2=B` (not for + // `tag2=C`). + assert.success().stdout( + predicate::str::contains( + "| A | B | 1970-01-01T00:00:00.000123456Z | 42 |", + ) + .and(predicate::str::contains("C").not()), + ); } .boxed() })), diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 8044a94119..8b03625e85 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -5,6 +5,7 @@ use self::{ use crate::{cache::CatalogCache, chunk::util::create_basic_summary}; use arrow::{datatypes::DataType, error::ArrowError, record_batch::RecordBatch}; use async_trait::async_trait; +use backoff::{Backoff, BackoffConfig, BackoffError}; use client_util::connection; use data_types::{ ChunkId, ChunkOrder, IngesterMapping, PartitionId, SequenceNumber, ShardId, ShardIndex, @@ -307,6 +308,7 @@ pub struct IngesterConnectionImpl { flight_client: Arc, catalog_cache: Arc, metrics: Arc, + backoff_config: BackoffConfig, } impl IngesterConnectionImpl { @@ -333,6 +335,7 @@ impl IngesterConnectionImpl { shard_to_ingesters, Arc::new(FlightClientImpl::new()), catalog_cache, + BackoffConfig::default(), ) } @@ -344,6 +347,7 @@ impl IngesterConnectionImpl { shard_to_ingesters: HashMap, flight_client: Arc, catalog_cache: Arc, + backoff_config: BackoffConfig, ) -> Self { let 
unique_ingester_addresses: HashSet<_> = shard_to_ingesters .values() @@ -363,6 +367,7 @@ impl IngesterConnectionImpl { flight_client, catalog_cache, metrics, + backoff_config, } } } @@ -698,25 +703,38 @@ impl IngesterConnection for IngesterConnectionImpl { let metrics = Arc::clone(&self.metrics); - let measured_ingester_request = |ingester_address| { + let measured_ingester_request = |ingester_address: Arc| { + let metrics = Arc::clone(&metrics); let request = GetPartitionForIngester { flight_client: Arc::clone(&self.flight_client), catalog_cache: Arc::clone(&self.catalog_cache), - ingester_address, + ingester_address: Arc::clone(&ingester_address), namespace_name: Arc::clone(&namespace_name), table_name: Arc::clone(&table_name), columns: columns.clone(), predicate, expected_schema: Arc::clone(&expected_schema), }; - let metrics = Arc::clone(&metrics); + + let backoff_config = self.backoff_config.clone(); // wrap `execute` into an additional future so that we can measure the request time // INFO: create the measurement structure outside of the async block so cancellation is // always measured let measure_me = ObserveIngesterRequest::new(request.clone(), metrics, &span_recorder); async move { - let res = execute(request.clone(), measure_me.span_recorder()).await; + let span_recorder = measure_me + .span_recorder() + .child("ingester request (retry block)"); + + let res = Backoff::new(&backoff_config) + .retry_all_errors("ingester request", move || { + let request = request.clone(); + let span_recorder = span_recorder.child("ingester request (single try)"); + + async move { execute(request, &span_recorder).await } + }) + .await; match &res { Ok(partitions) => { @@ -774,7 +792,9 @@ impl IngesterConnection for IngesterConnectionImpl { .await .map_err(|e| { span_recorder.error("failed"); - e + match e { + BackoffError::DeadlineExceeded { source, .. } => source, + } })? 
// We have a Vec> flatten to Vec<_> .into_iter() @@ -1194,7 +1214,10 @@ mod tests { use metric::Attributes; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::{builder::SchemaBuilder, InfluxFieldType}; - use std::collections::{BTreeSet, HashMap}; + use std::{ + collections::{BTreeSet, HashMap}, + time::Duration, + }; use test_helpers::assert_error; use tokio::{runtime::Handle, sync::Mutex}; use trace::{span::SpanStatus, RingBufferTraceCollector}; @@ -1243,7 +1266,8 @@ mod tests { )]) .await, ); - let ingester_conn = mock_flight_client.ingester_conn().await; + let mut ingester_conn = mock_flight_client.ingester_conn().await; + ingester_conn.backoff_config = BackoffConfig::default(); let partitions = get_partitions(&ingester_conn, &[1]).await.unwrap(); assert!(partitions.is_empty()); } @@ -1872,6 +1896,12 @@ mod tests { self.catalog.object_store(), &Handle::current(), )), + BackoffConfig { + init_backoff: Duration::from_secs(1), + max_backoff: Duration::from_secs(2), + base: 1.1, + deadline: Some(Duration::from_millis(500)), + }, ) } } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 31a5431d6e..2c5bef0504 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -39,6 +39,7 @@ use std::{ fmt::Display, fmt::Write, sync::Arc, + time::Duration, }; use tokio::runtime::Handle; @@ -944,6 +945,12 @@ impl MockIngester { shard_to_ingesters, Arc::new(self), Arc::clone(&catalog_cache), + BackoffConfig { + init_backoff: Duration::from_secs(1), + max_backoff: Duration::from_secs(2), + base: 1.1, + deadline: Some(Duration::from_millis(500)), + }, ); let ingester_connection = Arc::new(ingester_connection); let sharder = Arc::new(JumpHash::new((0..1).map(ShardIndex::new).map(Arc::new)));