refactor: start background worker before performing replay
This enables compaction during replay.
pull/24376/head
parent
0fe8eda89e
commit
9ea04a42ff
|
@ -9,6 +9,7 @@ use data_types::{
|
|||
};
|
||||
use metrics::MetricRegistry;
|
||||
use object_store::{path::ObjectStorePath, ObjectStore, ObjectStoreApi};
|
||||
use parking_lot::Mutex;
|
||||
use parquet_file::catalog::PreservedCatalog;
|
||||
use persistence_windows::checkpoint::ReplayPlan;
|
||||
use write_buffer::config::WriteBufferConfig;
|
||||
|
@ -320,12 +321,14 @@ enum DatabaseState {
|
|||
Replay {
|
||||
db: Arc<Db>,
|
||||
replay_plan: ReplayPlan,
|
||||
handle: Mutex<Option<JoinHandle<()>>>,
|
||||
shutdown: CancellationToken,
|
||||
},
|
||||
|
||||
/// Fully initialized database.
|
||||
Initialized {
|
||||
db: Arc<Db>,
|
||||
handle: Option<JoinHandle<()>>,
|
||||
handle: Mutex<Option<JoinHandle<()>>>,
|
||||
shutdown: CancellationToken,
|
||||
},
|
||||
}
|
||||
|
@ -333,7 +336,8 @@ enum DatabaseState {
|
|||
impl DatabaseState {
|
||||
fn join(&mut self) -> Option<JoinHandle<()>> {
|
||||
match self {
|
||||
DatabaseState::Initialized { handle, .. } => handle.take(),
|
||||
DatabaseState::Replay { handle, .. } => handle.lock().take(),
|
||||
DatabaseState::Initialized { handle, .. } => handle.lock().take(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -396,11 +400,14 @@ impl DatabaseState {
|
|||
|
||||
impl Drop for DatabaseState {
|
||||
fn drop(&mut self) {
|
||||
if let DatabaseState::Initialized {
|
||||
if let DatabaseState::Replay {
|
||||
handle, shutdown, ..
|
||||
}
|
||||
| DatabaseState::Initialized {
|
||||
handle, shutdown, ..
|
||||
} = self
|
||||
{
|
||||
if handle.is_some() {
|
||||
if handle.lock().is_some() {
|
||||
// Join should be called on `DatabaseState` prior to dropping, for example, by
|
||||
// calling drain() on the owning `Config`
|
||||
warn!("DatabaseState dropped without waiting for background task to complete");
|
||||
|
@ -545,7 +552,29 @@ impl<'a> DatabaseHandle<'a> {
|
|||
|
||||
let db = Db::new(database_to_commit, Arc::clone(application.job_registry()));
|
||||
|
||||
self.state = Some(Arc::new(DatabaseState::Replay { db, replay_plan }));
|
||||
if self.config.shutdown.is_cancelled() {
|
||||
error!("server is shutting down");
|
||||
return ServerShuttingDown.fail();
|
||||
}
|
||||
|
||||
let shutdown = self.config.shutdown.child_token();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(&db);
|
||||
let rules = db.rules();
|
||||
|
||||
let handle = Mutex::new(Some(tokio::spawn(async move {
|
||||
db_captured
|
||||
.background_worker(shutdown_captured)
|
||||
.instrument(tracing::info_span!("db_worker", database=%rules.name))
|
||||
.await
|
||||
})));
|
||||
|
||||
self.state = Some(Arc::new(DatabaseState::Replay {
|
||||
db,
|
||||
replay_plan,
|
||||
handle,
|
||||
shutdown,
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -560,28 +589,17 @@ impl<'a> DatabaseHandle<'a> {
|
|||
/// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
|
||||
pub fn advance_init(&mut self) -> Result<()> {
|
||||
match self.state().as_ref() {
|
||||
DatabaseState::Replay { db, .. } => {
|
||||
if self.config.shutdown.is_cancelled() {
|
||||
error!("server is shutting down");
|
||||
return ServerShuttingDown.fail();
|
||||
}
|
||||
|
||||
let shutdown = self.config.shutdown.child_token();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(db);
|
||||
let rules = db.rules();
|
||||
|
||||
let handle = Some(tokio::spawn(async move {
|
||||
db_captured
|
||||
.background_worker(shutdown_captured)
|
||||
.instrument(tracing::info_span!("db_worker", database=%rules.name))
|
||||
.await
|
||||
}));
|
||||
|
||||
self.state = Some(Arc::new(DatabaseState::Initialized {
|
||||
db: Arc::clone(db),
|
||||
DatabaseState::Replay {
|
||||
db,
|
||||
handle,
|
||||
shutdown,
|
||||
..
|
||||
} => {
|
||||
let handle = handle.lock().take();
|
||||
self.state = Some(Arc::new(DatabaseState::Initialized {
|
||||
db: Arc::clone(db),
|
||||
handle: Mutex::new(handle),
|
||||
shutdown: shutdown.clone(),
|
||||
}));
|
||||
|
||||
Ok(())
|
||||
|
|
|
@ -45,7 +45,7 @@ use std::{
|
|||
any::Any,
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
|
@ -372,6 +372,11 @@ pub struct Db {
|
|||
///
|
||||
/// Optional because it will be created after `Arc<Self>`.
|
||||
lifcycle_policy: tokio::sync::Mutex<Option<::lifecycle::LifecyclePolicy<WeakDb>>>,
|
||||
|
||||
/// Flag to stop background worker from reading from the write buffer.
|
||||
///
|
||||
/// TODO: Move write buffer read loop out of Db.
|
||||
no_write_buffer_read: AtomicBool,
|
||||
}
|
||||
|
||||
/// All the information needed to commit a database
|
||||
|
@ -431,14 +436,31 @@ impl Db {
|
|||
write_buffer: database_to_commit.write_buffer,
|
||||
cleanup_lock: Default::default(),
|
||||
lifcycle_policy: tokio::sync::Mutex::new(None),
|
||||
no_write_buffer_read: AtomicBool::new(true),
|
||||
};
|
||||
let this = Arc::new(this);
|
||||
*this.lifcycle_policy.try_lock().expect("not used yet") = Some(
|
||||
::lifecycle::LifecyclePolicy::new(WeakDb(Arc::downgrade(&this))),
|
||||
::lifecycle::LifecyclePolicy::new_suppress_persistence(WeakDb(Arc::downgrade(&this))),
|
||||
);
|
||||
this
|
||||
}
|
||||
|
||||
/// Allow persistence if database rules allow it.
|
||||
pub async fn unsuppress_persistence(&self) {
|
||||
let mut guard = self.lifcycle_policy.lock().await;
|
||||
let policy = guard
|
||||
.as_mut()
|
||||
.expect("lifecycle policy should be initialized");
|
||||
policy.unsuppress_persistence();
|
||||
}
|
||||
|
||||
/// Allow continuous reads from the write buffer (if configured).
|
||||
///
|
||||
/// TODO: Move write buffer read loop out of Db.
|
||||
pub fn allow_write_buffer_read(&self) {
|
||||
self.no_write_buffer_read.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Return a handle to the executor used to run queries
|
||||
pub fn executor(&self) -> Arc<Executor> {
|
||||
Arc::clone(&self.exec)
|
||||
|
@ -814,6 +836,16 @@ impl Db {
|
|||
// streaming from the write buffer loop
|
||||
async {
|
||||
if let Some(WriteBufferConfig::Reading(write_buffer)) = &self.write_buffer {
|
||||
// wait for permission
|
||||
tokio::select! {
|
||||
_ = async {
|
||||
while self.no_write_buffer_read.load(Ordering::SeqCst) {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
} => {},
|
||||
_ = shutdown.cancelled() => return,
|
||||
}
|
||||
|
||||
let mut write_buffer = write_buffer
|
||||
.try_lock()
|
||||
.expect("no streams should exist at this point");
|
||||
|
@ -1535,6 +1567,7 @@ mod tests {
|
|||
let db_captured = Arc::clone(&db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
let query = "select * from cpu";
|
||||
|
||||
|
@ -1674,6 +1707,7 @@ mod tests {
|
|||
let db_captured = Arc::clone(&db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
// check: after a while the error should be reported in the database's metrics
|
||||
let t_0 = Instant::now();
|
||||
|
@ -2783,6 +2817,7 @@ mod tests {
|
|||
let db_captured = Arc::clone(&db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
// check: after a while the persistence windows should have the expected data
|
||||
let t_0 = Instant::now();
|
||||
|
|
|
@ -231,6 +231,7 @@ mod tests {
|
|||
};
|
||||
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk};
|
||||
use test_helpers::assert_contains;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use write_buffer::{
|
||||
config::WriteBufferConfig,
|
||||
|
@ -286,6 +287,9 @@ mod tests {
|
|||
Ingest(Vec<TestSequencedEntry>),
|
||||
|
||||
/// Restart DB
|
||||
///
|
||||
/// Background loop is started as well but neither persistence nor write buffer reads are allowed until
|
||||
/// [`Await`](Self::Await) is used.
|
||||
Restart,
|
||||
|
||||
/// Perform replay
|
||||
|
@ -301,7 +305,7 @@ mod tests {
|
|||
|
||||
/// Wait until the background loop generates the desired state (all checks pass).
|
||||
///
|
||||
/// Background loop is started before the check and stopped afterwards.
|
||||
/// Persistence and write buffer reads are enabled in preparation for this step.
|
||||
Await(Vec<Check>),
|
||||
}
|
||||
|
||||
|
@ -313,17 +317,13 @@ mod tests {
|
|||
/// What to do in which order.
|
||||
///
|
||||
/// # Serialization
|
||||
/// The execution of the entire test is purely serial with the exception of [`Await`](Step::Await) (see
|
||||
/// next section). That means that nothing happens concurrently during each step. Every step is finished and
|
||||
/// Every step is finished and
|
||||
/// checked for errors before the next is started (e.g. [`Replay`](Step::Replay) is fully executed and
|
||||
/// it is ensured that there were no errors before a subsequent [`Assert`](Step::Assert) is evaluated).
|
||||
/// The database background worker is NOT active during any non-[`Await`](Step::Await)
|
||||
///
|
||||
/// # Await
|
||||
/// Sometimes the background worker is needed to perform something, e.g. to consume some data from the write
|
||||
/// buffer. In that case [`Await`](Step::Await) can be used. During this check (and only during this
|
||||
/// check) the background worker is active and the checks passed to [`Await`](Step::Await) are
|
||||
/// evaluated until they succeed. The background worker is stopped before the next test step is evaluated.
|
||||
/// # Await / Background Worker
|
||||
/// The database background worker is started when the DB is created but persistence and reads from the
|
||||
/// write buffer are disabled until [`Await`](Step::Await) is used.
|
||||
steps: Vec<Step>,
|
||||
}
|
||||
|
||||
|
@ -340,7 +340,7 @@ mod tests {
|
|||
};
|
||||
let write_buffer_state =
|
||||
MockBufferSharedState::empty_with_n_sequencers(self.n_sequencers);
|
||||
let mut test_db = Self::create_test_db(
|
||||
let (mut test_db, mut shutdown, mut join_handle) = Self::create_test_db(
|
||||
Arc::clone(&object_store),
|
||||
write_buffer_state.clone(),
|
||||
server_id,
|
||||
|
@ -360,11 +360,15 @@ mod tests {
|
|||
}
|
||||
}
|
||||
Step::Restart => {
|
||||
// first drop old DB
|
||||
// stop background worker
|
||||
shutdown.cancel();
|
||||
join_handle.await.unwrap();
|
||||
|
||||
// drop old DB
|
||||
drop(test_db);
|
||||
|
||||
// then create new one
|
||||
test_db = Self::create_test_db(
|
||||
let (test_db_tmp, shutdown_tmp, join_handle_tmp) = Self::create_test_db(
|
||||
Arc::clone(&object_store),
|
||||
write_buffer_state.clone(),
|
||||
server_id,
|
||||
|
@ -372,11 +376,14 @@ mod tests {
|
|||
partition_template.clone(),
|
||||
)
|
||||
.await;
|
||||
test_db = test_db_tmp;
|
||||
shutdown = shutdown_tmp;
|
||||
join_handle = join_handle_tmp;
|
||||
}
|
||||
Step::Replay => {
|
||||
test_db
|
||||
.db
|
||||
.perform_replay(&test_db.replay_plan, false)
|
||||
let db = &test_db.db;
|
||||
|
||||
db.perform_replay(&test_db.replay_plan, false)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
@ -413,14 +420,8 @@ mod tests {
|
|||
}
|
||||
Step::Await(checks) => {
|
||||
let db = &test_db.db;
|
||||
|
||||
// start background worker
|
||||
let shutdown: CancellationToken = Default::default();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(db);
|
||||
let join_handle = tokio::spawn(async move {
|
||||
db_captured.background_worker(shutdown_captured).await
|
||||
});
|
||||
db.unsuppress_persistence().await;
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
// wait until checks pass
|
||||
let t_0 = Instant::now();
|
||||
|
@ -439,10 +440,6 @@ mod tests {
|
|||
}
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// stop background worker
|
||||
shutdown.cancel();
|
||||
join_handle.await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -471,9 +468,9 @@ mod tests {
|
|||
server_id: ServerId,
|
||||
db_name: &'static str,
|
||||
partition_template: PartitionTemplate,
|
||||
) -> TestDb {
|
||||
) -> (TestDb, CancellationToken, JoinHandle<()>) {
|
||||
let write_buffer = MockBufferForReading::new(write_buffer_state);
|
||||
TestDb::builder()
|
||||
let test_db = TestDb::builder()
|
||||
.object_store(object_store)
|
||||
.server_id(server_id)
|
||||
.write_buffer(WriteBufferConfig::Reading(Arc::new(
|
||||
|
@ -486,7 +483,16 @@ mod tests {
|
|||
.partition_template(partition_template)
|
||||
.db_name(db_name)
|
||||
.build()
|
||||
.await
|
||||
.await;
|
||||
|
||||
// start background worker
|
||||
let shutdown: CancellationToken = Default::default();
|
||||
let shutdown_captured = shutdown.clone();
|
||||
let db_captured = Arc::clone(&test_db.db);
|
||||
let join_handle =
|
||||
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
|
||||
|
||||
(test_db, shutdown, join_handle)
|
||||
}
|
||||
|
||||
/// Evaluates given checks.
|
||||
|
|
|
@ -353,6 +353,8 @@ async fn try_advance_database_init_process(
|
|||
.await
|
||||
.map_err(Box::new)
|
||||
.context(ReplayError)?;
|
||||
db.unsuppress_persistence().await;
|
||||
db.allow_write_buffer_read();
|
||||
|
||||
handle
|
||||
.advance_init()
|
||||
|
|
|
@ -593,6 +593,11 @@ where
|
|||
db_reservation.advance_replay(preserved_catalog, catalog, replay_plan, write_buffer)?;
|
||||
|
||||
// no actual replay required
|
||||
let db = db_reservation
|
||||
.db_any_state()
|
||||
.expect("DB should exist at this point");
|
||||
db.unsuppress_persistence().await;
|
||||
db.allow_write_buffer_read();
|
||||
db_reservation.advance_init()?;
|
||||
|
||||
// ready to commit
|
||||
|
|
Loading…
Reference in New Issue