refactor: move lifecycle policy into Db struct

pull/24376/head
Marco Neumann 2021-08-02 13:58:49 +02:00
parent 6f163aba13
commit 0fe8eda89e
4 changed files with 53 additions and 32 deletions

View File

@ -543,10 +543,7 @@ impl<'a> DatabaseHandle<'a> {
write_buffer, write_buffer,
}; };
let db = Arc::new(Db::new( let db = Db::new(database_to_commit, Arc::clone(application.job_registry()));
database_to_commit,
Arc::clone(application.job_registry()),
));
self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan })); self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan }));

View File

@ -11,7 +11,7 @@ use crate::{
table::TableSchemaUpsertHandle, table::TableSchemaUpsertHandle,
Catalog, TableNameFilter, Catalog, TableNameFilter,
}, },
lifecycle::{ArcDb, LockableCatalogChunk, LockableCatalogPartition}, lifecycle::{LockableCatalogChunk, LockableCatalogPartition, WeakDb},
}, },
JobRegistry, JobRegistry,
}; };
@ -367,6 +367,11 @@ pub struct Db {
/// The cleanup job needs exclusive access and hence will acquire a write-guard. Creating parquet files and creating /// The cleanup job needs exclusive access and hence will acquire a write-guard. Creating parquet files and creating
/// catalog transaction only needs shared access and hence will acquire a read-guard. /// catalog transaction only needs shared access and hence will acquire a read-guard.
cleanup_lock: Arc<tokio::sync::RwLock<()>>, cleanup_lock: Arc<tokio::sync::RwLock<()>>,
/// Lifecycle policy.
///
/// Optional because it will be created after `Arc<Self>`.
lifcycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
} }
/// All the information needed to commit a database /// All the information needed to commit a database
@ -382,7 +387,7 @@ pub(crate) struct DatabaseToCommit {
} }
impl Db { impl Db {
pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Self { pub(crate) fn new(database_to_commit: DatabaseToCommit, jobs: Arc<JobRegistry>) -> Arc<Self> {
let db_name = database_to_commit.rules.name.clone(); let db_name = database_to_commit.rules.name.clone();
let rules = RwLock::new(database_to_commit.rules); let rules = RwLock::new(database_to_commit.rules);
@ -408,7 +413,7 @@ impl Db {
let process_clock = process_clock::ProcessClock::new(); let process_clock = process_clock::ProcessClock::new();
Self { let this = Self {
rules, rules,
server_id, server_id,
store, store,
@ -425,7 +430,13 @@ impl Db {
ingest_metrics, ingest_metrics,
write_buffer: database_to_commit.write_buffer, write_buffer: database_to_commit.write_buffer,
cleanup_lock: Default::default(), cleanup_lock: Default::default(),
} lifcycle_policy: tokio::sync::Mutex::new(None),
};
let this = Arc::new(this);
*this.lifcycle_policy.try_lock().expect("not used yet") = Some(
::lifecycle::LifecyclePolicy::new(WeakDb(Arc::downgrade(&this))),
);
this
} }
/// Return a handle to the executor used to run queries /// Return a handle to the executor used to run queries
@ -763,13 +774,15 @@ impl Db {
tokio::join!( tokio::join!(
// lifecycle policy loop // lifecycle policy loop
async { async {
let mut policy = ::lifecycle::LifecyclePolicy::new(ArcDb(Arc::clone(self)));
while !shutdown.is_cancelled() { while !shutdown.is_cancelled() {
self.worker_iterations_lifecycle self.worker_iterations_lifecycle
.fetch_add(1, Ordering::Relaxed); .fetch_add(1, Ordering::Relaxed);
tokio::select! { tokio::select! {
_ = policy.check_for_work(Utc::now(), std::time::Instant::now()) => {}, _ = async {
let mut guard = self.lifcycle_policy.lock().await;
let policy = guard.as_mut().expect("lifecycle policy should be initialized");
policy.check_for_work(Utc::now(), std::time::Instant::now()).await
} => {},
_ = shutdown.cancelled() => break, _ = shutdown.cancelled() => break,
} }
} }

View File

@ -25,7 +25,12 @@ use lifecycle::{
use observability_deps::tracing::{info, trace}; use observability_deps::tracing::{info, trace};
use persistence_windows::persistence_windows::FlushHandle; use persistence_windows::persistence_windows::FlushHandle;
use query::QueryChunkMeta; use query::QueryChunkMeta;
use std::{fmt::Display, sync::Arc, time::Instant}; use std::{
convert::TryInto,
fmt::Display,
sync::{Arc, Weak},
time::Instant,
};
use tracker::{RwLock, TaskTracker}; use tracker::{RwLock, TaskTracker};
pub(crate) use compact::compact_chunks; pub(crate) use compact::compact_chunks;
@ -43,17 +48,9 @@ mod persist;
mod unload; mod unload;
mod write; mod write;
/// A newtype wrapper around `Arc<Db>` to workaround trait orphan rules /// A newtype wrapper around `Weak<Db>` to workaround trait orphan rules
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ArcDb(pub(super) Arc<Db>); pub struct WeakDb(pub(super) Weak<Db>);
impl std::ops::Deref for ArcDb {
type Target = Db;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// ///
/// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db` /// A `LockableCatalogChunk` combines a `CatalogChunk` with its owning `Db`
@ -232,28 +229,42 @@ impl LockablePartition for LockableCatalogPartition {
} }
} }
impl LifecycleDb for ArcDb { impl LifecycleDb for WeakDb {
type Chunk = LockableCatalogChunk; type Chunk = LockableCatalogChunk;
type Partition = LockableCatalogPartition; type Partition = LockableCatalogPartition;
fn buffer_size(&self) -> usize { fn buffer_size(&self) -> usize {
self.catalog.metrics().memory().total() self.0
.upgrade()
.map(|db| db.catalog.metrics().memory().total())
.unwrap_or_default()
} }
fn rules(&self) -> LifecycleRules { fn rules(&self) -> LifecycleRules {
self.rules.read().lifecycle_rules.clone() self.0
.upgrade()
.map(|db| db.rules.read().lifecycle_rules.clone())
.unwrap_or_default()
} }
fn partitions(&self) -> Vec<Self::Partition> { fn partitions(&self) -> Vec<Self::Partition> {
self.catalog self.0
.partitions() .upgrade()
.into_iter() .map(|db| {
.map(|partition| LockableCatalogPartition::new(Arc::clone(&self.0), partition)) db.catalog
.collect() .partitions()
.into_iter()
.map(|partition| LockableCatalogPartition::new(Arc::clone(&db), partition))
.collect()
})
.unwrap_or_default()
} }
fn name(&self) -> DatabaseName<'static> { fn name(&self) -> DatabaseName<'static> {
self.rules.read().name.clone() self.0
.upgrade()
.map(|db| db.rules.read().name.clone())
.unwrap_or_else(|| "gone".to_string().try_into().unwrap())
} }
} }

View File

@ -107,7 +107,7 @@ impl TestDbBuilder {
TestDb { TestDb {
metric_registry: metrics::TestMetricRegistry::new(metrics_registry), metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
db: Arc::new(Db::new(database_to_commit, Arc::new(JobRegistry::new()))), db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
replay_plan, replay_plan,
} }
} }