diff --git a/iox_objectstore_garbage_collect/src/checker.rs b/iox_objectstore_garbage_collect/src/checker.rs index eaa651bf1f..53f9cc5a41 100644 --- a/iox_objectstore_garbage_collect/src/checker.rs +++ b/iox_objectstore_garbage_collect/src/checker.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use iox_catalog::interface::ParquetFileRepo; +use iox_catalog::interface::{Catalog, ParquetFileRepo}; use object_store::ObjectMeta; use observability_deps::tracing::*; use snafu::prelude::*; @@ -8,11 +8,6 @@ use tokio::sync::mpsc; #[derive(Debug, Snafu)] pub(crate) enum Error { - #[snafu(display("Could not create the catalog"))] - CreatingCatalog { - source: clap_blocks::catalog_dsn::Error, - }, - #[snafu(display("Expected a file name"))] FileNameMissing, @@ -34,13 +29,11 @@ pub(crate) enum Error { pub(crate) type Result = std::result::Result; pub(crate) async fn perform( - args: Arc, + catalog: Arc, cutoff: DateTime, mut items: mpsc::Receiver, deleter: mpsc::Sender, ) -> Result<()> { - let catalog = args.catalog().await.context(CreatingCatalogSnafu)?; - let mut repositories = catalog.repositories().await; let parquet_files = repositories.parquet_files(); diff --git a/iox_objectstore_garbage_collect/src/deleter.rs b/iox_objectstore_garbage_collect/src/deleter.rs index 8c8b2619e6..3b88c08040 100644 --- a/iox_objectstore_garbage_collect/src/deleter.rs +++ b/iox_objectstore_garbage_collect/src/deleter.rs @@ -1,19 +1,14 @@ -use object_store::ObjectMeta; +use object_store::{DynObjectStore, ObjectMeta}; use observability_deps::tracing::*; use snafu::prelude::*; use std::sync::Arc; use tokio::sync::mpsc; pub(crate) async fn perform( - args: Arc, + object_store: Arc, + dry_run: bool, mut items: mpsc::Receiver, ) -> Result<()> { - let object_store = args - .object_store() - .await - .context(CreatingObjectStoreSnafu)?; - let dry_run = args.dry_run; - while let Some(item) = items.recv().await { let path = item.location; if dry_run { @@ -35,11 +30,6 @@ pub(crate) async fn perform( #[derive(Debug, Snafu)] pub(crate) enum Error { - #[snafu(display("Could not create the object store"))] - CreatingObjectStore { - source: clap_blocks::object_store::ParseError, - }, - #[snafu(display("{path} could not be deleted"))] Deleting { source: object_store::Error, diff --git a/iox_objectstore_garbage_collect/src/lister.rs b/iox_objectstore_garbage_collect/src/lister.rs index bf154d885a..243ba16313 100644 --- a/iox_objectstore_garbage_collect/src/lister.rs +++ b/iox_objectstore_garbage_collect/src/lister.rs @@ -1,19 +1,14 @@ use futures::prelude::*; -use object_store::ObjectMeta; +use object_store::{DynObjectStore, ObjectMeta}; use observability_deps::tracing::*; use snafu::prelude::*; use std::sync::Arc; use tokio::sync::mpsc; pub(crate) async fn perform( - args: Arc, + object_store: Arc, checker: mpsc::Sender, ) -> Result<()> { - let object_store = args - .object_store() - .await - .context(CreatingObjectStoreSnafu)?; - let mut items = object_store.list(None).await.context(ListingSnafu)?; while let Some(item) = items.next().await { @@ -27,11 +22,6 @@ pub(crate) async fn perform( #[derive(Debug, Snafu)] pub(crate) enum Error { - #[snafu(display("Could not create the object store"))] - CreatingObjectStore { - source: clap_blocks::object_store::ParseError, - }, - #[snafu(display("The prefix could not be listed"))] Listing { source: object_store::Error }, diff --git a/iox_objectstore_garbage_collect/src/main.rs b/iox_objectstore_garbage_collect/src/main.rs index 5f69c79a64..8f1548f99c 100644 --- a/iox_objectstore_garbage_collect/src/main.rs +++ b/iox_objectstore_garbage_collect/src/main.rs @@ -64,9 +64,11 @@ fn main() -> ExitCode { #[tokio::main(flavor = "current_thread")] async fn inner_main(args: Args) -> Result<()> { - let args = Arc::new(args); + let object_store = args.object_store()?; + let catalog = args.catalog().await?; - let cutoff = args.cutoff().context(ParsingCutoffSnafu)?; + let dry_run = args.dry_run; + let cutoff = args.cutoff()?; info!( cutoff_arg = %args.cutoff, cutoff_parsed = %cutoff, @@ -75,9 +77,9 @@ async fn inner_main(args: Args) -> Result<()> { let (tx1, rx1) = mpsc::channel(BATCH_SIZE); let (tx2, rx2) = mpsc::channel(BATCH_SIZE); - let lister = tokio::spawn(lister::perform(args.clone(), tx1)); - let checker = tokio::spawn(checker::perform(args.clone(), cutoff, rx1, tx2)); - let deleter = tokio::spawn(deleter::perform(args.clone(), rx2)); + let lister = tokio::spawn(lister::perform(Arc::clone(&object_store), tx1)); + let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2)); + let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2)); let (lister, checker, deleter) = futures::join!(lister, checker, deleter); @@ -115,27 +117,44 @@ pub struct Args { } impl Args { - async fn object_store( - &self, - ) -> Result, clap_blocks::object_store::ParseError> { - make_object_store(&self.object_store) + fn object_store(&self) -> Result> { + make_object_store(&self.object_store).context(CreatingObjectStoreSnafu) } - async fn catalog(&self) -> Result, clap_blocks::catalog_dsn::Error> { + async fn catalog(&self) -> Result> { let metrics = metric::Registry::default().into(); self.catalog_dsn .get_catalog("iox_objectstore_garbage_collect", metrics) .await + .context(CreatingCatalogSnafu) } - fn cutoff(&self) -> Result, chrono_english::DateError> { - parse_date_string(&self.cutoff, Utc::now(), Dialect::Us) + fn cutoff(&self) -> Result> { + let argument = &self.cutoff; + parse_date_string(argument, Utc::now(), Dialect::Us) + .context(ParsingCutoffSnafu { argument }) } } #[derive(Debug, Snafu)] enum Error { + #[snafu(display("Could not create the object store"))] + CreatingObjectStore { + source: clap_blocks::object_store::ParseError, + }, + + #[snafu(display("Could not create the catalog"))] + CreatingCatalog { + source: clap_blocks::catalog_dsn::Error, + }, + + #[snafu(display(r#"Could not parse the cutoff "{argument}""#))] + ParsingCutoff { + source: chrono_english::DateError, + argument: String, + }, + #[snafu(display("The lister task failed"))] #[snafu(context(false))] Lister { source: lister::Error }, @@ -153,9 +172,6 @@ enum Error { Deleter { source: deleter::Error }, #[snafu(display("The deleter task panicked"))] DeleterPanic { source: tokio::task::JoinError }, - - #[snafu(display("Could not parse the cutoff argument"))] - ParsingCutoff { source: chrono_english::DateError }, } type Result = std::result::Result;