diff --git a/Cargo.lock b/Cargo.lock index 768209fc0a..14ee1cb7bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2056,12 +2056,12 @@ dependencies = [ "influxdb_storage_client", "influxrpc_parser", "iox_catalog", - "iox_objectstore_garbage_collect", "iox_query", "iox_time", "ioxd_common", "ioxd_compactor", "ioxd_ingester", + "ioxd_objectstore_garbage_collect", "ioxd_querier", "ioxd_router", "ioxd_test", @@ -2320,7 +2320,6 @@ dependencies = [ "snafu", "tempfile", "tokio", - "trogging", "uuid 1.1.2", ] @@ -2473,6 +2472,22 @@ dependencies = [ "write_buffer", ] +[[package]] +name = "ioxd_objectstore_garbage_collect" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "hyper", + "iox_objectstore_garbage_collect", + "ioxd_common", + "metric", + "snafu", + "tokio", + "trace", + "workspace-hack", +] + [[package]] name = "ioxd_querier" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index c2c31bdc45..a744e9caae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "ioxd_common", "ioxd_compactor", "ioxd_ingester", + "ioxd_objectstore_garbage_collect", "ioxd_querier", "ioxd_router", "ioxd_test", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 5c9df48341..ac7ff9161f 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -15,12 +15,12 @@ influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", " influxdb_storage_client = { path = "../influxdb_storage_client" } influxrpc_parser = { path = "../influxrpc_parser"} iox_catalog = { path = "../iox_catalog" } -iox_objectstore_garbage_collect = { path = "../iox_objectstore_garbage_collect" } ioxd_common = { path = "../ioxd_common"} ioxd_compactor = { path = "../ioxd_compactor"} ioxd_ingester = { path = "../ioxd_ingester"} -ioxd_router = { path = "../ioxd_router"} +ioxd_objectstore_garbage_collect = { path = "../ioxd_objectstore_garbage_collect" } ioxd_querier = { path = "../ioxd_querier"} +ioxd_router = { path = "../ioxd_router"} ioxd_test = 
{ path = "../ioxd_test"} metric = { path = "../metric" } object_store = "0.3.0" diff --git a/influxdb_iox/src/commands/objectstore_garbage_collect.rs b/influxdb_iox/src/commands/objectstore_garbage_collect.rs deleted file mode 100644 index 19412df442..0000000000 --- a/influxdb_iox/src/commands/objectstore_garbage_collect.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub use iox_objectstore_garbage_collect::{Config, Error}; - -pub async fn command(config: Config) -> Result<(), Error> { - iox_objectstore_garbage_collect::main(config).await -} diff --git a/influxdb_iox/src/commands/run/garbage_collector.rs b/influxdb_iox/src/commands/run/garbage_collector.rs new file mode 100644 index 0000000000..8828bda72f --- /dev/null +++ b/influxdb_iox/src/commands/run/garbage_collector.rs @@ -0,0 +1,93 @@ +use clap_blocks::{ + catalog_dsn::CatalogDsnConfig, object_store::make_object_store, run_config::RunConfig, +}; +use iox_time::SystemProvider; +use ioxd_common::{ + server_type::{CommonServerState, CommonServerStateError}, + Service, +}; +use ioxd_objectstore_garbage_collect as gc; +use object_store::DynObjectStore; +use object_store_metrics::ObjectStoreMetrics; +use observability_deps::tracing::*; +use snafu::prelude::*; +use std::sync::Arc; + +use super::main; + +#[derive(Debug, clap::Parser)] +pub struct Config { + #[clap(flatten)] + pub run_config: RunConfig, + + #[clap(flatten)] + catalog_dsn: CatalogDsnConfig, + + #[clap(flatten)] + pub sub_config: gc::SubConfig, +} + +pub async fn command(config: Config) -> Result<()> { + let time_provider = Arc::new(SystemProvider::new()); + let metric_registry: Arc<metric::Registry> = Default::default(); + + let catalog = config + .catalog_dsn + .get_catalog("garbage-collector", Arc::clone(&metric_registry)) + .await?; + + let object_store = make_object_store(config.run_config.object_store_config())?; + + // Decorate the object store with a metric recorder. 
+ let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreMetrics::new( + object_store, + time_provider, + &metric_registry, + )); + + let sub_config = config.sub_config; + + info!("starting garbage-collector"); + + let server_type = Arc::new({ + let config = gc::Config { + object_store, + catalog, + sub_config, + }; + let metric_registry = Arc::clone(&metric_registry); + + gc::Server::start(metric_registry, config) + }); + + let common_state = CommonServerState::from_config(config.run_config)?; + + let services = vec![Service::create(server_type, common_state.run_config())]; + + Ok(main::main(common_state, services, metric_registry).await?) +} + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Could not parse the catalog configuration"))] + #[snafu(context(false))] + CatalogConfigParsing { + source: clap_blocks::catalog_dsn::Error, + }, + + #[snafu(display("Could not parse the object store configuration"))] + #[snafu(context(false))] + ObjectStoreConfigParsing { + source: clap_blocks::object_store::ParseError, + }, + + #[snafu(display("Could not create the common server state"))] + #[snafu(context(false))] + CommonServerStateCreation { source: CommonServerStateError }, + + #[snafu(display("Could not start the garbage collector"))] + #[snafu(context(false))] + ServiceExecution { source: super::main::Error }, +} + +pub type Result<T, E = Error> = std::result::Result<T, E>; diff --git a/influxdb_iox/src/commands/run/mod.rs b/influxdb_iox/src/commands/run/mod.rs index f4f34c096f..cd98c071e7 100644 --- a/influxdb_iox/src/commands/run/mod.rs +++ b/influxdb_iox/src/commands/run/mod.rs @@ -3,6 +3,7 @@ use trogging::cli::LoggingConfig; pub(crate) mod all_in_one; mod compactor; +mod garbage_collector; mod ingester; mod main; mod querier; @@ -15,6 +16,9 @@ pub enum Error { #[snafu(display("Error in compactor subcommand: {}", source))] CompactorError { source: compactor::Error }, + #[snafu(display("Error in garbage collector subcommand: {}", source))] + 
GarbageCollectorError { source: garbage_collector::Error }, + #[snafu(display("Error in querier subcommand: {}", source))] QuerierError { source: querier::Error }, @@ -48,6 +52,7 @@ impl Config { match &self.command { None => &self.all_in_one_config.logging_config, Some(Command::Compactor(config)) => config.run_config.logging_config(), + Some(Command::GarbageCollector(config)) => config.run_config.logging_config(), Some(Command::Querier(config)) => config.run_config.logging_config(), Some(Command::Router(config)) => config.run_config.logging_config(), Some(Command::Ingester(config)) => config.run_config.logging_config(), @@ -76,6 +81,9 @@ enum Command { /// Run the server in test mode Test(test::Config), + + /// Run the server in garbage collector mode + GarbageCollector(garbage_collector::Config), } pub async fn command(config: Config) -> Result<()> { @@ -86,6 +94,9 @@ pub async fn command(config: Config) -> Result<()> { Some(Command::Compactor(config)) => { compactor::command(config).await.context(CompactorSnafu) } + Some(Command::GarbageCollector(config)) => garbage_collector::command(config) .await .context(GarbageCollectorSnafu), Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu), Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu), Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu), diff --git a/influxdb_iox/src/main.rs b/influxdb_iox/src/main.rs index 9b9b837023..bd57fa4ebd 100644 --- a/influxdb_iox/src/main.rs +++ b/influxdb_iox/src/main.rs @@ -29,7 +29,6 @@ use tokio::runtime::Runtime; mod commands { pub mod catalog; pub mod debug; - pub mod objectstore_garbage_collect; pub mod query; pub mod query_ingester; pub mod remote; @@ -172,10 +171,6 @@ enum Command { /// Query the ingester only QueryIngester(commands::query_ingester::Config), - - /// Clean up old object store files that don't appear in the catalog. 
- #[clap(name = "objectstore_garbage_collect")] - ObjectStoreGarbageCollect(Box<commands::objectstore_garbage_collect::Config>), } fn main() -> Result<(), std::io::Error> { @@ -310,17 +305,6 @@ fn main() -> Result<(), std::io::Error> { std::process::exit(ReturnCode::Failure as _) } } - Some(Command::ObjectStoreGarbageCollect(config)) => { - let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count)); - if let Err(e) = commands::objectstore_garbage_collect::command(*config).await { - use snafu::ErrorCompat; - eprintln!("{}", e); - for cause in ErrorCompat::iter_chain(&e).skip(1) { - eprintln!("Caused by: {cause}"); - } - std::process::exit(ReturnCode::Failure as _) - } - } } }); diff --git a/iox_objectstore_garbage_collect/Cargo.toml b/iox_objectstore_garbage_collect/Cargo.toml index 85ce2705a2..55294568fb 100644 --- a/iox_objectstore_garbage_collect/Cargo.toml +++ b/iox_objectstore_garbage_collect/Cargo.toml @@ -7,20 +7,19 @@ edition = "2021" chrono = { version = "0.4", default-features = false } chrono-english = "0.1.4" clap = { version = "3", features = ["derive", "env"] } -clap_blocks = { path = "../clap_blocks" } -iox_catalog = { path = "../iox_catalog" } futures = "0.3" -metric = { path = "../metric" } +iox_catalog = { path = "../iox_catalog" } object_store = { version = "0.3.0" } observability_deps = { path = "../observability_deps" } snafu = "0.7" tokio = { version = "1", features = ["macros", "rt", "sync"] } -trogging = { path = "../trogging", default-features = false, features = ["clap"] } uuid = { version = "1", features = ["v4"] } [dev-dependencies] +clap_blocks = { path = "../clap_blocks" } data_types = { path = "../data_types" } filetime = "0.2" +metric = { path = "../metric" } once_cell = { version = "1.12.0", features = ["parking_lot"] } parquet_file = { path = "../parquet_file" } tempfile = "3" diff --git a/iox_objectstore_garbage_collect/src/lib.rs b/iox_objectstore_garbage_collect/src/lib.rs index 849cb60dc1..e5eec3004e 100644 --- 
a/iox_objectstore_garbage_collect/src/lib.rs +++ b/iox_objectstore_garbage_collect/src/lib.rs @@ -18,68 +18,112 @@ use chrono::{DateTime, Utc}; use chrono_english::{parse_date_string, Dialect}; use clap::Parser; -use clap_blocks::{ - catalog_dsn::CatalogDsnConfig, - object_store::{make_object_store, ObjectStoreConfig}, -}; use iox_catalog::interface::Catalog; use object_store::DynObjectStore; use observability_deps::tracing::*; use snafu::prelude::*; use std::sync::Arc; -use tokio::sync::mpsc; -use trogging::cli::LoggingConfig; +use tokio::sync::{broadcast, mpsc}; /// Logic for checking if a file in object storage should be deleted or not. -pub mod checker; +mod checker; /// Logic for deleting a file from object storage. -pub mod deleter; +mod deleter; /// Logic for listing all files in object storage. -pub mod lister; +mod lister; const BATCH_SIZE: usize = 1000; /// Run the tasks that clean up old object store files that don't appear in the catalog. pub async fn main(config: Config) -> Result<()> { - let object_store = config.object_store()?; - let catalog = config.catalog().await?; - - let dry_run = config.dry_run; - let cutoff = config.cutoff()?; - info!( - cutoff_arg = %config.cutoff, - cutoff_parsed = %cutoff, - ); - - let (tx1, rx1) = mpsc::channel(BATCH_SIZE); - let (tx2, rx2) = mpsc::channel(BATCH_SIZE); - - let lister = tokio::spawn(lister::perform(Arc::clone(&object_store), tx1)); - let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2)); - let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2)); - - let (lister, checker, deleter) = futures::join!(lister, checker, deleter); - - deleter.context(DeleterPanicSnafu)??; - checker.context(CheckerPanicSnafu)??; - lister.context(ListerPanicSnafu)??; - - Ok(()) + GarbageCollector::start(config)?.join().await } -/// Clean up old object store files that don't appear in the catalog. 
-#[derive(Debug, Parser)] +/// The tasks that clean up old object store files that don't appear in the catalog. +#[derive(Debug)] +pub struct GarbageCollector { + shutdown_tx: broadcast::Sender<()>, + lister: tokio::task::JoinHandle<Result<(), lister::Error>>, + checker: tokio::task::JoinHandle<Result<(), checker::Error>>, + deleter: tokio::task::JoinHandle<Result<(), deleter::Error>>, +} + +impl GarbageCollector { + /// Construct the garbage collector and start it + pub fn start(config: Config) -> Result<Self> { + let Config { + object_store, + sub_config, + catalog, + } = config; + + let dry_run = sub_config.dry_run; + let cutoff = sub_config.cutoff()?; + info!( + cutoff_arg = %sub_config.cutoff, + cutoff_parsed = %cutoff, + ); + + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + + let (tx1, rx1) = mpsc::channel(BATCH_SIZE); + let (tx2, rx2) = mpsc::channel(BATCH_SIZE); + + let lister = tokio::spawn(lister::perform(shutdown_rx, Arc::clone(&object_store), tx1)); + let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2)); + let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2)); + + Ok(Self { + shutdown_tx, + lister, + checker, + deleter, + }) + } + + /// A handle to gracefully shutdown the garbage collector when invoked + pub fn shutdown_handle(&self) -> impl Fn() { + let shutdown_tx = self.shutdown_tx.clone(); + move || { + shutdown_tx.send(()).ok(); + } + } + + /// Wait for the garbage collector to finish work + pub async fn join(self) -> Result<()> { + let Self { + lister, + checker, + deleter, + .. 
+ } = self; + + let (lister, checker, deleter) = futures::join!(lister, checker, deleter); + + deleter.context(DeleterPanicSnafu)??; + checker.context(CheckerPanicSnafu)??; + lister.context(ListerPanicSnafu)??; + + Ok(()) + } +} + +/// Configuration to run the object store garbage collector +#[derive(Debug, Clone)] pub struct Config { - #[clap(flatten)] - object_store: ObjectStoreConfig, + /// The object store to garbage collect + pub object_store: Arc<DynObjectStore>, - #[clap(flatten)] - catalog_dsn: CatalogDsnConfig, + /// The catalog to check if an object is garbage + pub catalog: Arc<dyn Catalog>, - /// logging options - #[clap(flatten)] - pub(crate) logging_config: LoggingConfig, + /// The garbage collector specific configuration + pub sub_config: SubConfig, +} +/// Configuration specific to the object store garbage collector +#[derive(Debug, Clone, Parser)] +pub struct SubConfig { /// If this flag is specified, don't delete the files in object storage. Only print the files /// that would be deleted if this flag wasn't specified. 
#[clap(long)] @@ -94,20 +138,7 @@ pub struct Config { cutoff: String, } -impl Config { - fn object_store(&self) -> Result<Arc<DynObjectStore>> { - make_object_store(&self.object_store).context(CreatingObjectStoreSnafu) - } - - async fn catalog(&self) -> Result<Arc<dyn Catalog>> { - let metrics = metric::Registry::default().into(); - - self.catalog_dsn - .get_catalog("iox_objectstore_garbage_collect", metrics) - .await - .context(CreatingCatalogSnafu) - } - +impl SubConfig { fn cutoff(&self) -> Result<DateTime<Utc>> { let argument = &self.cutoff; parse_date_string(argument, Utc::now(), Dialect::Us) @@ -118,16 +149,6 @@ impl Config { #[derive(Debug, Snafu)] #[allow(missing_docs)] pub enum Error { - #[snafu(display("Could not create the object store"))] - CreatingObjectStore { - source: clap_blocks::object_store::ParseError, - }, - - #[snafu(display("Could not create the catalog"))] - CreatingCatalog { - source: clap_blocks::catalog_dsn::Error, - }, - #[snafu(display(r#"Could not parse the cutoff "{argument}""#))] ParsingCutoff { source: chrono_english::DateError, @@ -153,12 +174,17 @@ pub enum Error { DeleterPanic { source: tokio::task::JoinError }, } -type Result<T, E = Error> = std::result::Result<T, E>; +#[allow(missing_docs)] +pub type Result<T, E = Error> = std::result::Result<T, E>; #[cfg(test)] mod tests { + use clap_blocks::{ + catalog_dsn::CatalogDsnConfig, + object_store::{make_object_store, ObjectStoreConfig}, + }; use filetime::FileTime; - use std::{fs, path::PathBuf}; + use std::{fs, iter, path::PathBuf}; use tempfile::TempDir; use super::*; @@ -167,13 +193,7 @@ mod tests { async fn deletes_untracked_files_older_than_the_cutoff() { let setup = OldFileSetup::new(); - #[rustfmt::skip] - let config = Config::parse_from([ - "dummy-program-name", - "--object-store", "file", - "--data-dir", setup.data_dir_arg(), - "--catalog", "memory", - ]); + let config = build_config(setup.data_dir_arg(), []).await; main(config).await.unwrap(); assert!( @@ -188,13 +208,9 @@ 
mod tests { let setup = OldFileSetup::new(); #[rustfmt::skip] - let config = Config::parse_from([ - "dummy-program-name", - "--object-store", "file", - "--data-dir", setup.data_dir_arg(), - "--catalog", "memory", + let config = build_config(setup.data_dir_arg(), [ "--cutoff", "10 years ago", - ]); + ]).await; main(config).await.unwrap(); assert!( @@ -204,6 +220,42 @@ mod tests { ); } + async fn build_config(data_dir: &str, args: impl IntoIterator<Item = &str> + Send) -> Config { + let sub_config = SubConfig::parse_from(iter::once("dummy-program-name").chain(args)); + let object_store = object_store(data_dir); + let catalog = catalog().await; + + Config { + object_store, + catalog, + sub_config, + } + } + + fn object_store(data_dir: &str) -> Arc<DynObjectStore> { + #[rustfmt::skip] + let cfg = ObjectStoreConfig::parse_from([ + "dummy-program-name", + "--object-store", "file", + "--data-dir", data_dir, + ]); + make_object_store(&cfg).unwrap() + } + + async fn catalog() -> Arc<dyn Catalog> { + #[rustfmt::skip] + let cfg = CatalogDsnConfig::parse_from([ + "dummy-program-name", + "--catalog", "memory", + ]); + + let metrics = metric::Registry::default().into(); + + cfg.get_catalog("iox_objectstore_garbage_collect", metrics) + .await + .unwrap() + } + struct OldFileSetup { data_dir: TempDir, file_path: PathBuf, diff --git a/iox_objectstore_garbage_collect/src/lister.rs b/iox_objectstore_garbage_collect/src/lister.rs index 7aff121d37..c1447c315b 100644 --- a/iox_objectstore_garbage_collect/src/lister.rs +++ b/iox_objectstore_garbage_collect/src/lister.rs @@ -3,18 +3,36 @@ use object_store::{DynObjectStore, ObjectMeta}; use observability_deps::tracing::*; use snafu::prelude::*; use std::sync::Arc; -use tokio::sync::mpsc; +use tokio::{ + select, + sync::{broadcast, mpsc}, +}; pub(crate) async fn perform( + mut shutdown: broadcast::Receiver<()>, object_store: Arc<DynObjectStore>, checker: mpsc::Sender<ObjectMeta>, ) -> Result<()> { let mut items = 
object_store.list(None).await.context(ListingSnafu)?; - while let Some(item) = items.next().await { - let item = item.context(MalformedSnafu)?; - info!(location = %item.location); - checker.send(item).await?; + loop { + select! { + _ = shutdown.recv() => { + break + }, + item = items.next() => { + match item { + Some(item) => { + let item = item.context(MalformedSnafu)?; + info!(location = %item.location); + checker.send(item).await?; + } + None => { + break; + } + } + } + } } Ok(()) diff --git a/iox_objectstore_garbage_collect/src/server.rs b/iox_objectstore_garbage_collect/src/server.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ioxd_objectstore_garbage_collect/Cargo.toml b/ioxd_objectstore_garbage_collect/Cargo.toml new file mode 100644 index 0000000000..c66300d3f4 --- /dev/null +++ b/ioxd_objectstore_garbage_collect/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "ioxd_objectstore_garbage_collect" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +futures = "0.3" +hyper = "0.14" +iox_objectstore_garbage_collect = { path = "../iox_objectstore_garbage_collect" } +ioxd_common = { path = "../ioxd_common" } +metric = { path = "../metric" } +snafu = "0.7" +tokio = { version = "1", features = ["sync"] } +trace = { path = "../trace" } +workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd_objectstore_garbage_collect/src/lib.rs b/ioxd_objectstore_garbage_collect/src/lib.rs new file mode 100644 index 0000000000..904ab53d6d --- /dev/null +++ b/ioxd_objectstore_garbage_collect/src/lib.rs @@ -0,0 +1,196 @@ +//! Tool to clean up old object store files that don't appear in the catalog. 
+ +#![deny( + rustdoc::broken_intra_doc_links, + rust_2018_idioms, + missing_debug_implementations, + unreachable_pub +)] +#![warn( + missing_docs, + clippy::todo, + clippy::dbg_macro, + clippy::clone_on_ref_ptr, + clippy::future_not_send +)] +#![allow(clippy::missing_docs_in_private_items)] + +use async_trait::async_trait; +use futures::{ + future::{BoxFuture, Shared}, + prelude::*, +}; +use hyper::{Body, Request, Response}; +use iox_objectstore_garbage_collect::GarbageCollector; +use ioxd_common::{ + http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource}, + rpc::RpcBuilderInput, + serve_builder, + server_type::{RpcError, ServerType}, + setup_builder, +}; +use metric::Registry; +use snafu::prelude::*; +use std::{sync::Arc, time::Duration}; +use tokio::{select, sync::broadcast, task::JoinError, time}; +use trace::TraceCollector; + +pub use iox_objectstore_garbage_collect::{Config, SubConfig}; + +/// The object store garbage collection server +#[derive(Debug)] +pub struct Server { + metric_registry: Arc<metric::Registry>, + worker: SharedCloneError<(), JoinError>, + shutdown_tx: broadcast::Sender<()>, +} + +impl Server { + /// Construct and start the object store garbage collector + pub fn start(metric_registry: Arc<metric::Registry>, config: Config) -> Self { + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + + let worker = tokio::spawn(Self::worker_task(config, shutdown_rx)); + let worker = shared_clone_error(worker); + + Self { + metric_registry, + worker, + shutdown_tx, + } + } + + async fn worker_task(config: Config, mut shutdown_rx: broadcast::Receiver<()>) { + const ONE_HOUR: u64 = 60 * 60; + let mut minimum_next_start_time = time::interval(Duration::from_secs(ONE_HOUR)); + + loop { + // Avoid a hot infinite loop when there's nothing to delete + select! 
{ + _ = shutdown_rx.recv() => return, + _ = minimum_next_start_time.tick() => {}, + } + + let handle = GarbageCollector::start(config.clone()) + .context(StartGarbageCollectorSnafu) + .unwrap_or_report(); + let shutdown_garbage_collector = handle.shutdown_handle(); + let mut complete = shared_clone_error(handle.join()); + + loop { + select! { + _ = shutdown_rx.recv() => { + shutdown_garbage_collector(); + complete.await.context(JoinGarbageCollectorSnafu).unwrap_or_report(); + return; + }, + v = &mut complete => { + v.context(JoinGarbageCollectorSnafu).unwrap_or_report(); + break; + }, + } + } + } + } +} + +#[async_trait] +impl ServerType for Server { + fn metric_registry(&self) -> Arc<Registry> { + Arc::clone(&self.metric_registry) + } + + fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> { + None + } + + async fn route_http_request( + &self, + _req: Request<Body>, + ) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> { + Err(Box::new(HttpNotFound)) + } + + async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> { + let builder = setup_builder!(builder_input, self); + serve_builder!(builder); + + Ok(()) + } + + async fn join(self: Arc<Self>) { + self.worker + .clone() + .await + .context(JoinWorkerSnafu) + .unwrap_or_report(); + } + + fn shutdown(&self) { + self.shutdown_tx.send(()).ok(); + } +} + +#[derive(Debug, Snafu)] +struct HttpNotFound; + +impl HttpApiErrorSource for HttpNotFound { + fn to_http_api_error(&self) -> HttpApiError { + HttpApiError::new(HttpApiErrorCode::NotFound, self.to_string()) + } +} + +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +pub enum Error { + #[snafu(display("Could not start the garbage collector"))] + StartGarbageCollector { + source: iox_objectstore_garbage_collect::Error, + }, + + #[snafu(display("Could not join the garbage collector"))] + JoinGarbageCollector { + source: Arc<iox_objectstore_garbage_collect::Error>, + }, + + #[snafu(display("Could not join the garbage 
collector worker task"))] + JoinWorker { source: Arc<JoinError> }, +} + +#[allow(missing_docs)] +pub type Result<T, E = Error> = std::result::Result<T, E>; + +trait UnwrapOrReportExt<T> { + fn unwrap_or_report(self) -> T; +} + +impl<T> UnwrapOrReportExt<T> for Result<T> { + fn unwrap_or_report(self) -> T { + match self { + Ok(v) => v, + Err(e) => { + use snafu::ErrorCompat; + use std::fmt::Write; + + let mut message = String::new(); + + writeln!(message, "{}", e).unwrap(); + for cause in ErrorCompat::iter_chain(&e).skip(1) { + writeln!(message, "Caused by: {cause}").unwrap(); + } + + panic!("{message}"); + } + } + } +} + +type SharedCloneError<T, E> = Shared<BoxFuture<'static, Result<T, Arc<E>>>>; + +fn shared_clone_error<F>(handle: F) -> SharedCloneError<F::Ok, F::Error> +where + F: TryFuture + Send + 'static, + F::Ok: Clone, +{ + handle.map_err(Arc::new).boxed().shared() +}