chore(garbage collector): backoff first connect to objectstore (#7652)
* chore(garbage collector): backoff first connect to objectstore This pr replaces the initial sleep added in #7562 with expential backoff retry of connecting to objectstore. This avoids the issue of waiting the configured sleep which can be quite long in a world where the service is getting redeployed often. * for #7562 * chore: rewrite lister::perform based on pr feedback This commit redoes perform as a do..while loop, putting the call to list from the object store at the top of the loop so the infinite backoff retry and be used for each loop iteration - it might fail on more than the first time! There are 3 selects as there are 3 wait stages and each needs to check for shutdown: os list, processing the list, and sleeping on the poll interval. * chore: hoist cancellation check higher; limit listing to 1000 files Responding to PR feedback. * chore: add error info message * chore: make build. :| * chore: linterpull/24376/head
parent
7ae92caf4b
commit
8609015821
|
@ -2035,6 +2035,7 @@ dependencies = [
|
||||||
name = "garbage_collector"
|
name = "garbage_collector"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"backoff",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap 4.2.4",
|
"clap 4.2.4",
|
||||||
|
|
|
@ -13,6 +13,7 @@ data_types = { path = "../data_types" }
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
humantime = "2.1.0"
|
humantime = "2.1.0"
|
||||||
iox_catalog = { path = "../iox_catalog" }
|
iox_catalog = { path = "../iox_catalog" }
|
||||||
|
backoff = { path = "../backoff" }
|
||||||
object_store = { version = "0.5.6" }
|
object_store = { version = "0.5.6" }
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
snafu = "0.7"
|
snafu = "0.7"
|
||||||
|
|
|
@ -30,7 +30,7 @@ use object_store::DynObjectStore;
|
||||||
use observability_deps::tracing::*;
|
use observability_deps::tracing::*;
|
||||||
use snafu::prelude::*;
|
use snafu::prelude::*;
|
||||||
use std::{fmt::Debug, sync::Arc};
|
use std::{fmt::Debug, sync::Arc};
|
||||||
use tokio::sync::mpsc;
|
use tokio::{select, sync::mpsc};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
/// Logic for listing, checking and deleting files in object storage
|
/// Logic for listing, checking and deleting files in object storage
|
||||||
|
@ -96,12 +96,23 @@ impl GarbageCollector {
|
||||||
let (tx1, rx1) = mpsc::channel(BUFFER_SIZE);
|
let (tx1, rx1) = mpsc::channel(BUFFER_SIZE);
|
||||||
let (tx2, rx2) = mpsc::channel(BUFFER_SIZE);
|
let (tx2, rx2) = mpsc::channel(BUFFER_SIZE);
|
||||||
|
|
||||||
let os_lister = tokio::spawn(os_lister::perform(
|
let sdt = shutdown.clone();
|
||||||
shutdown.clone(),
|
let osa = Arc::clone(&object_store);
|
||||||
Arc::clone(&object_store),
|
|
||||||
tx1,
|
let os_lister = tokio::spawn(async move {
|
||||||
sub_config.objectstore_sleep_interval_minutes,
|
select! {
|
||||||
));
|
ret = os_lister::perform(
|
||||||
|
osa,
|
||||||
|
tx1,
|
||||||
|
sub_config.objectstore_sleep_interval_minutes,
|
||||||
|
) => {
|
||||||
|
ret
|
||||||
|
},
|
||||||
|
_ = sdt.cancelled() => {
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
});
|
||||||
let os_checker = tokio::spawn(os_checker::perform(
|
let os_checker = tokio::spawn(os_checker::perform(
|
||||||
shutdown.clone(),
|
shutdown.clone(),
|
||||||
Arc::clone(&catalog),
|
Arc::clone(&catalog),
|
||||||
|
|
|
@ -1,52 +1,53 @@
|
||||||
|
use backoff::*;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
use object_store::{DynObjectStore, ObjectMeta};
|
use object_store::{DynObjectStore, ObjectMeta};
|
||||||
use observability_deps::tracing::*;
|
use observability_deps::tracing::*;
|
||||||
use snafu::prelude::*;
|
use snafu::prelude::*;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use tokio::{select, sync::mpsc, time::sleep};
|
use tokio::{sync::mpsc, time::sleep};
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
|
|
||||||
|
/// perform a object store list, limiting to 1000 files per loop iteration, waiting sleep interval
|
||||||
|
/// per loop.
|
||||||
pub(crate) async fn perform(
|
pub(crate) async fn perform(
|
||||||
shutdown: CancellationToken,
|
|
||||||
object_store: Arc<DynObjectStore>,
|
object_store: Arc<DynObjectStore>,
|
||||||
checker: mpsc::Sender<ObjectMeta>,
|
checker: mpsc::Sender<ObjectMeta>,
|
||||||
sleep_interval_minutes: u64,
|
sleep_interval_minutes: u64,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// sleep poll interval to avoid issues with immediately polling the object store at startup
|
|
||||||
info!("object store polling will start in {sleep_interval_minutes} configured minutes");
|
|
||||||
sleep(Duration::from_secs(60 * sleep_interval_minutes)).await;
|
|
||||||
let mut items = object_store.list(None).await.context(ListingSnafu)?;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
// backoff retry to avoid issues with immediately polling the object store at startup
|
||||||
_ = shutdown.cancelled() => {
|
let mut backoff = Backoff::new(&BackoffConfig::default());
|
||||||
break
|
|
||||||
},
|
let mut items = backoff
|
||||||
item = items.next() => {
|
.retry_all_errors("list_os_files", || object_store.list(None))
|
||||||
match item {
|
.await
|
||||||
Some(item) => {
|
.expect("backoff retries forever");
|
||||||
let item = item.context(MalformedSnafu)?;
|
|
||||||
debug!(location = %item.location, "Object store item");
|
// ignore the result, if it was successful, sleep; if there was an error, sleep still to
|
||||||
checker.send(item).await?;
|
// make sure we don't loop onto the same error repeatedly
|
||||||
}
|
// (todo: maybe improve in the future based on error).
|
||||||
None => {
|
let ret = process_item_list(&mut items, &checker).await;
|
||||||
// sleep for the configured time, then list again and go around the loop
|
match ret {
|
||||||
// again
|
Ok(_) => {}
|
||||||
debug!("end of object store item list");
|
Err(e) => {
|
||||||
select! {
|
info!("error processing items from object store, continuing: {e}")
|
||||||
_ = shutdown.cancelled() => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ = sleep(Duration::from_secs(60 * sleep_interval_minutes)) => {
|
|
||||||
items = object_store.list(None).await.context(ListingSnafu)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(60 * sleep_interval_minutes)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_item_list(
|
||||||
|
items: &mut BoxStream<'_, object_store::Result<ObjectMeta>>,
|
||||||
|
checker: &mpsc::Sender<ObjectMeta>,
|
||||||
|
) -> Result<()> {
|
||||||
|
while let Some(item) = items.take(1000).next().await {
|
||||||
|
let item = item.context(MalformedSnafu)?;
|
||||||
|
debug!(location = %item.location, "Object store item");
|
||||||
|
checker.send(item).await?;
|
||||||
|
}
|
||||||
|
debug!("end of object store item list");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue