refactor: eagerly validate garbage collector command line arguments

pull/24376/head
Jake Goulding 2022-07-01 14:09:42 -04:00 committed by Carol (Nichols || Goulding)
parent 6fc17164bd
commit f1b0b4da93
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
4 changed files with 38 additions and 49 deletions

View File

@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use iox_catalog::interface::ParquetFileRepo;
use iox_catalog::interface::{Catalog, ParquetFileRepo};
use object_store::ObjectMeta;
use observability_deps::tracing::*;
use snafu::prelude::*;
@ -8,11 +8,6 @@ use tokio::sync::mpsc;
#[derive(Debug, Snafu)]
pub(crate) enum Error {
#[snafu(display("Could not create the catalog"))]
CreatingCatalog {
source: clap_blocks::catalog_dsn::Error,
},
#[snafu(display("Expected a file name"))]
FileNameMissing,
@ -34,13 +29,11 @@ pub(crate) enum Error {
pub(crate) type Result<T, E = Error> = std::result::Result<T, E>;
pub(crate) async fn perform(
args: Arc<crate::Args>,
catalog: Arc<dyn Catalog>,
cutoff: DateTime<Utc>,
mut items: mpsc::Receiver<ObjectMeta>,
deleter: mpsc::Sender<ObjectMeta>,
) -> Result<()> {
let catalog = args.catalog().await.context(CreatingCatalogSnafu)?;
let mut repositories = catalog.repositories().await;
let parquet_files = repositories.parquet_files();

View File

@ -1,19 +1,14 @@
use object_store::ObjectMeta;
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use snafu::prelude::*;
use std::sync::Arc;
use tokio::sync::mpsc;
pub(crate) async fn perform(
args: Arc<crate::Args>,
object_store: Arc<DynObjectStore>,
dry_run: bool,
mut items: mpsc::Receiver<ObjectMeta>,
) -> Result<()> {
let object_store = args
.object_store()
.await
.context(CreatingObjectStoreSnafu)?;
let dry_run = args.dry_run;
while let Some(item) = items.recv().await {
let path = item.location;
if dry_run {
@ -35,11 +30,6 @@ pub(crate) async fn perform(
#[derive(Debug, Snafu)]
pub(crate) enum Error {
#[snafu(display("Could not create the object store"))]
CreatingObjectStore {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("{path} could not be deleted"))]
Deleting {
source: object_store::Error,

View File

@ -1,19 +1,14 @@
use futures::prelude::*;
use object_store::ObjectMeta;
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use snafu::prelude::*;
use std::sync::Arc;
use tokio::sync::mpsc;
pub(crate) async fn perform(
args: Arc<crate::Args>,
object_store: Arc<DynObjectStore>,
checker: mpsc::Sender<ObjectMeta>,
) -> Result<()> {
let object_store = args
.object_store()
.await
.context(CreatingObjectStoreSnafu)?;
let mut items = object_store.list(None).await.context(ListingSnafu)?;
while let Some(item) = items.next().await {
@ -27,11 +22,6 @@ pub(crate) async fn perform(
#[derive(Debug, Snafu)]
pub(crate) enum Error {
#[snafu(display("Could not create the object store"))]
CreatingObjectStore {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("The prefix could not be listed"))]
Listing { source: object_store::Error },

View File

@ -64,9 +64,11 @@ fn main() -> ExitCode {
#[tokio::main(flavor = "current_thread")]
async fn inner_main(args: Args) -> Result<()> {
let args = Arc::new(args);
let object_store = args.object_store()?;
let catalog = args.catalog().await?;
let cutoff = args.cutoff().context(ParsingCutoffSnafu)?;
let dry_run = args.dry_run;
let cutoff = args.cutoff()?;
info!(
cutoff_arg = %args.cutoff,
cutoff_parsed = %cutoff,
@ -75,9 +77,9 @@ async fn inner_main(args: Args) -> Result<()> {
let (tx1, rx1) = mpsc::channel(BATCH_SIZE);
let (tx2, rx2) = mpsc::channel(BATCH_SIZE);
let lister = tokio::spawn(lister::perform(args.clone(), tx1));
let checker = tokio::spawn(checker::perform(args.clone(), cutoff, rx1, tx2));
let deleter = tokio::spawn(deleter::perform(args.clone(), rx2));
let lister = tokio::spawn(lister::perform(Arc::clone(&object_store), tx1));
let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2));
let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2));
let (lister, checker, deleter) = futures::join!(lister, checker, deleter);
@ -115,27 +117,44 @@ pub struct Args {
}
impl Args {
async fn object_store(
&self,
) -> Result<Arc<DynObjectStore>, clap_blocks::object_store::ParseError> {
make_object_store(&self.object_store)
fn object_store(&self) -> Result<Arc<DynObjectStore>> {
make_object_store(&self.object_store).context(CreatingObjectStoreSnafu)
}
async fn catalog(&self) -> Result<Arc<dyn Catalog>, clap_blocks::catalog_dsn::Error> {
async fn catalog(&self) -> Result<Arc<dyn Catalog>> {
let metrics = metric::Registry::default().into();
self.catalog_dsn
.get_catalog("iox_objectstore_garbage_collect", metrics)
.await
.context(CreatingCatalogSnafu)
}
fn cutoff(&self) -> Result<DateTime<Utc>, chrono_english::DateError> {
parse_date_string(&self.cutoff, Utc::now(), Dialect::Us)
fn cutoff(&self) -> Result<DateTime<Utc>> {
let argument = &self.cutoff;
parse_date_string(argument, Utc::now(), Dialect::Us)
.context(ParsingCutoffSnafu { argument })
}
}
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Could not create the object store"))]
CreatingObjectStore {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("Could not create the catalog"))]
CreatingCatalog {
source: clap_blocks::catalog_dsn::Error,
},
#[snafu(display(r#"Could not parse the cutoff "{argument}""#))]
ParsingCutoff {
source: chrono_english::DateError,
argument: String,
},
#[snafu(display("The lister task failed"))]
#[snafu(context(false))]
Lister { source: lister::Error },
@ -153,9 +172,6 @@ enum Error {
Deleter { source: deleter::Error },
#[snafu(display("The deleter task panicked"))]
DeleterPanic { source: tokio::task::JoinError },
#[snafu(display("Could not parse the cutoff argument"))]
ParsingCutoff { source: chrono_english::DateError },
}
type Result<T, E = Error> = std::result::Result<T, E>;