refactor: do not construct replay plan when skipping replay

Up until now we only skipped the execution of the replay plan, not its
construction. The replay plan construction has some bugs left, so let's
move this part behind the toggle as well.
pull/24376/head
Marco Neumann 2021-08-09 14:56:32 +02:00
parent c11eb25d4e
commit 4dcee10d1e
6 changed files with 73 additions and 42 deletions
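
At a glance, the change inverts how skipping is expressed: instead of always building a replay plan and passing a skip flag alongside it, the plan is only built when replay will actually run, and its absence is the skip signal. A rough sketch of the before/after shape (simplified to free functions with a stub `ReplayPlan`; the real methods live on `Db` and are async):

struct ReplayPlan; // stub standing in for the real IOx plan type

// Before: a plan was always constructed, and a separate flag decided
// whether it would actually be used.
fn perform_replay_before(_replay_plan: &ReplayPlan, skip_and_seek: bool) {
    if skip_and_seek {
        // seek write buffer streams to the current high watermark
    } else {
        // replay writes according to the plan
    }
}

// After: when replay is skipped the plan is never constructed, so `None`
// itself is the skip signal and the buggy construction path never runs.
fn perform_replay_after(replay_plan: Option<&ReplayPlan>) {
    match replay_plan {
        Some(_plan) => { /* replay writes according to the plan */ }
        None => { /* seek write buffer streams to the current high watermark */ }
    }
}

fn main() {
    perform_replay_before(&ReplayPlan, true);
    perform_replay_after(None);
}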


@@ -328,7 +328,7 @@ async fn initialize_database(shared: &DatabaseShared) {
                 Ok(state) => DatabaseState::CatalogLoaded(state),
                 Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
             },
-            DatabaseState::CatalogLoaded(state) => match state.advance(shared).await {
+            DatabaseState::CatalogLoaded(state) => match state.advance().await {
                 Ok(state) => DatabaseState::Initialized(state),
                 Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
             },
@@ -496,6 +496,7 @@ impl DatabaseStateRulesLoaded {
             shared.config.server_id,
             Arc::clone(shared.application.metric_registry()),
             shared.config.wipe_catalog_on_error,
+            shared.config.skip_replay,
         )
         .await
         .context(CatalogLoad)?;
@@ -529,18 +530,15 @@
 #[derive(Debug, Clone)]
 struct DatabaseStateCatalogLoaded {
     db: Arc<Db>,
-    replay_plan: Arc<ReplayPlan>,
+    replay_plan: Arc<Option<ReplayPlan>>,
 }
 
 impl DatabaseStateCatalogLoaded {
     /// Perform replay
-    async fn advance(
-        &self,
-        shared: &DatabaseShared,
-    ) -> Result<DatabaseStateInitialized, InitError> {
+    async fn advance(&self) -> Result<DatabaseStateInitialized, InitError> {
         let db = Arc::clone(&self.db);
 
-        db.perform_replay(self.replay_plan.as_ref(), shared.config.skip_replay)
+        db.perform_replay(self.replay_plan.as_ref().as_ref())
             .await
             .context(Replay)?;
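
The `Arc<Option<ReplayPlan>>` field is what produces the slightly cryptic `as_ref().as_ref()` above: the first `as_ref` borrows the `Option` out of the `Arc`, the second turns `&Option<ReplayPlan>` into the `Option<&ReplayPlan>` that the new `perform_replay` signature takes. A self-contained sketch with a stub type:

use std::sync::Arc;

struct ReplayPlan; // stub for the real plan type

fn main() {
    let with_plan: Arc<Option<ReplayPlan>> = Arc::new(Some(ReplayPlan));
    let without_plan: Arc<Option<ReplayPlan>> = Arc::new(None);

    // Arc<Option<ReplayPlan>> -> &Option<ReplayPlan> -> Option<&ReplayPlan>
    let plan: Option<&ReplayPlan> = with_plan.as_ref().as_ref();
    assert!(plan.is_some());
    assert!(without_plan.as_ref().as_ref().is_none());
}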


@@ -783,18 +783,14 @@ impl Db {
     /// Perform sequencer-driven replay for this DB.
     ///
-    /// When `skip_and_seek` is set then no real replay will be performed. Instead the write buffer streams will be set
+    /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set
     /// to the current high watermark and normal playback will continue from there.
-    pub async fn perform_replay(
-        &self,
-        replay_plan: &ReplayPlan,
-        skip_and_seek: bool,
-    ) -> Result<()> {
+    pub async fn perform_replay(&self, replay_plan: Option<&ReplayPlan>) -> Result<()> {
         use crate::db::replay::{perform_replay, seek_to_end};
 
-        if skip_and_seek {
-            seek_to_end(self).await.context(ReplayError)
-        } else {
+        if let Some(replay_plan) = replay_plan {
             perform_replay(self, replay_plan).await.context(ReplayError)
+        } else {
+            seek_to_end(self).await.context(ReplayError)
         }
     }


@@ -52,13 +52,19 @@ pub async fn load_or_create_preserved_catalog(
     server_id: ServerId,
     metrics_registry: Arc<MetricRegistry>,
     wipe_on_error: bool,
-) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
+    skip_replay: bool,
+) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     // first try to load existing catalogs
     match PreservedCatalog::load(
         Arc::clone(&object_store),
         server_id,
         db_name.to_string(),
-        LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
+        LoaderEmptyInput::new(
+            db_name,
+            server_id,
+            Arc::clone(&metrics_registry),
+            skip_replay,
+        ),
     )
     .await
     {
@@ -66,7 +72,10 @@ pub async fn load_or_create_preserved_catalog(
             // successfull load
             info!("Found existing catalog for DB {}", db_name);
             let Loader { catalog, planner } = loader;
-            let plan = planner.build().context(CannotBuildReplayPlan)?;
+            let plan = planner
+                .map(|planner| planner.build())
+                .transpose()
+                .context(CannotBuildReplayPlan)?;
             Ok((preserved_catalog, catalog, plan))
         }
         Ok(None) => {
@@ -81,6 +90,7 @@ pub async fn load_or_create_preserved_catalog(
                 Arc::clone(&object_store),
                 server_id,
                 Arc::clone(&metrics_registry),
+                skip_replay,
             )
             .await
         }
@@ -99,6 +109,7 @@ pub async fn load_or_create_preserved_catalog(
                 Arc::clone(&object_store),
                 server_id,
                 Arc::clone(&metrics_registry),
+                skip_replay,
             )
             .await
         } else {
@@ -116,18 +127,27 @@ pub async fn create_preserved_catalog(
     object_store: Arc<ObjectStore>,
     server_id: ServerId,
     metrics_registry: Arc<MetricRegistry>,
-) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
+    skip_replay: bool,
+) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     let (preserved_catalog, loader) = PreservedCatalog::new_empty(
         Arc::clone(&object_store),
         server_id,
         db_name.to_string(),
-        LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
+        LoaderEmptyInput::new(
+            db_name,
+            server_id,
+            Arc::clone(&metrics_registry),
+            skip_replay,
+        ),
     )
     .await
     .context(CannotCreateCatalog)?;
 
     let Loader { catalog, planner } = loader;
-    let plan = planner.build().context(CannotBuildReplayPlan)?;
+    let plan = planner
+        .map(|planner| planner.build())
+        .transpose()
+        .context(CannotBuildReplayPlan)?;
 
     Ok((preserved_catalog, catalog, plan))
 }
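
Both catalog functions now share the `map`/`transpose` idiom seen above: running the fallible `build()` only when a planner exists yields an `Option<Result<T, E>>`, and `transpose` flips it into `Result<Option<T>, E>`, so the existing `context(...)?` error handling still applies while `None` simply flows through. A toy, self-contained illustration:

fn build(planner: u32) -> Result<String, String> {
    Ok(format!("plan-{}", planner))
}

fn main() {
    // Some planner: `map` gives Option<Result<..>>, `transpose` flips it
    // to Result<Option<..>> so `?` could propagate the error.
    let plan: Result<Option<String>, String> = Some(7).map(build).transpose();
    assert_eq!(plan.unwrap().as_deref(), Some("plan-7"));

    // No planner: `build` never runs and the result is simply Ok(None).
    let plan: Result<Option<String>, String> = None::<u32>.map(build).transpose();
    assert_eq!(plan.unwrap(), None);
}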
@@ -137,10 +157,16 @@ pub struct LoaderEmptyInput {
     domain: ::metrics::Domain,
     metrics_registry: Arc<::metrics::MetricRegistry>,
     metric_labels: Vec<KeyValue>,
+    skip_replay: bool,
 }
 
 impl LoaderEmptyInput {
-    fn new(db_name: &str, server_id: ServerId, metrics_registry: Arc<MetricRegistry>) -> Self {
+    fn new(
+        db_name: &str,
+        server_id: ServerId,
+        metrics_registry: Arc<MetricRegistry>,
+        skip_replay: bool,
+    ) -> Self {
         let metric_labels = vec![
             KeyValue::new("db_name", db_name.to_string()),
             KeyValue::new("svr_id", format!("{}", server_id)),
@@ -151,6 +177,7 @@ impl LoaderEmptyInput {
             domain,
             metrics_registry,
             metric_labels,
+            skip_replay,
         }
     }
 }
@@ -159,7 +186,7 @@ impl LoaderEmptyInput {
 #[derive(Debug)]
 struct Loader {
     catalog: Catalog,
-    planner: ReplayPlanner,
+    planner: Option<ReplayPlanner>,
 }
 
 impl CatalogState for Loader {
@@ -173,7 +200,7 @@ impl CatalogState for Loader {
                 data.metrics_registry,
                 data.metric_labels,
             ),
-            planner: ReplayPlanner::new(),
+            planner: (!data.skip_replay).then(ReplayPlanner::new),
         }
     }
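
The replacement initializer relies on `bool::then` (stable since Rust 1.50): the closure, here the `ReplayPlanner::new` constructor, runs only when the bool is `true`, so with `skip_replay` set no planner is ever created. A minimal stub-based demo:

#[derive(Debug, PartialEq)]
struct ReplayPlanner; // stub for the real planner

impl ReplayPlanner {
    fn new() -> Self {
        ReplayPlanner
    }
}

fn main() {
    let skip_replay = true;
    // `then` never calls `ReplayPlanner::new` here: the bool is false.
    assert_eq!((!skip_replay).then(ReplayPlanner::new), None);

    let skip_replay = false;
    assert_eq!((!skip_replay).then(ReplayPlanner::new), Some(ReplayPlanner));
}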
@@ -193,12 +220,14 @@
             })?;
 
         // remember file for replay
-        self.planner
-            .register_checkpoints(&iox_md.partition_checkpoint, &iox_md.database_checkpoint)
-            .map_err(|e| Box::new(e) as _)
-            .context(ReplayPlanError {
-                path: info.path.clone(),
-            })?;
+        if let Some(planner) = self.planner.as_mut() {
+            planner
+                .register_checkpoints(&iox_md.partition_checkpoint, &iox_md.database_checkpoint)
+                .map_err(|e| Box::new(e) as _)
+                .context(ReplayPlanError {
+                    path: info.path.clone(),
+                })?;
+        }
 
         // Create a parquet chunk for this chunk
         let metrics = self
@@ -308,9 +337,16 @@ mod tests {
             .await;
         let metrics_registry = Arc::new(metrics::MetricRegistry::new());
 
-        load_or_create_preserved_catalog(db_name, object_store, server_id, metrics_registry, true)
-            .await
-            .unwrap();
+        load_or_create_preserved_catalog(
+            db_name,
+            object_store,
+            server_id,
+            metrics_registry,
+            true,
+            false,
+        )
+        .await
+        .unwrap();
     }
 
     fn checkpoint_data_from_loader(loader: &Loader) -> CheckpointData {
@@ -324,6 +360,7 @@
             domain: metrics_registry.register_domain("catalog"),
             metrics_registry,
             metric_labels: vec![],
+            skip_replay: false,
         };
         assert_catalog_state_implementation::<Loader, _>(empty_input, checkpoint_data_from_loader)
             .await;


@@ -539,14 +539,12 @@
                 Step::Replay => {
                     let db = &test_db.db;
 
-                    db.perform_replay(&test_db.replay_plan, false)
-                        .await
-                        .unwrap();
+                    db.perform_replay(Some(&test_db.replay_plan)).await.unwrap();
                 }
                 Step::SkipReplay => {
                     let db = &test_db.db;
 
-                    db.perform_replay(&test_db.replay_plan, true).await.unwrap();
+                    db.perform_replay(None).await.unwrap();
                 }
                 Step::Persist(partitions) => {
                     let db = &test_db.db;
@@ -2046,7 +2044,7 @@
         let replay_plan = replay_planner.build().unwrap();
 
         // replay fails
-        let res = db.perform_replay(&replay_plan, false).await;
+        let res = db.perform_replay(Some(&replay_plan)).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Replay plan references unknown sequencer"
@@ -2096,7 +2094,7 @@
         let replay_plan = replay_planner.build().unwrap();
 
         // replay fails
-        let res = db.perform_replay(&replay_plan, false).await;
+        let res = db.perform_replay(Some(&replay_plan)).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Cannot replay: For sequencer 0 expected to find sequence 1 but replay jumped to 2"
@@ -2137,7 +2135,7 @@
         let db = &test_db.db;
 
         // seek
-        db.perform_replay(&test_db.replay_plan, true).await.unwrap();
+        db.perform_replay(None).await.unwrap();
 
         // add more data
         write_buffer_state.push_entry(SequencedEntry::new_from_sequence(


@@ -596,6 +596,7 @@ where
             Arc::clone(self.shared.application.object_store()),
             server_id,
             Arc::clone(self.shared.application.metric_registry()),
+            true,
         )
         .await
         .map_err(|e| Box::new(e) as _)


@@ -72,6 +72,7 @@ impl TestDbBuilder {
             server_id,
             Arc::clone(&metrics_registry),
             true,
+            false,
        )
        .await
        .unwrap();
@@ -112,7 +113,7 @@ impl TestDbBuilder {
         TestDb {
             metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
             db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
-            replay_plan,
+            replay_plan: replay_plan.expect("did not skip replay"),
         }
     }