Merge pull request #2628 from influxdata/crepererum/move_delete_predicate_persistence

refactor: move delete predicate persistence into background job
kodiakhq[bot] 2021-10-06 06:14:05 +00:00 committed by GitHub
commit fe61a4a868
1 changed file with 105 additions and 8 deletions


@@ -31,7 +31,7 @@ use entry::{Entry, Sequence, SequencedEntry, TableBatch};
 use internal_types::schema::Schema;
 use iox_object_store::IoxObjectStore;
 use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
-use observability_deps::tracing::{debug, error, info};
+use observability_deps::tracing::{debug, error, info, warn};
 use parquet_file::catalog::{
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
     core::PreservedCatalog,
@@ -277,6 +277,9 @@ pub struct Db {
     /// Number of iterations of the worker cleanup loop for this Db
     worker_iterations_cleanup: AtomicUsize,
+    /// Number of iterations of the worker delete predicate preservation loop for this Db
+    worker_iterations_delete_predicate_preservation: AtomicUsize,
     /// Optional write buffer producer
     /// TODO: Move onto Database
     write_buffer_producer: Option<Arc<dyn WriteBufferWriting>>,
@@ -299,6 +302,9 @@ pub struct Db {
     /// TESTING ONLY: Mocked `Instant::now()` for the background worker
     background_worker_now_override: Mutex<Option<Instant>>,
+    /// To-be-written delete predicates.
+    delete_predicates_mailbox: Mutex<Vec<(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)>>,
 }
 /// All the information needed to commit a database
@@ -347,10 +353,12 @@ impl Db {
             catalog_access,
             worker_iterations_lifecycle: AtomicUsize::new(0),
             worker_iterations_cleanup: AtomicUsize::new(0),
+            worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
             write_buffer_producer: database_to_commit.write_buffer_producer,
             cleanup_lock: Default::default(),
             lifecycle_policy: tokio::sync::Mutex::new(None),
             background_worker_now_override: Default::default(),
+            delete_predicates_mailbox: Default::default(),
         };
         let this = Arc::new(this);
         *this.lifecycle_policy.try_lock().expect("not used yet") = Some(
@@ -583,12 +591,8 @@
         }
         if !affected_persisted_chunks.is_empty() {
-            let mut transaction = self.preserved_catalog.open_transaction().await;
-            transaction.delete_predicate(&delete_predicate, &affected_persisted_chunks);
-            transaction
-                .commit()
-                .await
-                .context(CommitDeletePredicateError)?;
+            let mut guard = self.delete_predicates_mailbox.lock();
+            guard.push((delete_predicate, affected_persisted_chunks));
         }
         Ok(())
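
The hunk above replaces the inline catalog transaction on the delete path with a cheap hand-off: the predicate is only pushed into delete_predicates_mailbox, and the background worker persists it later. A minimal standalone sketch of that mailbox pattern (not part of the commit; Mailbox, Predicate, ChunkAddr, push, snapshot, and ack are made-up names, and a std::sync::Mutex stands in for the parking_lot lock used in the real code):

use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
struct Predicate(String);

#[derive(Clone, Debug)]
struct ChunkAddr(u32);

#[derive(Default)]
struct Mailbox {
    pending: Mutex<Vec<(Arc<Predicate>, Vec<ChunkAddr>)>>,
}

impl Mailbox {
    // Write path: just enqueue; no catalog I/O happens here.
    fn push(&self, predicate: Arc<Predicate>, chunks: Vec<ChunkAddr>) {
        self.pending.lock().unwrap().push((predicate, chunks));
    }

    // Background worker: clone the current contents so the lock is not held
    // across the slow (async) persistence step.
    fn snapshot(&self) -> Vec<(Arc<Predicate>, Vec<ChunkAddr>)> {
        self.pending.lock().unwrap().clone()
    }

    // After successful persistence, drop only the first `n` entries; anything
    // pushed while persisting stays queued for the next worker iteration.
    fn ack(&self, n: usize) {
        self.pending.lock().unwrap().drain(0..n);
    }
}

fn main() {
    let mailbox = Mailbox::default();
    mailbox.push(Arc::new(Predicate("tag = 'x'".into())), vec![ChunkAddr(1)]);

    let todo = mailbox.snapshot();
    // ... the real worker would write `todo` to the preserved catalog here ...
    mailbox.ack(todo.len());
    assert!(mailbox.snapshot().is_empty());
}

The split into snapshot and ack mirrors the diff's drain(0..todo.len()): only the entries that were actually persisted are removed, so predicates added concurrently are never lost.
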
@@ -798,6 +802,12 @@ impl Db {
         self.worker_iterations_cleanup.load(Ordering::Relaxed)
     }
+    /// Returns the number of iterations of the background worker delete predicate preservation loop
+    pub fn worker_iterations_delete_predicate_preservation(&self) -> usize {
+        self.worker_iterations_delete_predicate_preservation
+            .load(Ordering::Relaxed)
+    }
     /// Perform sequencer-driven replay for this DB.
     ///
     /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set
@@ -887,11 +897,40 @@
             }
         };
+        // worker loop to persist delete predicates
+        let delete_predicate_persistence_loop = async {
+            loop {
+                let todo: Vec<_> = {
+                    let guard = self.delete_predicates_mailbox.lock();
+                    guard.clone()
+                };
+                if !todo.is_empty() {
+                    match self.preserve_delete_predicates(&todo).await {
+                        Ok(()) => {
+                            let mut guard = self.delete_predicates_mailbox.lock();
+                            // TODO: we could also run a de-duplication here once
+                            // https://github.com/influxdata/influxdb_iox/issues/2626 is implemented
+                            guard.drain(0..todo.len());
+                        }
+                        Err(e) => {
+                            error!(%e, "cannot preserve delete predicates");
+                        }
+                    }
+                }
+                self.worker_iterations_delete_predicate_preservation
+                    .fetch_add(1, Ordering::Relaxed);
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        };
         // None of the futures need to perform drain logic on shutdown.
         // When the first one finishes, all of them are dropped
         tokio::select! {
             _ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"),
-            _ = object_store_cleanup_loop => error!("object store cleanup exited - db worker bailing out"),
+            _ = object_store_cleanup_loop => error!("object store cleanup loop exited - db worker bailing out"),
+            _ = delete_predicate_persistence_loop => error!("delete predicate persistence loop exited - db worker bailing out"),
             _ = shutdown.cancelled() => info!("db worker shutting down"),
         }
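
For reference (not part of the commit), the worker structure in this hunk, several never-ending async loops raced under tokio::select! and all torn down when a CancellationToken fires, can be sketched standalone. This assumes the tokio crate with its "full" feature and the tokio-util crate; tick_loop is a made-up stand-in for the real lifecycle/cleanup/persistence loops:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// A placeholder background loop: does one unit of "work", then sleeps.
async fn tick_loop(name: &str, period: Duration) {
    loop {
        println!("{name}: one iteration");
        tokio::time::sleep(period).await;
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let shutdown_clone = shutdown.clone();

    // Simulate an external shutdown signal arriving later.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        shutdown_clone.cancel();
    });

    // None of the loops ever returns on its own; select! completes when the
    // token is cancelled, and the other futures are simply dropped, which is
    // why no per-loop drain logic is needed on shutdown.
    tokio::select! {
        _ = tick_loop("object store cleanup", Duration::from_millis(500)) => {},
        _ = tick_loop("delete predicate persistence", Duration::from_secs(1)) => {},
        _ = shutdown.cancelled() => println!("db worker shutting down"),
    }
}
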
@@ -915,6 +954,44 @@ impl Db {
         delete_parquet_files(&self.preserved_catalog, &files).await
     }
+    async fn preserve_delete_predicates(
+        self: &Arc<Self>,
+        predicates: &[(Arc<DeletePredicate>, Vec<ChunkAddrWithoutDatabase>)],
+    ) -> Result<(), parquet_file::catalog::core::Error> {
+        let mut transaction = self.preserved_catalog.open_transaction().await;
+        for (predicate, chunks) in predicates {
+            transaction.delete_predicate(predicate, chunks);
+        }
+        let ckpt_handle = transaction.commit().await?;
+        let catalog_transactions_until_checkpoint = self
+            .rules
+            .read()
+            .lifecycle_rules
+            .catalog_transactions_until_checkpoint
+            .get();
+        let create_checkpoint =
+            ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0;
+        if create_checkpoint {
+            // Commit is already done, so we can just scan the catalog for the state.
+            //
+            // NOTE: There can only be a single transaction in this section because the checkpoint handle holds
+            //       transaction lock. Therefore we don't need to worry about concurrent modifications of
+            //       preserved chunks.
+            if let Err(e) = ckpt_handle
+                .create_checkpoint(checkpoint_data_from_catalog(&self.catalog))
+                .await
+            {
+                warn!(%e, "cannot create catalog checkpoint");
+                // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed
+                // (both in-mem and within the preserved catalog).
+            }
+        }
+        Ok(())
+    }
     /// Stores an entry based on the configuration.
     pub async fn store_entry(&self, entry: Entry, time_of_write: DateTime<Utc>) -> Result<()> {
         let immutable = {
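
A worked illustration (not from the commit) of the checkpoint cadence used in preserve_delete_predicates above: a checkpoint is attempted whenever the committed transaction's revision counter is a multiple of catalog_transactions_until_checkpoint. The helper should_checkpoint and the value 10 are hypothetical:

// Every N-th catalog transaction (by revision counter) also writes a checkpoint.
fn should_checkpoint(revision_counter: u64, every_n: u64) -> bool {
    revision_counter % every_n == 0
}

fn main() {
    let every_n = 10; // e.g. lifecycle_rules.catalog_transactions_until_checkpoint
    for rev in 1..=25u64 {
        if should_checkpoint(rev, every_n) {
            println!("transaction {rev}: commit + checkpoint");
        } else {
            println!("transaction {rev}: commit only");
        }
    }
}

A failed checkpoint only logs a warning because, as the code comment notes, the transaction itself has already been committed; the checkpoint is an optimization, not a correctness requirement.
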
@@ -3680,6 +3757,26 @@ mod tests {
             .await
             .unwrap();
+        // ==================== do: use background worker for a short while ====================
+        let iters_start = db.worker_iterations_delete_predicate_preservation();
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+        let t_0 = Instant::now();
+        loop {
+            if db.worker_iterations_delete_predicate_preservation() > iters_start {
+                break;
+            }
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+        shutdown.cancel();
+        join_handle.await.unwrap();
         // ==================== check: delete predicates ====================
         let closure_check_delete_predicates = |db: &Db| {
             for chunk in db.catalog.chunks() {