feat: concurrent garbage collector deletes (#5364)

This should speed up the prod process a bit.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-08-10 09:14:46 +00:00 committed by GitHub
parent c5f062bba0
commit 4da124d862
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 19 deletions

1
Cargo.lock generated
View File

@@ -1607,6 +1607,7 @@ dependencies = [
"snafu",
"tempfile",
"tokio",
"tokio-stream",
"uuid 1.1.2",
]

View File

@@ -13,6 +13,7 @@ object_store = { version = "0.3.0" }
observability_deps = { path = "../observability_deps" }
snafu = "0.7"
tokio = { version = "1", features = ["macros", "rt", "sync"] }
tokio-stream = "0.1"
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]

View File

@@ -1,3 +1,4 @@
use futures::{StreamExt, TryStreamExt};
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::info;
use snafu::prelude::*;
@@ -7,20 +8,30 @@ use tokio::sync::mpsc;
/// Drain `items` and delete each corresponding object from the object store,
/// running up to `concurrent_deletes` delete operations at the same time.
///
/// When `dry_run` is set, nothing is deleted; each path that *would* be
/// deleted is logged instead. The first delete error aborts the whole run
/// (via `try_collect`) and is returned to the caller.
///
/// NOTE(review): this span in the diff page interleaved the removed and added
/// lines (the `+`/`-` markers were lost); this is the reconstructed
/// post-commit version of the function.
pub(crate) async fn perform(
    object_store: Arc<DynObjectStore>,
    dry_run: bool,
    concurrent_deletes: usize,
    items: mpsc::Receiver<ObjectMeta>,
) -> Result<()> {
    // Adapt the mpsc receiver into a Stream so we can use buffered
    // concurrency; the plain `while let Some(..) = items.recv().await` loop
    // this replaces deleted strictly one object at a time.
    tokio_stream::wrappers::ReceiverStream::new(items)
        .map(|item| {
            // Each in-flight future needs its own handle to the store;
            // `Arc::clone` makes the cheap refcount bump explicit.
            let object_store = Arc::clone(&object_store);
            async move {
                let path = item.location;
                if dry_run {
                    info!(?path, "Not deleting due to dry run");
                    Ok(())
                } else {
                    info!("Deleting {path}");
                    object_store
                        .delete(&path)
                        .await
                        .context(DeletingSnafu { path })
                }
            }
        })
        // Run up to `concurrent_deletes` deletions concurrently; completion
        // order does not matter, so `buffer_unordered` is appropriate.
        .buffer_unordered(concurrent_deletes)
        // Collect into `()`, short-circuiting on the first error.
        .try_collect()
        .await?;

    Ok(())
}

View File

@@ -32,7 +32,7 @@ mod deleter;
/// Logic for listing all files in object storage.
mod lister;
const BATCH_SIZE: usize = 1000;
const BUFFER_SIZE: usize = 1000;
/// Run the tasks that clean up old object store files that don't appear in the catalog.
pub async fn main(config: Config) -> Result<()> {
@@ -72,12 +72,17 @@ impl GarbageCollector {
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
let (tx1, rx1) = mpsc::channel(BATCH_SIZE);
let (tx2, rx2) = mpsc::channel(BATCH_SIZE);
let (tx1, rx1) = mpsc::channel(BUFFER_SIZE);
let (tx2, rx2) = mpsc::channel(BUFFER_SIZE);
let lister = tokio::spawn(lister::perform(shutdown_rx, Arc::clone(&object_store), tx1));
let checker = tokio::spawn(checker::perform(catalog, cutoff, rx1, tx2));
let deleter = tokio::spawn(deleter::perform(object_store, dry_run, rx2));
let deleter = tokio::spawn(deleter::perform(
object_store,
dry_run,
sub_config.concurrent_deletes,
rx2,
));
Ok(Self {
shutdown_tx,
@@ -140,7 +145,7 @@ impl Debug for Config {
pub struct SubConfig {
/// If this flag is specified, don't delete the files in object storage. Only print the files
/// that would be deleted if this flag wasn't specified.
#[clap(long)]
#[clap(long, env = "INFLUXDB_IOX_GC_DRY_RUN")]
dry_run: bool,
/// Items in the object store that are older than this timestamp and also unreferenced in the
@@ -148,8 +153,16 @@ pub struct SubConfig {
///
/// Can be an exact datetime like `2020-01-01T01:23:45-05:00` or a fuzzy
/// specification like `1 hour ago`. If not specified, defaults to 14 days ago.
#[clap(long, default_value_t = String::from("14 days ago"))]
#[clap(
long,
default_value_t = String::from("14 days ago"),
env = "INFLUXDB_IOX_GC_CUTOFF",
)]
cutoff: String,
/// Number of concurrent object store deletion tasks
#[clap(long, default_value_t = 5, env = "INFLUXDB_IOX_GC_CONCURRENT_DELETES")]
concurrent_deletes: usize,
}
impl SubConfig {