Merge pull request #7562 from influxdata/pjb-17451-gc-s3-connection-refused
fix(garbage_collector): delay initial s3 checker loop, fix dryrunpull/24376/head
commit
4afb2ba2ca
|
@ -136,6 +136,7 @@ impl GarbageCollector {
|
|||
shutdown.clone(),
|
||||
catalog,
|
||||
sub_config.retention_sleep_interval_minutes,
|
||||
sub_config.dry_run,
|
||||
));
|
||||
|
||||
Ok(Self {
|
||||
|
@ -264,7 +265,11 @@ mod tests {
|
|||
async fn deletes_untracked_files_older_than_the_cutoff() {
|
||||
let setup = OldFileSetup::new();
|
||||
|
||||
let config = build_config(setup.data_dir_arg(), []).await;
|
||||
let config = build_config(
|
||||
setup.data_dir_arg(),
|
||||
["--objectstore-sleep-interval-minutes=0"],
|
||||
)
|
||||
.await;
|
||||
tokio::spawn(async {
|
||||
main(config).await.unwrap();
|
||||
});
|
||||
|
|
|
@ -72,7 +72,7 @@ async fn should_delete(
|
|||
parquet_files: &mut dyn ParquetFileRepo,
|
||||
) -> Result<bool> {
|
||||
if cutoff < item.last_modified {
|
||||
info!(
|
||||
debug!(
|
||||
location = %item.location,
|
||||
deleting = false,
|
||||
reason = "too new",
|
||||
|
@ -95,7 +95,7 @@ async fn should_delete(
|
|||
|
||||
if parquet_file.is_some() {
|
||||
// We have a reference to this file; do not delete
|
||||
info!(
|
||||
debug!(
|
||||
location = %item.location,
|
||||
deleting = false,
|
||||
reason = "exists in catalog",
|
||||
|
@ -103,7 +103,7 @@ async fn should_delete(
|
|||
);
|
||||
return Ok(false);
|
||||
} else {
|
||||
info!(
|
||||
debug!(
|
||||
location = %item.location,
|
||||
deleting = true,
|
||||
reason = "not in catalog",
|
||||
|
@ -111,7 +111,7 @@ async fn should_delete(
|
|||
);
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
debug!(
|
||||
location = %item.location,
|
||||
deleting = true,
|
||||
uuid,
|
||||
|
@ -120,7 +120,7 @@ async fn should_delete(
|
|||
);
|
||||
}
|
||||
} else {
|
||||
info!(
|
||||
debug!(
|
||||
location = %item.location,
|
||||
deleting = true,
|
||||
file_name = %file_name.as_ref(),
|
||||
|
|
|
@ -12,6 +12,9 @@ pub(crate) async fn perform(
|
|||
checker: mpsc::Sender<ObjectMeta>,
|
||||
sleep_interval_minutes: u64,
|
||||
) -> Result<()> {
|
||||
// sleep poll interval to avoid issues with immediately polling the object store at startup
|
||||
info!("object store polling will start in {sleep_interval_minutes} configured minutes");
|
||||
sleep(Duration::from_secs(60 * sleep_interval_minutes)).await;
|
||||
let mut items = object_store.list(None).await.context(ListingSnafu)?;
|
||||
|
||||
loop {
|
||||
|
@ -29,6 +32,7 @@ pub(crate) async fn perform(
|
|||
None => {
|
||||
// sleep for the configured time, then list again and go around the loop
|
||||
// again
|
||||
debug!("end of object store item list");
|
||||
select! {
|
||||
_ = shutdown.cancelled() => {
|
||||
break;
|
||||
|
|
|
@ -9,26 +9,27 @@ pub(crate) async fn perform(
|
|||
shutdown: CancellationToken,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
sleep_interval_minutes: u64,
|
||||
dry_run: bool,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let flagged = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.flag_for_delete_by_retention()
|
||||
.await
|
||||
.context(FlaggingSnafu)?;
|
||||
info!(flagged_count = %flagged.len(), "iox_catalog::flag_for_delete_by_retention()");
|
||||
if !dry_run {
|
||||
let flagged = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.parquet_files()
|
||||
.flag_for_delete_by_retention()
|
||||
.await
|
||||
.context(FlaggingSnafu)?;
|
||||
info!(flagged_count = %flagged.len(), "iox_catalog::flag_for_delete_by_retention()");
|
||||
} else {
|
||||
debug!("dry run enabled for parquet retention flagger");
|
||||
};
|
||||
|
||||
if flagged.is_empty() {
|
||||
select! {
|
||||
_ = shutdown.cancelled() => {
|
||||
break
|
||||
},
|
||||
_ = sleep(Duration::from_secs(60 * sleep_interval_minutes)) => (),
|
||||
}
|
||||
} else if shutdown.is_cancelled() {
|
||||
break;
|
||||
select! {
|
||||
_ = shutdown.cancelled() => {
|
||||
break
|
||||
},
|
||||
_ = sleep(Duration::from_secs(60 * sleep_interval_minutes)) => (),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
|
Loading…
Reference in New Issue