refactor: pull lifecycle out of Db (#2242) (#2831)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2021-10-15 14:08:00 +01:00 committed by GitHub
parent beaf77cecf
commit d6b7b56f16
9 changed files with 165 additions and 111 deletions
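In short, this commit lifts the `LifecyclePolicy` out of `Db` and into a new `LifecycleWorker` in the `server` crate, replacing the weak `WeakDb` handle with a strong `ArcDb`. A rough before/after sketch of the ownership change, condensed from the hunks below:

// Before: the policy lived inside Db and could only hold the database weakly.
struct Db {
    // ...
    lifecycle_policy: Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>, // WeakDb(Weak<Db>)
}

// After: a dedicated worker owns the policy and holds the database strongly.
struct LifecycleWorker {
    join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
    state: Arc<WorkerState>, // policy: Mutex<LifecyclePolicy<ArcDb>>, shutdown: CancellationToken
}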


@ -42,7 +42,7 @@ pub trait LifecycleDb {
fn name(&self) -> DatabaseName<'static>;
/// Return the time provider for this database
fn time_provider(&self) -> Arc<dyn TimeProvider>;
fn time_provider(&self) -> &Arc<dyn TimeProvider>;
}
/// A `LockablePartition` is a wrapper around a `LifecyclePartition` that allows
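A minimal sketch of what this signature change means for callers (the `db` binding and the `now()` call are illustrative assumptions, not taken from this diff): the trait now hands out a borrow, and callers clone the `Arc` only when they need ownership.

// Borrow the provider for a quick call.
let now = db.time_provider().now();

// Clone the Arc only when the provider must outlive the borrow.
let provider: Arc<dyn TimeProvider> = Arc::clone(db.time_provider());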


@ -21,10 +21,7 @@ pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
///
/// `LifecyclePolicy::check_for_work` can then be used to drive progress
/// of the `LifecycleChunk` contained within this `LifecycleDb`
pub struct LifecyclePolicy<M>
where
M: LifecycleDb,
{
pub struct LifecyclePolicy<M> {
/// The `LifecycleDb` this policy is automating
db: M,
@ -552,10 +549,7 @@ where
}
}
impl<M> Debug for LifecyclePolicy<M>
where
M: LifecycleDb,
{
impl<M> Debug for LifecyclePolicy<M> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LifecyclePolicy{{..}}")
}
@ -1132,8 +1126,8 @@ mod tests {
DatabaseName::new("test_db").unwrap()
}
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(&self.time_provider)
fn time_provider(&self) -> &Arc<dyn TimeProvider> {
&self.time_provider
}
}
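Dropping the `where M: LifecycleDb` bound from the struct and its `Debug` impl follows the common Rust pattern of constraining only the impls that actually use the trait. A condensed sketch of the resulting shape (method bodies elided):

pub struct LifecyclePolicy<M> {
    db: M,
    // ...
}

// Only the impl blocks that call LifecycleDb methods keep the bound.
impl<M> LifecyclePolicy<M>
where
    M: LifecycleDb,
{
    // check_for_work(), etc.
}

// Debug needs nothing from M, so it stays unbounded.
impl<M> Debug for LifecyclePolicy<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "LifecyclePolicy{{..}}")
    }
}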


@ -1,3 +1,4 @@
use crate::lifecycle::LifecycleWorker;
use crate::write_buffer::WriteBufferConsumer;
use crate::{
db::{
@ -646,6 +647,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
initialize_database(shared.as_ref()).await;
if shared.shutdown.is_cancelled() {
// TODO: Shutdown intermediate workers (#2813)
info!(db_name=%shared.config.name, "database shutdown before finishing initialization");
break;
}
@ -653,6 +655,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
let DatabaseStateInitialized {
db,
write_buffer_consumer,
lifecycle_worker,
..
} = shared
.state
@ -682,6 +685,10 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
.fuse();
futures::pin_mut!(consumer_join);
// Future that completes if the LifecycleWorker exits
let lifecycle_join = lifecycle_worker.join().fuse();
futures::pin_mut!(lifecycle_join);
// This inner loop runs until either:
//
// - Something calls `Database::shutdown`
@ -711,6 +718,10 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
error!(db_name=%shared.config.name, "unexpected shutdown of write buffer consumer - bailing out");
shared.shutdown.cancel();
}
_ = lifecycle_join => {
error!(db_name=%shared.config.name, "unexpected shutdown of lifecycle worker - bailing out");
shared.shutdown.cancel();
}
_ = db_worker => {
error!(db_name=%shared.config.name, "unexpected shutdown of db - bailing out");
shared.shutdown.cancel();
@ -726,6 +737,14 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
}
}
if !lifecycle_join.is_terminated() {
info!(db_name=%shared.config.name, "shutting down lifecycle worker");
lifecycle_worker.shutdown();
if let Err(e) = lifecycle_worker.join().await {
error!(db_name=%shared.config.name, %e, "error shutting down lifecycle worker")
}
}
if !db_worker.is_terminated() {
info!(db_name=%shared.config.name, "waiting for db worker shutdown");
db_shutdown.cancel();
@ -1124,13 +1143,16 @@ impl DatabaseStateRulesLoaded {
time_provider: Arc::clone(shared.application.time_provider()),
};
let db = Db::new(
let db = Arc::new(Db::new(
database_to_commit,
Arc::clone(shared.application.job_registry()),
);
));
let lifecycle_worker = Arc::new(LifecycleWorker::new(Arc::clone(&db)));
Ok(DatabaseStateCatalogLoaded {
db,
lifecycle_worker,
replay_plan: Arc::new(replay_plan),
provided_rules: Arc::clone(&self.provided_rules),
})
@ -1140,6 +1162,7 @@ impl DatabaseStateRulesLoaded {
#[derive(Debug, Clone)]
struct DatabaseStateCatalogLoaded {
db: Arc<Db>,
lifecycle_worker: Arc<LifecycleWorker>,
replay_plan: Arc<Option<ReplayPlan>>,
provided_rules: Arc<ProvidedDatabaseRules>,
}
@ -1182,12 +1205,12 @@ impl DatabaseStateCatalogLoaded {
_ => None,
};
// TODO: Pull write buffer and lifecycle out of Db
db.unsuppress_persistence();
self.lifecycle_worker.unsuppress_persistence();
Ok(DatabaseStateInitialized {
db,
write_buffer_consumer,
lifecycle_worker: Arc::clone(&self.lifecycle_worker),
provided_rules: Arc::clone(&self.provided_rules),
})
}
@ -1197,6 +1220,7 @@ impl DatabaseStateCatalogLoaded {
pub struct DatabaseStateInitialized {
db: Arc<Db>,
write_buffer_consumer: Option<Arc<WriteBufferConsumer>>,
lifecycle_worker: Arc<LifecycleWorker>,
provided_rules: Arc<ProvidedDatabaseRules>,
}
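The background worker above manages its child tasks with fused futures: each join future is `fuse()`d and pinned, a `select!` fires on whichever exits first, and `is_terminated()` afterwards tells the shutdown path which workers still need an orderly `shutdown()` plus `join()`. A self-contained sketch of that pattern (the placeholder futures stand in for the real join handles):

use futures::future::FusedFuture;
use futures::{pin_mut, select, FutureExt};

async fn supervise() {
    // Stand-ins for the consumer / lifecycle join futures.
    let consumer_join = async {}.fuse();
    let lifecycle_join = async {}.fuse();
    pin_mut!(consumer_join, lifecycle_join);

    // Whichever task exits first wins; the real worker treats this as
    // unexpected and cancels the shared shutdown token.
    select! {
        _ = consumer_join => { /* unexpected consumer exit */ }
        _ = lifecycle_join => { /* unexpected lifecycle exit */ }
    }

    // is_terminated() reveals which future still needs shutdown + join.
    if !lifecycle_join.is_terminated() {
        // lifecycle_worker.shutdown();
        // lifecycle_worker.join().await;
    }
}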


@ -50,6 +50,7 @@ use trace::ctx::SpanContext;
use write_buffer::core::{WriteBufferReading, WriteBufferWriting};
pub(crate) use crate::db::chunk::DbChunk;
pub(crate) use crate::db::lifecycle::ArcDb;
use crate::{
db::{
access::QueryCatalogAccess,
@ -59,7 +60,7 @@ use crate::{
table::TableSchemaUpsertHandle,
Catalog, Error as CatalogError, TableNameFilter,
},
lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb},
lifecycle::{LockableCatalogChunk, LockableCatalogPartition},
},
JobRegistry,
};
@ -271,9 +272,6 @@ pub struct Db {
/// Catalog interface for query
catalog_access: Arc<QueryCatalogAccess>,
/// Number of iterations of the worker lifecycle loop for this Db
worker_iterations_lifecycle: AtomicUsize,
/// Number of iterations of the worker cleanup loop for this Db
worker_iterations_cleanup: AtomicUsize,
@ -291,15 +289,6 @@ pub struct Db {
/// catalog transaction only needs shared access and hence will acquire a read-guard.
cleanup_lock: Arc<tokio::sync::RwLock<()>>,
/// Lifecycle policy.
///
/// Optional because it will be created after `Arc<Self>`.
///
/// This is stored here for the following reasons:
/// - to control the persistence suppression via a [`Db::unsuppress_persistence`]
/// - to keep the lifecycle state (e.g. the number of running compactions) around
lifecycle_policy: Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
time_provider: Arc<dyn TimeProvider>,
/// To-be-written delete predicates.
@ -327,7 +316,7 @@ pub(crate) struct DatabaseToCommit {
}
impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Arc<Self> {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self {
let name = Arc::from(database_to_commit.rules.name.as_str());
let rules = RwLock::new(database_to_commit.rules);
@ -344,7 +333,7 @@ impl Db {
);
let catalog_access = Arc::new(catalog_access);
let this = Self {
Self {
rules,
name,
server_id,
@ -355,21 +344,14 @@ impl Db {
jobs,
metric_registry: database_to_commit.metric_registry,
catalog_access,
worker_iterations_lifecycle: AtomicUsize::new(0),
worker_iterations_cleanup: AtomicUsize::new(0),
worker_iterations_delete_predicate_preservation: AtomicUsize::new(0),
write_buffer_producer: database_to_commit.write_buffer_producer,
cleanup_lock: Default::default(),
lifecycle_policy: Mutex::new(None),
time_provider: database_to_commit.time_provider,
delete_predicates_mailbox: Default::default(),
persisted_chunk_id_override: Default::default(),
};
let this = Arc::new(this);
*this.lifecycle_policy.try_lock().expect("not used yet") = Some(
::lifecycle::LifecyclePolicy::new_suppress_persistence(WeakDb(Arc::downgrade(&this))),
);
this
}
}
/// Return all table names of the DB
@ -377,15 +359,6 @@ impl Db {
self.catalog.table_names()
}
/// Allow persistence if database rules allow it.
pub fn unsuppress_persistence(&self) {
let mut guard = self.lifecycle_policy.lock();
let policy = guard
.as_mut()
.expect("lifecycle policy should be initialized");
policy.unsuppress_persistence();
}
/// Return a handle to the executor used to run queries
pub fn executor(&self) -> Arc<Executor> {
Arc::clone(&self.exec)
@ -785,11 +758,6 @@ impl Db {
Some(chunk.table_summary())
}
/// Returns the number of iterations of the background worker lifecycle loop
pub fn worker_iterations_lifecycle(&self) -> usize {
self.worker_iterations_lifecycle.load(Ordering::Relaxed)
}
/// Returns the number of iterations of the background worker cleanup loop
pub fn worker_iterations_cleanup(&self) -> usize {
self.worker_iterations_cleanup.load(Ordering::Relaxed)
@ -827,24 +795,6 @@ impl Db {
) {
info!("started background worker");
// Loop that drives the lifecycle for this database
let lifecycle_loop = async {
loop {
self.worker_iterations_lifecycle
.fetch_add(1, Ordering::Relaxed);
let fut = {
let mut guard = self.lifecycle_policy.lock();
let policy = guard
.as_mut()
.expect("lifecycle policy should be initialized");
policy.check_for_work()
};
fut.await
}
};
// object store cleanup loop
let object_store_cleanup_loop = async {
loop {
@ -915,7 +865,6 @@ impl Db {
// None of the futures need to perform drain logic on shutdown.
// When the first one finishes, all of them are dropped
tokio::select! {
_ = lifecycle_loop => error!("lifecycle loop exited - db worker bailing out"),
_ = object_store_cleanup_loop => error!("object store cleanup loop exited - db worker bailing out"),
_ = delete_predicate_persistence_loop => error!("delete predicate persistence loop exited - db worker bailing out"),
_ = shutdown.cancelled() => info!("db worker shutting down"),


@ -22,11 +22,7 @@ use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta;
use schema::{merge::SchemaMerger, Schema, TIME_COLUMN_NAME};
use std::{
convert::TryInto,
fmt::Display,
sync::{Arc, Weak},
};
use std::{fmt::Display, sync::Arc};
use time::{Time, TimeProvider};
use tracker::{RwLock, TaskTracker};
@ -43,11 +39,17 @@ mod persist;
mod unload;
mod write;
/// A newtype wrapper around `Weak<Db>` to workaround trait orphan rules
/// A newtype wrapper around `Arc<Db>` to workaround trait orphan rules
///
/// TODO: Pull LifecyclePolicy out of Db to allow strong reference (#2242)
#[derive(Debug, Clone)]
pub struct WeakDb(pub(super) Weak<Db>);
pub struct ArcDb(Arc<Db>);
impl ArcDb {
pub fn new(db: Arc<Db>) -> Self {
Self(db)
}
}
///
/// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
@ -239,52 +241,33 @@ impl LockablePartition for LockableCatalogPartition {
}
}
impl LifecycleDb for WeakDb {
impl LifecycleDb for ArcDb {
type Chunk = LockableCatalogChunk;
type Partition = LockableCatalogPartition;
fn buffer_size(&self) -> usize {
self.0
.upgrade()
.map(|db| db.catalog.metrics().memory().total())
.unwrap_or_default()
self.0.catalog.metrics().memory().total()
}
fn rules(&self) -> LifecycleRules {
self.0
.upgrade()
.map(|db| db.rules.read().lifecycle_rules.clone())
.unwrap_or_default()
self.0.rules.read().lifecycle_rules.clone()
}
fn partitions(&self) -> Vec<Self::Partition> {
self.0
.upgrade()
.map(|db| {
db.catalog
.partitions()
.into_iter()
.map(|partition| LockableCatalogPartition::new(Arc::clone(&db), partition))
.collect()
})
.unwrap_or_default()
.catalog
.partitions()
.into_iter()
.map(|partition| LockableCatalogPartition::new(Arc::clone(&self.0), partition))
.collect()
}
fn name(&self) -> DatabaseName<'static> {
self.0
.upgrade()
.map(|db| db.rules.read().name.clone())
.unwrap_or_else(|| "gone".to_string().try_into().unwrap())
self.0.rules.read().name.clone()
}
fn time_provider(&self) -> Arc<dyn TimeProvider> {
Arc::clone(
&self
.0
.upgrade()
.expect("database dropped without shutting down lifecycle")
.time_provider,
)
fn time_provider(&self) -> &Arc<dyn TimeProvider> {
&self.0.time_provider
}
}
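`ArcDb` exists because of Rust's orphan rules: `LifecycleDb` is defined in the `lifecycle` crate and `Arc` in `std`, so `server` cannot implement the foreign trait directly for `Arc<Db>`; wrapping it in a crate-local newtype makes the impl coherent. A minimal illustration of the same pattern, using `Display` as the foreign trait:

use std::fmt;
use std::sync::Arc;

struct Db;

// Rejected by the orphan rules: both `Display` and `Arc` are foreign,
// and `Arc` is not a fundamental type, so `Arc<Db>` counts as foreign too.
// impl fmt::Display for Arc<Db> { ... }

// The workaround: a crate-local newtype makes the impl legal.
struct ArcDb(Arc<Db>);

impl fmt::Display for ArcDb {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ArcDb")
    }
}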


@ -438,6 +438,7 @@ mod tests {
use tokio_util::sync::CancellationToken;
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use crate::lifecycle::LifecycleWorker;
use crate::utils::TestDb;
use crate::write_buffer::WriteBufferConsumer;
@ -581,6 +582,8 @@ mod tests {
)
.await;
let mut lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
let mut maybe_consumer = Some(WriteBufferConsumer::new(
Box::new(MockBufferForReading::new(write_buffer_state.clone(), None).unwrap()),
Arc::clone(&test_db.db),
@ -606,6 +609,9 @@ mod tests {
consumer.join().await.unwrap();
}
lifecycle.shutdown();
lifecycle.join().await.unwrap();
// stop background worker
shutdown.cancel();
join_handle.await.unwrap();
@ -626,6 +632,7 @@ mod tests {
test_db = test_db_tmp;
shutdown = shutdown_tmp;
join_handle = join_handle_tmp;
lifecycle = LifecycleWorker::new(Arc::clone(&test_db.db));
}
Step::Replay | Step::SkipReplay => {
assert!(maybe_consumer.is_none());
@ -710,8 +717,7 @@ mod tests {
));
}
let db = &test_db.db;
db.unsuppress_persistence();
lifecycle.unsuppress_persistence();
// wait until checks pass
let t_0 = Instant::now();


@ -118,6 +118,8 @@ pub mod utils;
mod write_buffer;
mod lifecycle;
type DatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(Debug, Snafu)]

server/src/lifecycle.rs (new file, +96 lines)

@ -0,0 +1,96 @@
use std::future::Future;
use std::sync::Arc;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use parking_lot::Mutex;
use tokio::task::JoinError;
use crate::db::ArcDb;
use crate::Db;
use lifecycle::LifecyclePolicy;
use observability_deps::tracing::{info, warn};
use tokio_util::sync::CancellationToken;
/// A lifecycle worker manages a background task that drives a [`LifecyclePolicy`]
#[derive(Debug)]
pub struct LifecycleWorker {
/// Future that resolves when the background worker exits
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
/// Shared worker state
state: Arc<WorkerState>,
}
#[derive(Debug)]
struct WorkerState {
policy: Mutex<LifecyclePolicy<ArcDb>>,
shutdown: CancellationToken,
}
impl LifecycleWorker {
/// Creates a new `LifecycleWorker`
///
/// The worker starts with persistence suppressed; persistence must be enabled
/// by a call to [`LifecycleWorker::unsuppress_persistence`]
pub fn new(db: Arc<Db>) -> Self {
let db = ArcDb::new(db);
let shutdown = CancellationToken::new();
let policy = LifecyclePolicy::new_suppress_persistence(db);
let state = Arc::new(WorkerState {
policy: Mutex::new(policy),
shutdown,
});
let join = tokio::spawn(background_worker(Arc::clone(&state)))
.map_err(Arc::new)
.boxed()
.shared();
Self { join, state }
}
/// Stop suppressing persistence and allow it if the database rules allow it.
pub fn unsuppress_persistence(&self) {
self.state.policy.lock().unsuppress_persistence()
}
/// Triggers shutdown of this `LifecycleWorker`
pub fn shutdown(&self) {
self.state.shutdown.cancel()
}
/// Waits for the background worker of this `LifecycleWorker` to exit
pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
self.join.clone()
}
}
impl Drop for LifecycleWorker {
fn drop(&mut self) {
if !self.state.shutdown.is_cancelled() {
warn!("lifecycle worker dropped without calling shutdown()");
self.state.shutdown.cancel();
}
if self.join.clone().now_or_never().is_none() {
warn!("lifecycle worker dropped without waiting for worker termination");
}
}
}
async fn background_worker(state: Arc<WorkerState>) {
loop {
let fut = state.policy.lock().check_for_work();
tokio::select! {
_ = fut => {},
_ = state.shutdown.cancelled() => {
info!("lifecycle worker shutting down");
break
}
}
}
}
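A usage sketch, mirroring how the replay tests and the `Database` state machine in this commit drive the worker (the surrounding async context and the `db: Arc<Db>` handle are assumed):

let worker = LifecycleWorker::new(Arc::clone(&db));

// Persistence stays suppressed until replay has finished, then:
worker.unsuppress_persistence();

// ... on orderly shutdown:
worker.shutdown();
worker.join().await.expect("lifecycle worker panicked");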


@ -146,7 +146,7 @@ impl TestDbBuilder {
TestDb {
metric_registry,
db: Db::new(database_to_commit, jobs),
db: Arc::new(Db::new(database_to_commit, jobs)),
replay_plan: replay_plan.expect("did not skip replay"),
}
}