diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs
index 20c57b5ccc..f71d3e2cd3 100644
--- a/lifecycle/src/lib.rs
+++ b/lifecycle/src/lib.rs
@@ -42,7 +42,7 @@ pub trait LifecycleDb {
     fn name(&self) -> DatabaseName<'static>;
 
     /// Return the time provider for this database
-    fn time_provider(&self) -> Arc<dyn TimeProvider>;
+    fn time_provider(&self) -> &Arc<dyn TimeProvider>;
 }
 
 /// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 25e737c5b8..ecd6d68c6c 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -21,10 +21,7 @@ pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
 ///
 /// `LifecyclePolicy::check_for_work` can then be used to drive progress
 /// of the `LifecycleChunk` contained within this `LifecycleDb`
-pub struct LifecyclePolicy<M>
-where
-    M: LifecycleDb,
-{
+pub struct LifecyclePolicy<M: LifecycleDb> {
     /// The `LifecycleDb` this policy is automating
     db: M,
@@ -552,10 +549,7 @@ where
     }
 }
 
-impl<M> Debug for LifecyclePolicy<M>
-where
-    M: LifecycleDb,
-{
+impl<M: LifecycleDb> Debug for LifecyclePolicy<M> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "LifecyclePolicy{{..}}")
     }
@@ -1132,8 +1126,8 @@ mod tests {
             DatabaseName::new("test_db").unwrap()
         }
 
-        fn time_provider(&self) -> Arc<dyn TimeProvider> {
-            Arc::clone(&self.time_provider)
+        fn time_provider(&self) -> &Arc<dyn TimeProvider> {
+            &self.time_provider
         }
     }
diff --git a/server/src/database.rs b/server/src/database.rs
index 0047f2dfcc..1b60612b83 100644
--- a/server/src/database.rs
+++ b/server/src/database.rs
@@ -1,3 +1,4 @@
+use crate::lifecycle::LifecycleWorker;
 use crate::write_buffer::WriteBufferConsumer;
 use crate::{
     db::{
@@ -646,6 +647,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
         initialize_database(shared.as_ref()).await;
 
         if shared.shutdown.is_cancelled() {
+            // TODO: Shutdown intermediate workers (#2813)
             info!(db_name=%shared.config.name, "database shutdown before finishing initialization");
             break;
         }
@@ -653,6 +655,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
         let DatabaseStateInitialized {
             db,
             write_buffer_consumer,
+            lifecycle_worker,
             ..
         } = shared
             .state
@@ -682,6 +685,10 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
             .fuse();
         futures::pin_mut!(consumer_join);
 
+        // Future that completes if the LifecycleWorker exits
+        let lifecycle_join = lifecycle_worker.join().fuse();
+        futures::pin_mut!(lifecycle_join);
+
         // This inner loop runs until either:
         //
         // - Something calls `Database::shutdown`
@@ -711,6 +718,10 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
                     error!(db_name=%shared.config.name, "unexpected shutdown of write buffer consumer - bailing out");
                     shared.shutdown.cancel();
                 }
+                _ = lifecycle_join => {
+                    error!(db_name=%shared.config.name, "unexpected shutdown of lifecycle worker - bailing out");
+                    shared.shutdown.cancel();
+                }
                 _ = db_worker => {
                     error!(db_name=%shared.config.name, "unexpected shutdown of db - bailing out");
                     shared.shutdown.cancel();
@@ -726,6 +737,14 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
             }
         }
 
+        if !lifecycle_join.is_terminated() {
+            info!(db_name=%shared.config.name, "shutting down lifecycle worker");
+            lifecycle_worker.shutdown();
+            if let Err(e) = lifecycle_worker.join().await {
+                error!(db_name=%shared.config.name, %e, "error shutting down lifecycle worker")
+            }
+        }
+
         if !db_worker.is_terminated() {
             info!(db_name=%shared.config.name, "waiting for db worker shutdown");
             db_shutdown.cancel();
@@ -1124,13 +1143,16 @@ impl DatabaseStateRulesLoaded {
             time_provider: Arc::clone(shared.application.time_provider()),
         };
 
-        let db = Db::new(
+        let db = Arc::new(Db::new(
             database_to_commit,
             Arc::clone(shared.application.job_registry()),
-        );
+        ));
+
+        let lifecycle_worker = Arc::new(LifecycleWorker::new(Arc::clone(&db)));
 
         Ok(DatabaseStateCatalogLoaded {
             db,
+            lifecycle_worker,
             replay_plan: Arc::new(replay_plan),
             provided_rules: Arc::clone(&self.provided_rules),
         })
     }
 }
@@ -1140,6 +1162,7 @@ impl DatabaseStateRulesLoaded {
 #[derive(Debug, Clone)]
 struct DatabaseStateCatalogLoaded {
     db: Arc<Db>,
+    lifecycle_worker: Arc<LifecycleWorker>,
     replay_plan: Arc<Option<ReplayPlan>>,
     provided_rules: Arc<ProvidedDatabaseRules>,
 }
@@ -1182,12 +1205,12 @@ impl DatabaseStateCatalogLoaded {
             _ => None,
         };
 
-        // TODO: Pull write buffer and lifecycle out of Db
-        db.unsuppress_persistence();
+        self.lifecycle_worker.unsuppress_persistence();
 
         Ok(DatabaseStateInitialized {
             db,
             write_buffer_consumer,
+            lifecycle_worker: Arc::clone(&self.lifecycle_worker),
             provided_rules: Arc::clone(&self.provided_rules),
         })
     }
@@ -1197,6 +1220,7 @@ impl DatabaseStateCatalogLoaded {
 pub struct DatabaseStateInitialized {
     db: Arc<Db>,
     write_buffer_consumer: Option<Arc<WriteBufferConsumer>>,
+    lifecycle_worker: Arc<LifecycleWorker>,
     provided_rules: Arc<ProvidedDatabaseRules>,
 }
diff --git a/server/src/db.rs b/server/src/db.rs
index e398aa2d49..23d09e2c1f 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -50,6 +50,7 @@ use trace::ctx::SpanContext;
 use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
 
 pub(crate) use crate::db::chunk::DbChunk;
+pub(crate) use crate::db::lifecycle::ArcDb;
 use crate::{
     db::{
         access::QueryCatalogAccess,
@@ -59,7 +60,7 @@ use crate::{
             table::TableSchemaUpsertHandle,
             Catalog, Error as CatalogError, TableNameFilter,
         },
-        lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb},
+        lifecycle::{LockableCatalogChunk, LockableCatalogPartition},
     },
     JobRegistry,
 };
@@ -271,9 +272,6 @@ pub struct Db {
     /// Catalog interface for query
     catalog_access: Arc<QueryCatalogAccess>,
 
-    /// Number of iterations of the worker lifecycle loop for this Db
-    worker_iterations_lifecycle: AtomicUsize,
-
     /// Number of iterations of the worker cleanup loop for this Db
     worker_iterations_cleanup: AtomicUsize,
 
@@ -291,15 +289,6 @@ pub struct Db {
     /// catalog transaction only needs shared access and hence will acquire a read-guard.
     cleanup_lock: Arc<RwLock<()>>,
 
-    /// Lifecycle policy.
-    ///
-    /// Optional because it will be created after `Arc<Self>`.
-    ///
-    /// This is stored here for the following reasons:
-    /// - to control the persistence suppression via a [`Db::unsuppress_persistence`]
-    /// - to keep the lifecycle state (e.g. the number of running compactions) around
-    lifecycle_policy: Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
-
     time_provider: Arc<dyn TimeProvider>,
 
     /// To-be-written delete predicates.
@@ -327,7 +316,7 @@ pub(crate) struct DatabaseToCommit {
 }
 
 impl Db {
-    pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Arc<Self> {
+    pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self {
         let name = Arc::from(database_to_commit.rules.name.as_str());
 
         let rules = RwLock::new(database_to_commit.rules);
@@ -344,7 +333,7 @@ impl Db {
         );
         let catalog_access = Arc::new(catalog_access);
 
-        let this = Self {
+        Self {
             rules,
             name,
             server_id,
@@ -355,21 +344,14 @@ impl Db {
             jobs,
             metric_registry: database_to_commit.metric_registry,
             catalog_access,
-            worker_iterations_lifecycle: AtomicUsize::new(0),
             worker_iterations_cleanup: AtomicUsize::new(0),
             worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
             write_buffer_producer: database_to_commit.write_buffer_producer,
             cleanup_lock: Default::default(),
-            lifecycle_policy: Mutex::new(None),
             time_provider: database_to_commit.time_provider,
             delete_predicates_mailbox: Default::default(),
             persisted_chunk_id_override: Default::default(),
-        };
-        let this = Arc::new(this);
-        *this.lifecycle_policy.try_lock().expect("not used yet") = Some(
-            ::lifecycle::LifecyclePolicy::new_suppress_persistence(WeakDb(Arc::downgrade(&this))),
-        );
-        this
+        }
     }
 
     /// Return all table names of the DB
@@ -377,15 +359,6 @@ impl Db {
         self.catalog.table_names()
     }
 
-    /// Allow persistence if database rules all it.
-    pub fn unsuppress_persistence(&self) {
-        let mut guard = self.lifecycle_policy.lock();
-        let policy = guard
-            .as_mut()
-            .expect("lifecycle policy should be initialized");
-        policy.unsuppress_persistence();
-    }
-
     /// Return a handle to the executor used to run queries
     pub fn executor(&self) -> Arc<Executor> {
         Arc::clone(&self.exec)
     }
@@ -785,11 +758,6 @@ impl Db {
         Some(chunk.table_summary())
     }
 
-    /// Returns the number of iterations of the background worker lifecycle loop
-    pub fn worker_iterations_lifecycle(&self) -> usize {
-        self.worker_iterations_lifecycle.load(Ordering::Relaxed)
-    }
-
     /// Returns the number of iterations of the background worker lifecycle loop
     pub fn worker_iterations_cleanup(&self) -> usize {
         self.worker_iterations_cleanup.load(Ordering::Relaxed)
     }
@@ -827,24 +795,6 @@ impl Db {
     ) {
         info!("started background worker");
 
-        // Loop that drives the lifecycle for this database
-        let lifecycle_loop = async {
-            loop {
-                self.worker_iterations_lifecycle
-                    .fetch_add(1, Ordering::Relaxed);
-
-                let fut = {
-                    let mut guard = self.lifecycle_policy.lock();
-                    let policy = guard
-                        .as_mut()
-                        .expect("lifecycle policy should be initialized");
-
-                    policy.check_for_work()
-                };
-                fut.await
-            }
-        };
-
         // object store cleanup loop
         let object_store_cleanup_loop = async {
             loop {
@@ -915,7 +865,6 @@ impl Db {
         // None of the futures need to perform drain logic on shutdown.
        // When the first one finishes, all of them are dropped
         tokio::select! {
-            _ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"),
             _ = object_store_cleanup_loop => error!("object store cleanup loop exited - db worker bailing out"),
             _ = delete_predicate_persistence_loop => error!("delete predicate persistence loop exited - db worker bailing out"),
             _ = shutdown.cancelled() => info!("db worker shutting down"),
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index 80c15f7adf..522de7e3e0 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -22,11 +22,7 @@ use observability_deps::tracing::{info, trace};
 use persistence_windows::persistence_windows::FlushHandle;
 use query::QueryChunkMeta;
 use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME};
-use std::{
-    convert::TryInto,
-    fmt::Display,
-    sync::{Arc, Weak},
-};
+use std::{fmt::Display, sync::Arc};
 use time::{Time, TimeProvider};
 use tracker::{RwLock, TaskTracker};
@@ -43,11 +39,17 @@ mod persist;
 mod unload;
 mod write;
 
-/// A newtype wrapper around `Weak` to workaround trait orphan rules
+/// A newtype wrapper around `Arc` to workaround trait orphan rules
 ///
 /// TODO: Pull LifecyclePolicy out of Db to allow strong reference (#2242)
 #[derive(Debug, Clone)]
-pub struct WeakDb(pub(super) Weak<Db>);
+pub struct ArcDb(Arc<Db>);
+
+impl ArcDb {
+    pub fn new(db: Arc<Db>) -> Self {
+        Self(db)
+    }
+}
 
 ///
 /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
@@ -239,52 +241,33 @@ impl LockablePartition for LockableCatalogPartition {
     }
 }
 
-impl LifecycleDb for WeakDb {
+impl LifecycleDb for ArcDb {
     type Chunk = LockableCatalogChunk;
     type Partition = LockableCatalogPartition;
 
     fn buffer_size(&self) -> usize {
-        self.0
-            .upgrade()
-            .map(|db| db.catalog.metrics().memory().total())
-            .unwrap_or_default()
+        self.0.catalog.metrics().memory().total()
     }
 
     fn rules(&self) -> LifecycleRules {
-        self.0
-            .upgrade()
-            .map(|db| db.rules.read().lifecycle_rules.clone())
-            .unwrap_or_default()
+        self.0.rules.read().lifecycle_rules.clone()
     }
 
     fn partitions(&self) -> Vec<Self::Partition> {
         self.0
-            .upgrade()
-            .map(|db| {
-                db.catalog
-                    .partitions()
-                    .into_iter()
-                    .map(|partition| LockableCatalogPartition::new(Arc::clone(&db), partition))
-                    .collect()
-            })
-            .unwrap_or_default()
+            .catalog
+            .partitions()
+            .into_iter()
+            .map(|partition| LockableCatalogPartition::new(Arc::clone(&self.0), partition))
+            .collect()
     }
 
     fn name(&self) -> DatabaseName<'static> {
-        self.0
-            .upgrade()
-            .map(|db| db.rules.read().name.clone())
-            .unwrap_or_else(|| "gone".to_string().try_into().unwrap())
+        self.0.rules.read().name.clone()
     }
 
-    fn time_provider(&self) -> Arc<dyn TimeProvider> {
-        Arc::clone(
-            &self
-                .0
-                .upgrade()
-                .expect("database dropped without shutting down lifecycle")
-                .time_provider,
-        )
+    fn time_provider(&self) -> &Arc<dyn TimeProvider> {
+        &self.0.time_provider
     }
 }
diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs
index b6f13762ed..b23278a734 100644
--- a/server/src/db/replay.rs
+++ b/server/src/db/replay.rs
@@ -438,6 +438,7 @@ mod tests {
     use tokio_util::sync::CancellationToken;
     use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
 
+    use crate::lifecycle::LifecycleWorker;
     use crate::utils::TestDb;
     use crate::write_buffer::WriteBufferConsumer;
 
@@ -581,6 +582,8 @@ mod tests {
             )
             .await;
 
+            let mut lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
+
             let mut maybe_consumer = Some(WriteBufferConsumer::new(
                 Box::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()),
                 Arc::clone(&test_db.db),
@@ -606,6 +609,9 @@ mod tests {
                         consumer.join().await.unwrap();
                     }
 
+                    lifecycle.shutdown();
+                    lifecycle.join().await.unwrap();
+
                     // stop background worker
                     shutdown.cancel();
                     join_handle.await.unwrap();
@@ -626,6 +632,7 @@ mod tests {
                     test_db = test_db_tmp;
                     shutdown = shutdown_tmp;
                     join_handle = join_handle_tmp;
+                    lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
                 }
                 Step::Replay | Step::SkipReplay => {
                     assert!(maybe_consumer.is_none());
@@ -710,8 +717,7 @@ mod tests {
                         ));
                     }
 
-                    let db = &test_db.db;
-                    db.unsuppress_persistence();
+                    lifecycle.unsuppress_persistence();
 
                     // wait until checks pass
                     let t_0 = Instant::now();
diff --git a/server/src/lib.rs b/server/src/lib.rs
index f77a0f9e58..4ed89638d4 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -118,6 +118,8 @@ pub mod utils;
 
 mod write_buffer;
 
+mod lifecycle;
+
 type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
 
 #[derive(Debug, Snafu)]
diff --git a/server/src/lifecycle.rs b/server/src/lifecycle.rs
new file mode 100644
index 0000000000..e09e5b8d8d
--- /dev/null
+++ b/server/src/lifecycle.rs
@@ -0,0 +1,96 @@
+use std::future::Future;
+use std::sync::Arc;
+
+use futures::future::{BoxFuture, Shared};
+use futures::{FutureExt, TryFutureExt};
+use parking_lot::Mutex;
+use tokio::task::JoinError;
+
+use crate::db::ArcDb;
+use crate::Db;
+use lifecycle::LifecyclePolicy;
+use observability_deps::tracing::{info, warn};
+use tokio_util::sync::CancellationToken;
+
+/// A lifecycle worker manages a background task that drives a [`LifecyclePolicy`]
+#[derive(Debug)]
+pub struct LifecycleWorker {
+    /// Future that resolves when the background worker exits
+    join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
+
+    /// Shared worker state
+    state: Arc<WorkerState>,
+}
+
+#[derive(Debug)]
+struct WorkerState {
+    policy: Mutex<LifecyclePolicy<ArcDb>>,
+
+    shutdown: CancellationToken,
+}
+
+impl LifecycleWorker {
+    /// Creates a new `LifecycleWorker`
+    ///
+    /// The worker starts with persistence suppressed; persistence must be enabled
+    /// by a call to [`LifecycleWorker::unsuppress_persistence`]
+    pub fn new(db: Arc<Db>) -> Self {
+        let db = ArcDb::new(db);
+        let shutdown = CancellationToken::new();
+
+        let policy = LifecyclePolicy::new_suppress_persistence(db);
+
+        let state = Arc::new(WorkerState {
+            policy: Mutex::new(policy),
+            shutdown,
+        });
+
+        let join = tokio::spawn(background_worker(Arc::clone(&state)))
+            .map_err(Arc::new)
+            .boxed()
+            .shared();
+
+        Self { join, state }
+    }
+
+    /// Stop suppressing persistence and allow it if the database rules permit.
+    pub fn unsuppress_persistence(&self) {
+        self.state.policy.lock().unsuppress_persistence()
+    }
+
+    /// Triggers shutdown of this `LifecycleWorker`
+    pub fn shutdown(&self) {
+        self.state.shutdown.cancel()
+    }
+
+    /// Waits for the background worker of this `LifecycleWorker` to exit
+    pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
+        self.join.clone()
+    }
+}
+
+impl Drop for LifecycleWorker {
+    fn drop(&mut self) {
+        if !self.state.shutdown.is_cancelled() {
+            warn!("lifecycle worker dropped without calling shutdown()");
+            self.state.shutdown.cancel();
+        }
+
+        if self.join.clone().now_or_never().is_none() {
+            warn!("lifecycle worker dropped without waiting for worker termination");
+        }
+    }
+}
+
+async fn background_worker(state: Arc<WorkerState>) {
+    loop {
+        let fut = state.policy.lock().check_for_work();
+        tokio::select! {
+            _ = fut => {},
+            _ = state.shutdown.cancelled() => {
+                info!("lifecycle worker shutting down");
+                break
+            }
+        }
+    }
+}
diff --git a/server/src/utils.rs b/server/src/utils.rs
index af5814e6d7..11af11ba70 100644
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
@@ -146,7 +146,7 @@ impl TestDbBuilder {
 
         TestDb {
             metric_registry,
-            db: Db::new(database_to_commit, jobs),
+            db: Arc::new(Db::new(database_to_commit, jobs)),
             replay_plan: replay_plan.expect("did not skip replay"),
         }
     }
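
Taken together, the hunks above move the lifecycle loop out of `Db` and behind the new `LifecycleWorker`: `server/src/database.rs` constructs the worker once the catalog is loaded, wires its `join()` future into the database background loop, and shuts it down on teardown. The sketch below is not part of the patch; the `run_lifecycle` helper and the placement of replay are illustrative assumptions, and only the `LifecycleWorker` API itself (`new`, `unsuppress_persistence`, `shutdown`, `join`) comes from the diff.

    use std::sync::Arc;

    use crate::lifecycle::LifecycleWorker;
    use crate::Db;

    /// Hypothetical helper showing the intended call order for the new worker.
    async fn run_lifecycle(db: Arc<Db>) {
        // `new` spawns the background task immediately, but with persistence
        // suppressed until replay has completed.
        let worker = LifecycleWorker::new(Arc::clone(&db));

        // ... replay the write buffer into `db` here ...

        // Once replay is done, let the lifecycle rules decide about persistence.
        worker.unsuppress_persistence();

        // On database shutdown: stop the loop, then wait for the task to exit.
        worker.shutdown();
        worker.join().await.expect("lifecycle worker failed");
    }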
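
The `join` field in `server/src/lifecycle.rs` wraps the spawned task's `JoinHandle` in `Shared<BoxFuture<...>>`, with the non-`Clone` `JoinError` placed behind an `Arc`, so that completion can be awaited from more than one place (the database background worker awaits it, and `Drop` polls it via `now_or_never`). A self-contained sketch of just that wrapping, assuming only the `tokio` and `futures` crates:

    use std::sync::Arc;

    use futures::future::{BoxFuture, Shared};
    use futures::{FutureExt, TryFutureExt};
    use tokio::task::JoinError;

    #[tokio::main]
    async fn main() {
        // Spawn a background task and make its completion observable from
        // several places: `JoinError` is not `Clone`, so wrap it in an `Arc`
        // before turning the whole future into a `Shared` one.
        let join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>> =
            tokio::spawn(async { /* background work */ })
                .map_err(Arc::new)
                .boxed()
                .shared();

        // Any number of observers can now await the same task.
        let a = join.clone();
        let b = join.clone();
        a.await.unwrap();
        b.await.unwrap();
    }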