diff --git a/server/src/config.rs b/server/src/config.rs index c8e9fcbd77..34fbf42ae8 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -543,10 +543,7 @@ impl<'a> DatabaseHandle<'a> { write_buffer, }; - let db = Arc::new(Db::new( - database_to_commit, - Arc::clone(application.job_registry()), - )); + let db = Db::new(database_to_commit, Arc::clone(application.job_registry())); self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan })); diff --git a/server/src/db.rs b/server/src/db.rs index 72d400c8cd..f5591368a9 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -11,7 +11,7 @@ use crate::{ table::TableSchemaUpsertHandle, Catalog, TableNameFilter, }, - lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition}, + lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb}, }, JobRegistry, }; @@ -367,6 +367,11 @@ pub struct Db { /// The cleanup job needs exclusive access and hence will acquire a write-guard. Creating parquet files and creating /// catalog transaction only needs shared access and hence will acquire a read-guard. cleanup_lock: Arc>, + + /// Lifecycle policy. + /// + /// Optional because it will be created after `Arc`. + lifcycle_policy: tokio::sync::Mutex>>, } /// All the information needed to commit a database @@ -382,7 +387,7 @@ pub(crate) struct DatabaseToCommit { } impl Db { - pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc) -> Self { + pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc) -> Arc { let db_name = database_to_commit.rules.name.clone(); let rules = RwLock::new(database_to_commit.rules); @@ -408,7 +413,7 @@ impl Db { let process_clock = process_clock::ProcessClock::new(); - Self { + let this = Self { rules, server_id, store, @@ -425,7 +430,13 @@ impl Db { ingest_metrics, write_buffer: database_to_commit.write_buffer, cleanup_lock: Default::default(), - } + lifcycle_policy: tokio::sync::Mutex::new(None), + }; + let this = Arc::new(this); + *this.lifcycle_policy.try_lock().expect("not used yet") = Some( + ::lifecycle::LifecyclePolicy::new(WeakDb(Arc::downgrade(&this))), + ); + this } /// Return a handle to the executor used to run queries @@ -763,13 +774,15 @@ impl Db { tokio::join!( // lifecycle policy loop async { - let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(self))); - while !shutdown.is_cancelled() { self.worker_iterations_lifecycle .fetch_add(1, Ordering::Relaxed); tokio::select! { - _ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {}, + _ = async { + let mut guard = self.lifcycle_policy.lock().await; + let policy = guard.as_mut().expect("lifecycle policy should be initialized"); + policy.check_for_work(Utc::now(), std::time::Instant::now()).await + } => {}, _ = shutdown.cancelled() => break, } } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index aa16c7cbda..698ad9cf2c 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -25,7 +25,12 @@ use lifecycle::{ use observability_deps::tracing::{info, trace}; use persistence_windows::persistence_windows::FlushHandle; use query::QueryChunkMeta; -use std::{fmt::Display, sync::Arc, time::Instant}; +use std::{ + convert::TryInto, + fmt::Display, + sync::{Arc, Weak}, + time::Instant, +}; use tracker::{RwLock, TaskTracker}; pub(crate) use compact::compact_chunks; @@ -43,17 +48,9 @@ mod persist; mod unload; mod write; -/// A newtype wrapper around `Arc` to workaround trait orphan rules +/// A newtype wrapper around `Weak` to workaround trait orphan rules #[derive(Debug, Clone)] -pub struct ArcDb(pub(super) Arc); - -impl std::ops::Deref for ArcDb { - type Target = Db; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} +pub struct WeakDb(pub(super) Weak); /// /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db` @@ -232,28 +229,42 @@ impl LockablePartition for LockableCatalogPartition { } } -impl LifecycleDb for ArcDb { +impl LifecycleDb for WeakDb { type Chunk = LockableCatalogChunk; type Partition = LockableCatalogPartition; fn buffer_size(&self) -> usize { - self.catalog.metrics().memory().total() + self.0 + .upgrade() + .map(|db| db.catalog.metrics().memory().total()) + .unwrap_or_default() } fn rules(&self) -> LifecycleRules { - self.rules.read().lifecycle_rules.clone() + self.0 + .upgrade() + .map(|db| db.rules.read().lifecycle_rules.clone()) + .unwrap_or_default() } fn partitions(&self) -> Vec { - self.catalog - .partitions() - .into_iter() - .map(|partition| LockableCatalogPartition::new(Arc::clone(&self.0), partition)) - .collect() + self.0 + .upgrade() + .map(|db| { + db.catalog + .partitions() + .into_iter() + .map(|partition| LockableCatalogPartition::new(Arc::clone(&db), partition)) + .collect() + }) + .unwrap_or_default() } fn name(&self) -> DatabaseName<'static> { - self.rules.read().name.clone() + self.0 + .upgrade() + .map(|db| db.rules.read().name.clone()) + .unwrap_or_else(|| "gone".to_string().try_into().unwrap()) } } diff --git a/server/src/utils.rs b/server/src/utils.rs index e28b5f9060..704b8c864d 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -107,7 +107,7 @@ impl TestDbBuilder { TestDb { metric_registry: metrics::TestMetricRegistry::new(metrics_registry), - db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))), + db: Db::new(database_to_commit, Arc::new(JobRegistry::new())), replay_plan, } }