diff --git a/server/src/db.rs b/server/src/db.rs index 39629ca3a6..e73b5c14fe 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -31,7 +31,7 @@ use entry::{Entry, Sequence, SequencedEntry, TableBatch}; use internal_types::schema::Schema; use iox_object_store::IoxObjectStore; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; -use observability_deps::tracing::{debug, error, info}; +use observability_deps::tracing::{debug, error, info, warn}; use parquet_file::catalog::{ cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files}, core::PreservedCatalog, @@ -277,6 +277,9 @@ pub struct Db { /// Number of iterations of the worker cleanup loop for this Db worker_iterations_cleanup: AtomicUsize, + /// Number of iterations of the worker delete predicate preservation loop for this Db + worker_iterations_delete_predicate_preservation: AtomicUsize, + /// Optional write buffer producer /// TODO: Move onto Database write_buffer_producer: Option>, @@ -299,6 +302,9 @@ pub struct Db { /// TESTING ONLY: Mocked `Instant::now()` for the background worker background_worker_now_override: Mutex>, + + /// To-be-written delete predicates. + delete_predicates_mailbox: Mutex, Vec)>>, } /// All the information needed to commit a database @@ -347,10 +353,12 @@ impl Db { catalog_access, worker_iterations_lifecycle: AtomicUsize::new(0), worker_iterations_cleanup: AtomicUsize::new(0), + worker_iterations_delete_predicate_preservation: AtomicUsize::new(0), write_buffer_producer: database_to_commit.write_buffer_producer, cleanup_lock: Default::default(), lifecycle_policy: tokio::sync::Mutex::new(None), background_worker_now_override: Default::default(), + delete_predicates_mailbox: Default::default(), }; let this = Arc::new(this); *this.lifecycle_policy.try_lock().expect("not used yet") = Some( @@ -583,12 +591,8 @@ impl Db { } if !affected_persisted_chunks.is_empty() { - let mut transaction = self.preserved_catalog.open_transaction().await; - transaction.delete_predicate(&delete_predicate, &affected_persisted_chunks); - transaction - .commit() - .await - .context(CommitDeletePredicateError)?; + let mut guard = self.delete_predicates_mailbox.lock(); + guard.push((delete_predicate, affected_persisted_chunks)); } Ok(()) @@ -798,6 +802,12 @@ impl Db { self.worker_iterations_cleanup.load(Ordering::Relaxed) } + /// Returns the number of iterations of the background worker delete predicate preservation loop + pub fn worker_iterations_delete_predicate_preservation(&self) -> usize { + self.worker_iterations_delete_predicate_preservation + .load(Ordering::Relaxed) + } + /// Perform sequencer-driven replay for this DB. /// /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set @@ -887,11 +897,40 @@ impl Db { } }; + // worker loop to persist delete predicates + let delete_predicate_persistence_loop = async { + loop { + let todo: Vec<_> = { + let guard = self.delete_predicates_mailbox.lock(); + guard.clone() + }; + + if !todo.is_empty() { + match self.preserve_delete_predicates(&todo).await { + Ok(()) => { + let mut guard = self.delete_predicates_mailbox.lock(); + // TODO: we could also run a de-duplication here once + // https://github.com/influxdata/influxdb_iox/issues/2626 is implemented + guard.drain(0..todo.len()); + } + Err(e) => { + error!(%e, "cannot preserve delete predicates"); + } + } + } + + self.worker_iterations_delete_predicate_preservation + .fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(Duration::from_secs(1)).await; + } + }; + // None of the futures need to perform drain logic on shutdown. // When the first one finishes, all of them are dropped tokio::select! { _ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"), - _ = object_store_cleanup_loop => error!("object store cleanup exited - db worker bailing out"), + _ = object_store_cleanup_loop => error!("object store cleanup loop exited - db worker bailing out"), + _ = delete_predicate_persistence_loop => error!("delete predicate persistence loop exited - db worker bailing out"), _ = shutdown.cancelled() => info!("db worker shutting down"), } @@ -915,6 +954,44 @@ impl Db { delete_parquet_files(&self.preserved_catalog, &files).await } + async fn preserve_delete_predicates( + self: &Arc, + predicates: &[(Arc, Vec)], + ) -> Result<(), parquet_file::catalog::core::Error> { + let mut transaction = self.preserved_catalog.open_transaction().await; + for (predicate, chunks) in predicates { + transaction.delete_predicate(predicate, chunks); + } + let ckpt_handle = transaction.commit().await?; + + let catalog_transactions_until_checkpoint = self + .rules + .read() + .lifecycle_rules + .catalog_transactions_until_checkpoint + .get(); + let create_checkpoint = + ckpt_handle.revision_counter() % catalog_transactions_until_checkpoint == 0; + if create_checkpoint { + // Commit is already done, so we can just scan the catalog for the state. + // + // NOTE: There can only be a single transaction in this section because the checkpoint handle holds + // transaction lock. Therefore we don't need to worry about concurrent modifications of + // preserved chunks. + if let Err(e) = ckpt_handle + .create_checkpoint(checkpoint_data_from_catalog(&self.catalog)) + .await + { + warn!(%e, "cannot create catalog checkpoint"); + + // That's somewhat OK. Don't fail the entire task, because the actual preservation was completed + // (both in-mem and within the preserved catalog). + } + } + + Ok(()) + } + /// Stores an entry based on the configuration. pub async fn store_entry(&self, entry: Entry, time_of_write: DateTime) -> Result<()> { let immutable = { @@ -3680,6 +3757,26 @@ mod tests { .await .unwrap(); + // ==================== do: use background worker for a short while ==================== + let iters_start = db.worker_iterations_delete_predicate_preservation(); + let shutdown: CancellationToken = Default::default(); + let shutdown_captured = shutdown.clone(); + let db_captured = Arc::clone(&db); + let join_handle = + tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await }); + + let t_0 = Instant::now(); + loop { + if db.worker_iterations_delete_predicate_preservation() > iters_start { + break; + } + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + shutdown.cancel(); + join_handle.await.unwrap(); + // ==================== check: delete predicates ==================== let closure_check_delete_predicates = |db: &Db| { for chunk in db.catalog.chunks() {