feat: make object store garbage collector into a long-running service (#5135)

* refactor: remove unused logging config

* chore: remove the object store garbage collector CLI tool

* refactor: accept an object store and catalog

* refactor: make Result type alias public like the error

* refactor: remove public modifier from modules

* refactor: allow shutting down the object store garbage collector

* feat: Introduce the object-store garbage collection server

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Jake Goulding 2022-07-18 17:27:38 -04:00 committed by GitHub
parent bceee0e2a0
commit f7a0fd43d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 497 additions and 115 deletions

19
Cargo.lock generated
View File

@ -2056,12 +2056,12 @@ dependencies = [
"influxdb_storage_client",
"influxrpc_parser",
"iox_catalog",
"iox_objectstore_garbage_collect",
"iox_query",
"iox_time",
"ioxd_common",
"ioxd_compactor",
"ioxd_ingester",
"ioxd_objectstore_garbage_collect",
"ioxd_querier",
"ioxd_router",
"ioxd_test",
@ -2320,7 +2320,6 @@ dependencies = [
"snafu",
"tempfile",
"tokio",
"trogging",
"uuid 1.1.2",
]
@ -2473,6 +2472,22 @@ dependencies = [
"write_buffer",
]
[[package]]
name = "ioxd_objectstore_garbage_collect"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"hyper",
"iox_objectstore_garbage_collect",
"ioxd_common",
"metric",
"snafu",
"tokio",
"trace",
"workspace-hack",
]
[[package]]
name = "ioxd_querier"
version = "0.1.0"

View File

@ -30,6 +30,7 @@ members = [
"ioxd_common",
"ioxd_compactor",
"ioxd_ingester",
"ioxd_objectstore_garbage_collect",
"ioxd_querier",
"ioxd_router",
"ioxd_test",

View File

@ -15,12 +15,12 @@ influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "
influxdb_storage_client = { path = "../influxdb_storage_client" }
influxrpc_parser = { path = "../influxrpc_parser"}
iox_catalog = { path = "../iox_catalog" }
iox_objectstore_garbage_collect = { path = "../iox_objectstore_garbage_collect" }
ioxd_common = { path = "../ioxd_common"}
ioxd_compactor = { path = "../ioxd_compactor"}
ioxd_ingester = { path = "../ioxd_ingester"}
ioxd_router = { path = "../ioxd_router"}
ioxd_objectstore_garbage_collect = { path = "../ioxd_objectstore_garbage_collect" }
ioxd_querier = { path = "../ioxd_querier"}
ioxd_router = { path = "../ioxd_router"}
ioxd_test = { path = "../ioxd_test"}
metric = { path = "../metric" }
object_store = "0.3.0"

View File

@ -1,5 +0,0 @@
pub use iox_objectstore_garbage_collect::{Config, Error};
pub async fn command(config: Config) -> Result<(), Error> {
iox_objectstore_garbage_collect::main(config).await
}

View File

@ -0,0 +1,93 @@
use clap_blocks::{
catalog_dsn::CatalogDsnConfig, object_store::make_object_store, run_config::RunConfig,
};
use iox_time::SystemProvider;
use ioxd_common::{
server_type::{CommonServerState, CommonServerStateError},
Service,
};
use ioxd_objectstore_garbage_collect as gc;
use object_store::DynObjectStore;
use object_store_metrics::ObjectStoreMetrics;
use observability_deps::tracing::*;
use snafu::prelude::*;
use std::sync::Arc;
use super::main;
// Command-line configuration for the garbage collector server command.
//
// Plain `//` comments are used on purpose: clap's derive macros turn `///`
// doc comments into help text, so doc comments here could change CLI output.
#[derive(Debug, clap::Parser)]
pub struct Config {
// Flattened run configuration. `command` reads the object store settings
// from it and later builds the common server state from it.
#[clap(flatten)]
pub run_config: RunConfig,
// Flattened catalog connection settings; `command` uses them to create the
// catalog handed to the garbage collector.
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
// Flattened garbage-collector-specific options, passed through unchanged to
// the `gc` server configuration.
#[clap(flatten)]
pub sub_config: gc::SubConfig,
}
pub async fn command(config: Config) -> Result<()> {
let time_provider = Arc::new(SystemProvider::new());
let metric_registry: Arc<metric::Registry> = Default::default();
let catalog = config
.catalog_dsn
.get_catalog("garbage-collector", Arc::clone(&metric_registry))
.await?;
let object_store = make_object_store(config.run_config.object_store_config())?;
// Decorate the object store with a metric recorder.
let object_store: Arc<DynObjectStore> = Arc::new(ObjectStoreMetrics::new(
object_store,
time_provider,
&metric_registry,
));
let sub_config = config.sub_config;
info!("starting garbage-collector");
let server_type = Arc::new({
let config = gc::Config {
object_store,
catalog,
sub_config,
};
let metric_registry = Arc::clone(&metric_registry);
gc::Server::start(metric_registry, config)
});
let common_state = CommonServerState::from_config(config.run_config)?;
let services = vec![Service::create(server_type, common_state.run_config())];
Ok(main::main(common_state, services, metric_registry).await?)
}
/// Errors that can occur while running the garbage collector command.
///
/// Every variant is declared with `context(false)`, so the `?` operator in
/// `command` converts each source error into the matching variant without an
/// explicit context selector.
#[derive(Debug, Snafu)]
pub enum Error {
// Creating the catalog from the catalog DSN configuration failed.
#[snafu(display("Could not parse the catalog configuration"))]
#[snafu(context(false))]
CatalogConfigParsing {
source: clap_blocks::catalog_dsn::Error,
},
// Creating the object store from the run configuration failed.
#[snafu(display("Could not parse the object store configuration"))]
#[snafu(context(false))]
ObjectStoreConfigParsing {
source: clap_blocks::object_store::ParseError,
},
// Building the common server state from the run configuration failed.
#[snafu(display("Could not create the common server state"))]
#[snafu(context(false))]
CommonServerStateCreation { source: CommonServerStateError },
// The shared server main loop returned an error.
#[snafu(display("Could not start the garbage collector"))]
#[snafu(context(false))]
ServiceExecution { source: super::main::Error },
}
/// Result type for the garbage collector command; the error type defaults to [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;

View File

@ -3,6 +3,7 @@ use trogging::cli::LoggingConfig;
pub(crate) mod all_in_one;
mod compactor;
mod garbage_collector;
mod ingester;
mod main;
mod querier;
@ -15,6 +16,9 @@ pub enum Error {
#[snafu(display("Error in compactor subcommand: {}", source))]
CompactorError { source: compactor::Error },
#[snafu(display("Error in garbage collector subcommand: {}", source))]
GarbageCollectorError { source: garbage_collector::Error },
#[snafu(display("Error in querier subcommand: {}", source))]
QuerierError { source: querier::Error },
@ -48,6 +52,7 @@ impl Config {
match &self.command {
None => &self.all_in_one_config.logging_config,
Some(Command::Compactor(config)) => config.run_config.logging_config(),
Some(Command::GarbageCollector(config)) => config.run_config.logging_config(),
Some(Command::Querier(config)) => config.run_config.logging_config(),
Some(Command::Router(config)) => config.run_config.logging_config(),
Some(Command::Ingester(config)) => config.run_config.logging_config(),
@ -76,6 +81,9 @@ enum Command {
/// Run the server in test mode
Test(test::Config),
/// Run the server in garbage collector mode
GarbageCollector(garbage_collector::Config),
}
pub async fn command(config: Config) -> Result<()> {
@ -86,6 +94,9 @@ pub async fn command(config: Config) -> Result<()> {
Some(Command::Compactor(config)) => {
compactor::command(config).await.context(CompactorSnafu)
}
Some(Command::GarbageCollector(config)) => garbage_collector::command(config)
.await
.context(GarbageCollectorSnafu),
Some(Command::Querier(config)) => querier::command(config).await.context(QuerierSnafu),
Some(Command::Router(config)) => router::command(config).await.context(RouterSnafu),
Some(Command::Ingester(config)) => ingester::command(config).await.context(IngesterSnafu),

View File

@ -29,7 +29,6 @@ use tokio::runtime::Runtime;
mod commands {
pub mod catalog;
pub mod debug;
pub mod objectstore_garbage_collect;
pub mod query;
pub mod query_ingester;
pub mod remote;
@ -172,10 +171,6 @@ enum Command {
/// Query the ingester only
QueryIngester(commands::query_ingester::Config),
/// Clean up old object store files that don't appear in the catalog.
#[clap(name = "objectstore_garbage_collect")]
ObjectStoreGarbageCollect(Box<commands::objectstore_garbage_collect::Config>),
}
fn main() -> Result<(), std::io::Error> {
@ -310,17 +305,6 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::ObjectStoreGarbageCollect(config)) => {
let _tracing_guard = handle_init_logs(init_simple_logs(log_verbose_count));
if let Err(e) = commands::objectstore_garbage_collect::command(*config).await {
use snafu::ErrorCompat;
eprintln!("{}", e);
for cause in ErrorCompat::iter_chain(&e).skip(1) {
eprintln!("Caused by: {cause}");
}
std::process::exit(ReturnCode::Failure as _)
}
}
}
});

View File

@ -7,20 +7,19 @@ edition = "2021"
chrono = { version = "0.4", default-features = false }
chrono-english = "0.1.4"
clap = { version = "3", features = ["derive", "env"] }
clap_blocks = { path = "../clap_blocks" }
iox_catalog = { path = "../iox_catalog" }
futures = "0.3"
metric = { path = "../metric" }
iox_catalog = { path = "../iox_catalog" }
object_store = { version = "0.3.0" }
observability_deps = { path = "../observability_deps" }
snafu = "0.7"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
trogging = { path = "../trogging", default-features = false, features = ["clap"] }
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
clap_blocks = { path = "../clap_blocks" }
data_types = { path = "../data_types" }
filetime = "0.2"
metric = { path = "../metric" }
once_cell = { version = "1.12.0", features = ["parking_lot"] }
parquet_file = { path = "../parquet_file" }
tempfile = "3"

View File

@ -18,46 +18,86 @@
use chrono::{DateTime, Utc};
use chrono_english::{parse_date_string, Dialect};
use clap::Parser;
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
object_store::{make_object_store, ObjectStoreConfig},
};
use iox_catalog::interface::Catalog;
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use snafu::prelude::*;
use std::sync::Arc;
use tokio::sync::mpsc;
use trogging::cli::LoggingConfig;
use tokio::sync::{broadcast, mpsc};
/// Logic for checking if a file in object storage should be deleted or not.
pub mod checker;
mod checker;
/// Logic for deleting a file from object storage.
pub mod deleter;
mod deleter;
/// Logic for listing all files in object storage.
pub mod lister;
mod lister;
const BATCH_SIZE: usize = 1000;
/// Run the tasks that clean up old object store files that don't appear in the catalog.
pub async fn main(config: Config) -> Result<()> {
let object_store = config.object_store()?;
let catalog = config.catalog().await?;
GarbageCollector::start(config)?.join().await
}
let dry_run = config.dry_run;
let cutoff = config.cutoff()?;
/// The tasks that clean up old object store files that don't appear in the catalog.
#[derive(Debug)]
pub struct GarbageCollector {
shutdown_tx: broadcast::Sender<()>,
lister: tokio::task::JoinHandle<Result<(), lister::Error>>,
checker: tokio::task::JoinHandle<Result<(), checker::Error>>,
deleter: tokio::task::JoinHandle<Result<(), deleter::Error>>,
}
impl GarbageCollector {
/// Construct the garbage collector and start it
pub fn start(config: Config) -> Result<Self> {
let Config {
object_store,
sub_config,
catalog,
} = config;
let dry_run = sub_config.dry_run;
let cutoff = sub_config.cutoff()?;
info!(
cutoff_arg = %config.cutoff,
cutoff_arg = %sub_config.cutoff,
cutoff_parsed = %cutoff,
);
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let (tx1, rx1) = mpsc::channel(BATCH_SIZE);
let (tx2, rx2) = mpsc::channel(BATCH_SIZE);
let lister = tokio::spawn(lister::perform(Arc::clone(&object_store), tx1));
let lister = tokio::spawn(lister::perform(shutdown_rx, Arc::clone(&object_store), tx1));
let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2));
let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2));
Ok(Self {
shutdown_tx,
lister,
checker,
deleter,
})
}
/// Returns a closure that requests a graceful shutdown of the garbage
/// collector each time it is invoked.
///
/// The closure owns its own clone of the shutdown sender, so it does not
/// borrow from `self`.
pub fn shutdown_handle(&self) -> impl Fn() {
    let sender = self.shutdown_tx.clone();
    move || {
        // A failed send is deliberately ignored.
        let _ = sender.send(());
    }
}
/// Wait for the garbage collector to finish work
pub async fn join(self) -> Result<()> {
let Self {
lister,
checker,
deleter,
..
} = self;
let (lister, checker, deleter) = futures::join!(lister, checker, deleter);
deleter.context(DeleterPanicSnafu)??;
@ -65,21 +105,25 @@ pub async fn main(config: Config) -> Result<()> {
lister.context(ListerPanicSnafu)??;
Ok(())
}
}
/// Clean up old object store files that don't appear in the catalog.
#[derive(Debug, Parser)]
/// Configuration to run the object store garbage collector
#[derive(Debug, Clone)]
pub struct Config {
#[clap(flatten)]
object_store: ObjectStoreConfig,
/// The object store to garbage collect
pub object_store: Arc<DynObjectStore>,
#[clap(flatten)]
catalog_dsn: CatalogDsnConfig,
/// The catalog to check if an object is garbage
pub catalog: Arc<dyn Catalog>,
/// logging options
#[clap(flatten)]
pub(crate) logging_config: LoggingConfig,
/// The garbage collector specific configuration
pub sub_config: SubConfig,
}
/// Configuration specific to the object store garbage collector
#[derive(Debug, Clone, Parser)]
pub struct SubConfig {
/// If this flag is specified, don't delete the files in object storage. Only print the files
/// that would be deleted if this flag wasn't specified.
#[clap(long)]
@ -94,20 +138,7 @@ pub struct Config {
cutoff: String,
}
impl Config {
fn object_store(&self) -> Result<Arc<DynObjectStore>> {
make_object_store(&self.object_store).context(CreatingObjectStoreSnafu)
}
async fn catalog(&self) -> Result<Arc<dyn Catalog>> {
let metrics = metric::Registry::default().into();
self.catalog_dsn
.get_catalog("iox_objectstore_garbage_collect", metrics)
.await
.context(CreatingCatalogSnafu)
}
impl SubConfig {
fn cutoff(&self) -> Result<DateTime<Utc>> {
let argument = &self.cutoff;
parse_date_string(argument, Utc::now(), Dialect::Us)
@ -118,16 +149,6 @@ impl Config {
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("Could not create the object store"))]
CreatingObjectStore {
source: clap_blocks::object_store::ParseError,
},
#[snafu(display("Could not create the catalog"))]
CreatingCatalog {
source: clap_blocks::catalog_dsn::Error,
},
#[snafu(display(r#"Could not parse the cutoff "{argument}""#))]
ParsingCutoff {
source: chrono_english::DateError,
@ -153,12 +174,17 @@ pub enum Error {
DeleterPanic { source: tokio::task::JoinError },
}
type Result<T, E = Error> = std::result::Result<T, E>;
#[allow(missing_docs)]
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[cfg(test)]
mod tests {
use clap_blocks::{
catalog_dsn::CatalogDsnConfig,
object_store::{make_object_store, ObjectStoreConfig},
};
use filetime::FileTime;
use std::{fs, path::PathBuf};
use std::{fs, iter, path::PathBuf};
use tempfile::TempDir;
use super::*;
@ -167,13 +193,7 @@ mod tests {
async fn deletes_untracked_files_older_than_the_cutoff() {
let setup = OldFileSetup::new();
#[rustfmt::skip]
let config = Config::parse_from([
"dummy-program-name",
"--object-store", "file",
"--data-dir", setup.data_dir_arg(),
"--catalog", "memory",
]);
let config = build_config(setup.data_dir_arg(), []).await;
main(config).await.unwrap();
assert!(
@ -188,13 +208,9 @@ mod tests {
let setup = OldFileSetup::new();
#[rustfmt::skip]
let config = Config::parse_from([
"dummy-program-name",
"--object-store", "file",
"--data-dir", setup.data_dir_arg(),
"--catalog", "memory",
let config = build_config(setup.data_dir_arg(), [
"--cutoff", "10 years ago",
]);
]).await;
main(config).await.unwrap();
assert!(
@ -204,6 +220,42 @@ mod tests {
);
}
/// Build a garbage collector [`Config`] for tests.
///
/// `data_dir` is the directory backing the file object store and `args` are
/// extra command-line arguments parsed into the [`SubConfig`].
async fn build_config(data_dir: &str, args: impl IntoIterator<Item = &str> + Send) -> Config {
    // clap expects the program name as the first argument.
    let argv = iter::once("dummy-program-name").chain(args);
    let sub_config = SubConfig::parse_from(argv);

    Config {
        object_store: object_store(data_dir),
        catalog: catalog().await,
        sub_config,
    }
}
/// Create a file-backed object store rooted at `data_dir` for tests.
///
/// Panics if the object store cannot be created.
fn object_store(data_dir: &str) -> Arc<DynObjectStore> {
    #[rustfmt::skip]
    let argv = [
        "dummy-program-name",
        "--object-store", "file",
        "--data-dir", data_dir,
    ];
    let store_config = ObjectStoreConfig::parse_from(argv);
    make_object_store(&store_config).unwrap()
}
/// Create an in-memory catalog for tests.
///
/// Panics if the catalog cannot be created.
async fn catalog() -> Arc<dyn Catalog> {
    #[rustfmt::skip]
    let argv = [
        "dummy-program-name",
        "--catalog", "memory",
    ];
    let dsn_config = CatalogDsnConfig::parse_from(argv);

    let metrics = Arc::new(metric::Registry::default());
    let created = dsn_config
        .get_catalog("iox_objectstore_garbage_collect", metrics)
        .await;
    created.unwrap()
}
struct OldFileSetup {
data_dir: TempDir,
file_path: PathBuf,

View File

@ -3,19 +3,37 @@ use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use snafu::prelude::*;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::{
select,
sync::{broadcast, mpsc},
};
pub(crate) async fn perform(
mut shutdown: broadcast::Receiver<()>,
object_store: Arc<DynObjectStore>,
checker: mpsc::Sender<ObjectMeta>,
) -> Result<()> {
let mut items = object_store.list(None).await.context(ListingSnafu)?;
while let Some(item) = items.next().await {
loop {
select! {
_ = shutdown.recv() => {
break
},
item = items.next() => {
match item {
Some(item) => {
let item = item.context(MalformedSnafu)?;
info!(location = %item.location);
checker.send(item).await?;
}
None => {
break;
}
}
}
}
}
Ok(())
}

View File

@ -0,0 +1,18 @@
[package]
name = "ioxd_objectstore_garbage_collect"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
futures = "0.3"
hyper = "0.14"
iox_objectstore_garbage_collect = { path = "../iox_objectstore_garbage_collect" }
ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
snafu = "0.7"
tokio = { version = "1", features = ["sync"] }
trace = { path = "../trace" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -0,0 +1,196 @@
//! Tool to clean up old object store files that don't appear in the catalog.
#![deny(
rustdoc::broken_intra_doc_links,
rust_2018_idioms,
missing_debug_implementations,
unreachable_pub
)]
#![warn(
missing_docs,
clippy::todo,
clippy::dbg_macro,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
#![allow(clippy::missing_docs_in_private_items)]
use async_trait::async_trait;
use futures::{
future::{BoxFuture, Shared},
prelude::*,
};
use hyper::{Body, Request, Response};
use iox_objectstore_garbage_collect::GarbageCollector;
use ioxd_common::{
http::error::{HttpApiError, HttpApiErrorCode, HttpApiErrorSource},
rpc::RpcBuilderInput,
serve_builder,
server_type::{RpcError, ServerType},
setup_builder,
};
use metric::Registry;
use snafu::prelude::*;
use std::{sync::Arc, time::Duration};
use tokio::{select, sync::broadcast, task::JoinError, time};
use trace::TraceCollector;
pub use iox_objectstore_garbage_collect::{Config, SubConfig};
/// The object store garbage collection server
///
/// Created by `Server::start`, which spawns a background worker task that
/// runs garbage collection passes until shutdown is signalled.
#[derive(Debug)]
pub struct Server {
// Metric registry handed out through `ServerType::metric_registry`.
metric_registry: Arc<metric::Registry>,
// Cloneable handle to the spawned worker task; awaited in `ServerType::join`.
worker: SharedCloneError<(), JoinError>,
// Sender used by `ServerType::shutdown` to tell the worker to stop.
shutdown_tx: broadcast::Sender<()>,
}
impl Server {
    /// Construct and start the object store garbage collector.
    ///
    /// The worker is spawned with `tokio::spawn`, so this must be called from
    /// within a Tokio runtime. The returned server keeps a cloneable handle to
    /// the worker task and the sender used to signal shutdown.
    pub fn start(metric_registry: Arc<metric::Registry>, config: Config) -> Self {
        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);

        let worker = tokio::spawn(Self::worker_task(config, shutdown_rx));
        // `ServerType::join` only has access to an `Arc<Self>`, so the join
        // handle is wrapped in a shared future that can be cloned and awaited.
        let worker = shared_clone_error(worker);

        Self {
            metric_registry,
            worker,
            shutdown_tx,
        }
    }

    /// Run garbage collection passes until shutdown is signalled.
    ///
    /// A new pass starts no sooner than one hour after the previous pass
    /// started. Any result from `shutdown_rx.recv()` is treated as a request
    /// to stop.
    ///
    /// # Panics
    ///
    /// Panics (via `unwrap_or_report`) if a garbage collection pass cannot be
    /// started or finishes with an error.
    async fn worker_task(config: Config, mut shutdown_rx: broadcast::Receiver<()>) {
        const ONE_HOUR: u64 = 60 * 60;

        let mut minimum_next_start_time = time::interval(Duration::from_secs(ONE_HOUR));
        // The default behavior "bursts" to catch up on missed ticks. A pass
        // that takes longer than the interval would then be followed by one
        // or more immediate back-to-back passes. `Delay` instead schedules the
        // next tick a full period after the late one.
        minimum_next_start_time.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

        loop {
            // Avoid a hot infinite loop when there's nothing to delete
            select! {
                _ = shutdown_rx.recv() => return,
                _ = minimum_next_start_time.tick() => {},
            }

            let handle = GarbageCollector::start(config.clone())
                .context(StartGarbageCollectorSnafu)
                .unwrap_or_report();
            let shutdown_garbage_collector = handle.shutdown_handle();
            let mut complete = shared_clone_error(handle.join());

            select! {
                _ = shutdown_rx.recv() => {
                    // Ask the running pass to stop, then wait for it to wind
                    // down before the worker itself exits.
                    shutdown_garbage_collector();
                    complete.await.context(JoinGarbageCollectorSnafu).unwrap_or_report();
                    return;
                },
                v = &mut complete => {
                    // The pass finished on its own; go back to waiting for
                    // the next allowed start time.
                    v.context(JoinGarbageCollectorSnafu).unwrap_or_report();
                },
            }
        }
    }
}
// `ServerType` implementation that lets the garbage collector run inside the
// common server harness. The garbage collector has no HTTP API of its own.
#[async_trait]
impl ServerType for Server {
/// Returns a new handle to the metric registry this server was started with.
fn metric_registry(&self) -> Arc<Registry> {
Arc::clone(&self.metric_registry)
}
/// This server does not provide a trace collector.
fn trace_collector(&self) -> Option<Arc<dyn TraceCollector>> {
None
}
/// Rejects every request with a "not found" error; the request itself is ignored.
async fn route_http_request(
&self,
_req: Request<Body>,
) -> Result<Response<Body>, Box<dyn HttpApiErrorSource>> {
Err(Box::new(HttpNotFound))
}
/// Builds and serves the gRPC server using the `ioxd_common` builder macros.
///
/// No garbage-collector-specific services are added between setup and serve.
// NOTE(review): presumably the macros register the common services — confirm
// against the macro definitions in `ioxd_common`.
async fn server_grpc(self: Arc<Self>, builder_input: RpcBuilderInput) -> Result<(), RpcError> {
let builder = setup_builder!(builder_input, self);
serve_builder!(builder);
Ok(())
}
/// Waits for the background worker task to finish.
///
/// Panics (via `unwrap_or_report`) if joining the worker task fails.
async fn join(self: Arc<Self>) {
self.worker
.clone()
.await
.context(JoinWorkerSnafu)
.unwrap_or_report();
}
/// Signals the worker task to shut down; a failed send is ignored.
fn shutdown(&self) {
self.shutdown_tx.send(()).ok();
}
}
// Error returned for every HTTP request routed to this server.
//
// A plain `//` comment is used on purpose: no `display` attribute is given,
// and snafu can derive the Display text from a doc comment, so a `///` comment
// here could change the error message.
#[derive(Debug, Snafu)]
struct HttpNotFound;
impl HttpApiErrorSource for HttpNotFound {
    /// Convert this error into a `NotFound` HTTP API error whose message is
    /// this error's `Display` text.
    fn to_http_api_error(&self) -> HttpApiError {
        let message = self.to_string();
        HttpApiError::new(HttpApiErrorCode::NotFound, message)
    }
}
/// Errors reported by the garbage collection server's worker.
///
/// The join-related sources are wrapped in `Arc` because they are produced by
/// shared futures (see `shared_clone_error`), which must hand out cloneable
/// results.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
// `GarbageCollector::start` returned an error.
#[snafu(display("Could not start the garbage collector"))]
StartGarbageCollector {
source: iox_objectstore_garbage_collect::Error,
},
// Waiting for a garbage collection pass to finish returned an error.
#[snafu(display("Could not join the garbage collector"))]
JoinGarbageCollector {
source: Arc<iox_objectstore_garbage_collect::Error>,
},
// The spawned worker task itself could not be joined.
#[snafu(display("Could not join the garbage collector worker task"))]
JoinWorker { source: Arc<JoinError> },
}
/// Result type for this crate; the error type defaults to [`Error`].
#[allow(missing_docs)]
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Extension trait for unwrapping a result, panicking with a report of the
/// error and its chain of causes on failure.
trait UnwrapOrReportExt<T> {
/// Returns the success value, or panics with the error report.
fn unwrap_or_report(self) -> T;
}
impl<T> UnwrapOrReportExt<T> for Result<T> {
    /// Return the contained value, or panic with a multi-line report.
    ///
    /// The report's first line is the error itself; every further line is one
    /// underlying cause prefixed with `Caused by: `.
    fn unwrap_or_report(self) -> T {
        use snafu::ErrorCompat;
        use std::fmt::Write;

        let error = match self {
            Ok(value) => return value,
            Err(error) => error,
        };

        let mut report = String::new();
        writeln!(report, "{}", error).unwrap();
        // The first element of the chain is the error itself, already written.
        ErrorCompat::iter_chain(&error)
            .skip(1)
            .for_each(|cause| writeln!(report, "Caused by: {cause}").unwrap());

        panic!("{report}");
    }
}
/// A boxed, shared future resolving to `Result<T, Arc<E>>`; the error is held
/// in an `Arc` so the output can be cloned for each clone of the future.
type SharedCloneError<T, E> = Shared<BoxFuture<'static, Result<T, Arc<E>>>>;
/// Turn a fallible future into a [`SharedCloneError`].
///
/// The error is wrapped in an `Arc`, then the future is boxed and made shared
/// so it can be cloned and awaited from several places.
fn shared_clone_error<F>(handle: F) -> SharedCloneError<F::Ok, F::Error>
where
    F: TryFuture + Send + 'static,
    F::Ok: Clone,
{
    let with_shared_error = handle.map_err(Arc::new);
    let boxed = with_shared_error.boxed();
    boxed.shared()
}