Merge pull request #2231 from influxdata/crepererum/gate_replay_plan_building

refactor: do not construct replay plan when skipping replay
kodiakhq[bot] 2021-08-09 13:31:10 +00:00 committed by GitHub
commit 3bb4c1ae03
6 changed files with 73 additions and 42 deletions


@@ -328,7 +328,7 @@ async fn initialize_database(shared: &DatabaseShared) {
                 Ok(state) => DatabaseState::CatalogLoaded(state),
                 Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
             },
-            DatabaseState::CatalogLoaded(state) => match state.advance(shared).await {
+            DatabaseState::CatalogLoaded(state) => match state.advance().await {
                 Ok(state) => DatabaseState::Initialized(state),
                 Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
             },
@@ -496,6 +496,7 @@ impl DatabaseStateRulesLoaded {
             shared.config.server_id,
             Arc::clone(shared.application.metric_registry()),
             shared.config.wipe_catalog_on_error,
+            shared.config.skip_replay,
         )
         .await
         .context(CatalogLoad)?;
@@ -529,18 +530,15 @@
 #[derive(Debug, Clone)]
 struct DatabaseStateCatalogLoaded {
     db: Arc<Db>,
-    replay_plan: Arc<ReplayPlan>,
+    replay_plan: Arc<Option<ReplayPlan>>,
 }
 
 impl DatabaseStateCatalogLoaded {
     /// Perform replay
-    async fn advance(
-        &self,
-        shared: &DatabaseShared,
-    ) -> Result<DatabaseStateInitialized, InitError> {
+    async fn advance(&self) -> Result<DatabaseStateInitialized, InitError> {
         let db = Arc::clone(&self.db);
 
-        db.perform_replay(self.replay_plan.as_ref(), shared.config.skip_replay)
+        db.perform_replay(self.replay_plan.as_ref().as_ref())
             .await
             .context(Replay)?;
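Note on the doubled `as_ref()`: the field is now `Arc<Option<ReplayPlan>>` while `perform_replay` takes `Option<&ReplayPlan>`. The first `as_ref` goes from `Arc<Option<ReplayPlan>>` to `&Option<ReplayPlan>`, the second from there to `Option<&ReplayPlan>`. A minimal standalone sketch of the conversion, using a stand-in `ReplayPlan` type:

```rust
use std::sync::Arc;

struct ReplayPlan; // stand-in for the real type

fn main() {
    let plan: Arc<Option<ReplayPlan>> = Arc::new(Some(ReplayPlan));

    // Arc::as_ref:    Arc<Option<ReplayPlan>> -> &Option<ReplayPlan>
    // Option::as_ref: &Option<ReplayPlan>     -> Option<&ReplayPlan>
    let borrowed: Option<&ReplayPlan> = plan.as_ref().as_ref();
    assert!(borrowed.is_some());
}
```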


@@ -783,18 +783,14 @@ impl Db {
     /// Perform sequencer-driven replay for this DB.
     ///
-    /// When `skip_and_seek` is set then no real replay will be performed. Instead the write buffer streams will be set
+    /// When `replay_plan` is `None` then no real replay will be performed. Instead the write buffer streams will be set
     /// to the current high watermark and normal playback will continue from there.
-    pub async fn perform_replay(
-        &self,
-        replay_plan: &ReplayPlan,
-        skip_and_seek: bool,
-    ) -> Result<()> {
+    pub async fn perform_replay(&self, replay_plan: Option<&ReplayPlan>) -> Result<()> {
         use crate::db::replay::{perform_replay, seek_to_end};
 
-        if skip_and_seek {
-            seek_to_end(self).await.context(ReplayError)
-        } else {
+        if let Some(replay_plan) = replay_plan {
             perform_replay(self, replay_plan).await.context(ReplayError)
+        } else {
+            seek_to_end(self).await.context(ReplayError)
         }
     }
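The skip decision is now encoded in the argument itself instead of a separate flag. A sketch of the two call shapes after this change (mirrored by the test updates below; `Db`, `ReplayPlan`, and the `Result` alias are the crate's own types, assumed in scope):

```rust
// Hypothetical helper just to show the call shapes.
async fn example(db: &Db, plan: &ReplayPlan) -> Result<()> {
    db.perform_replay(Some(plan)).await?; // replay driven by the plan
    db.perform_replay(None).await?;       // no plan: seek streams to the high watermark
    Ok(())
}
```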


@@ -52,13 +52,19 @@ pub async fn load_or_create_preserved_catalog(
     server_id: ServerId,
     metrics_registry: Arc<MetricRegistry>,
     wipe_on_error: bool,
-) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
+    skip_replay: bool,
+) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     // first try to load existing catalogs
     match PreservedCatalog::load(
         Arc::clone(&object_store),
         server_id,
         db_name.to_string(),
-        LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
+        LoaderEmptyInput::new(
+            db_name,
+            server_id,
+            Arc::clone(&metrics_registry),
+            skip_replay,
+        ),
     )
     .await
     {
@@ -66,7 +72,10 @@ pub async fn load_or_create_preserved_catalog(
             // successfull load
             info!("Found existing catalog for DB {}", db_name);
             let Loader { catalog, planner } = loader;
-            let plan = planner.build().context(CannotBuildReplayPlan)?;
+            let plan = planner
+                .map(|planner| planner.build())
+                .transpose()
+                .context(CannotBuildReplayPlan)?;
             Ok((preserved_catalog, catalog, plan))
         }
         Ok(None) => {
@@ -81,6 +90,7 @@ pub async fn load_or_create_preserved_catalog(
                 Arc::clone(&object_store),
                 server_id,
                 Arc::clone(&metrics_registry),
+                skip_replay,
             )
             .await
         }
@@ -99,6 +109,7 @@ pub async fn load_or_create_preserved_catalog(
                 Arc::clone(&object_store),
                 server_id,
                 Arc::clone(&metrics_registry),
+                skip_replay,
             )
             .await
         } else {
@@ -116,18 +127,27 @@ pub async fn create_preserved_catalog(
     object_store: Arc<ObjectStore>,
     server_id: ServerId,
     metrics_registry: Arc<MetricRegistry>,
-) -> Result<(PreservedCatalog, Catalog, ReplayPlan)> {
+    skip_replay: bool,
+) -> Result<(PreservedCatalog, Catalog, Option<ReplayPlan>)> {
     let (preserved_catalog, loader) = PreservedCatalog::new_empty(
         Arc::clone(&object_store),
         server_id,
         db_name.to_string(),
-        LoaderEmptyInput::new(db_name, server_id, Arc::clone(&metrics_registry)),
+        LoaderEmptyInput::new(
+            db_name,
+            server_id,
+            Arc::clone(&metrics_registry),
+            skip_replay,
+        ),
     )
     .await
     .context(CannotCreateCatalog)?;
 
     let Loader { catalog, planner } = loader;
-    let plan = planner.build().context(CannotBuildReplayPlan)?;
+    let plan = planner
+        .map(|planner| planner.build())
+        .transpose()
+        .context(CannotBuildReplayPlan)?;
 
     Ok((preserved_catalog, catalog, plan))
 }
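Both `load_or_create_preserved_catalog` and `create_preserved_catalog` use the same idiom to build a plan only when a planner exists: `Option::map` turns `Option<ReplayPlanner>` into `Option<Result<ReplayPlan, _>>`, and `Option::transpose` flips that into `Result<Option<ReplayPlan>, _>` so the usual `.context(...)?` error handling still applies. A self-contained sketch with toy types:

```rust
fn build(x: i32) -> Result<i32, String> {
    if x >= 0 { Ok(x * 2) } else { Err("negative".to_string()) }
}

fn main() {
    // map:       Option<i32>                  -> Option<Result<i32, String>>
    // transpose: Option<Result<i32, String>>  -> Result<Option<i32>, String>
    assert_eq!(Some(3).map(build).transpose(), Ok(Some(6)));
    assert_eq!(None.map(build).transpose(), Ok(None));
    assert_eq!(Some(-1).map(build).transpose(), Err("negative".to_string()));
}
```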
@@ -137,10 +157,16 @@ pub struct LoaderEmptyInput {
     domain: ::metrics::Domain,
     metrics_registry: Arc<::metrics::MetricRegistry>,
     metric_labels: Vec<KeyValue>,
+    skip_replay: bool,
 }
 
 impl LoaderEmptyInput {
-    fn new(db_name: &str, server_id: ServerId, metrics_registry: Arc<MetricRegistry>) -> Self {
+    fn new(
+        db_name: &str,
+        server_id: ServerId,
+        metrics_registry: Arc<MetricRegistry>,
+        skip_replay: bool,
+    ) -> Self {
         let metric_labels = vec![
             KeyValue::new("db_name", db_name.to_string()),
             KeyValue::new("svr_id", format!("{}", server_id)),
@@ -151,6 +177,7 @@ impl LoaderEmptyInput {
             domain,
             metrics_registry,
             metric_labels,
+            skip_replay,
         }
     }
 }
@@ -159,7 +186,7 @@ impl LoaderEmptyInput {
 #[derive(Debug)]
 struct Loader {
     catalog: Catalog,
-    planner: ReplayPlanner,
+    planner: Option<ReplayPlanner>,
 }
 
 impl CatalogState for Loader {
@@ -173,7 +200,7 @@ impl CatalogState for Loader {
                 data.metrics_registry,
                 data.metric_labels,
             ),
-            planner: ReplayPlanner::new(),
+            planner: (!data.skip_replay).then(ReplayPlanner::new),
         }
     }
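`(!data.skip_replay).then(ReplayPlanner::new)` relies on `bool::then` (stable since Rust 1.50), which runs the given function and wraps the result in `Some` only when the bool is `true`. A tiny illustration with `Vec::new` standing in for `ReplayPlanner::new`:

```rust
fn main() {
    let skip_replay = false;
    let planner: Option<Vec<u32>> = (!skip_replay).then(Vec::new);
    assert!(planner.is_some());

    let skip_replay = true;
    let planner: Option<Vec<u32>> = (!skip_replay).then(Vec::new);
    assert!(planner.is_none());
}
```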
@@ -193,12 +220,14 @@ impl CatalogState for Loader {
         })?;
 
         // remember file for replay
-        self.planner
-            .register_checkpoints(&iox_md.partition_checkpoint, &iox_md.database_checkpoint)
-            .map_err(|e| Box::new(e) as _)
-            .context(ReplayPlanError {
-                path: info.path.clone(),
-            })?;
+        if let Some(planner) = self.planner.as_mut() {
+            planner
+                .register_checkpoints(&iox_md.partition_checkpoint, &iox_md.database_checkpoint)
+                .map_err(|e| Box::new(e) as _)
+                .context(ReplayPlanError {
+                    path: info.path.clone(),
+                })?;
+        }
 
         // Create a parquet chunk for this chunk
         let metrics = self
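With the planner now optional, checkpoint registration is guarded by `Option::as_mut`, which yields an `Option<&mut ReplayPlanner>` so the planner can be mutated in place when present; when it is `None`, the whole block is a no-op. The same pattern in miniature:

```rust
fn main() {
    let mut planner: Option<Vec<&str>> = Some(Vec::new());

    // Mutate in place only if the planner exists.
    if let Some(planner) = planner.as_mut() {
        planner.push("checkpoint");
    }

    assert_eq!(planner, Some(vec!["checkpoint"]));
}
```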
@@ -308,9 +337,16 @@ mod tests {
         .await;
         let metrics_registry = Arc::new(metrics::MetricRegistry::new());
 
-        load_or_create_preserved_catalog(db_name, object_store, server_id, metrics_registry, true)
-            .await
-            .unwrap();
+        load_or_create_preserved_catalog(
+            db_name,
+            object_store,
+            server_id,
+            metrics_registry,
+            true,
+            false,
+        )
+        .await
+        .unwrap();
     }
 
     fn checkpoint_data_from_loader(loader: &Loader) -> CheckpointData {
@@ -324,6 +360,7 @@ mod tests {
             domain: metrics_registry.register_domain("catalog"),
             metrics_registry,
             metric_labels: vec![],
+            skip_replay: false,
         };
 
         assert_catalog_state_implementation::<Loader, _>(empty_input, checkpoint_data_from_loader)
             .await;


@@ -539,14 +539,12 @@ mod tests {
                 Step::Replay => {
                     let db = &test_db.db;
 
-                    db.perform_replay(&test_db.replay_plan, false)
-                        .await
-                        .unwrap();
+                    db.perform_replay(Some(&test_db.replay_plan)).await.unwrap();
                 }
                 Step::SkipReplay => {
                     let db = &test_db.db;
 
-                    db.perform_replay(&test_db.replay_plan, true).await.unwrap();
+                    db.perform_replay(None).await.unwrap();
                 }
                 Step::Persist(partitions) => {
                     let db = &test_db.db;
@@ -2046,7 +2044,7 @@ mod tests {
         let replay_plan = replay_planner.build().unwrap();
 
         // replay fails
-        let res = db.perform_replay(&replay_plan, false).await;
+        let res = db.perform_replay(Some(&replay_plan)).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Replay plan references unknown sequencer"
@@ -2096,7 +2094,7 @@ mod tests {
         let replay_plan = replay_planner.build().unwrap();
 
         // replay fails
-        let res = db.perform_replay(&replay_plan, false).await;
+        let res = db.perform_replay(Some(&replay_plan)).await;
         assert_contains!(
             res.unwrap_err().to_string(),
             "Cannot replay: For sequencer 0 expected to find sequence 1 but replay jumped to 2"
@@ -2137,7 +2135,7 @@ mod tests {
         let db = &test_db.db;
 
         // seek
-        db.perform_replay(&test_db.replay_plan, true).await.unwrap();
+        db.perform_replay(None).await.unwrap();
 
         // add more data
         write_buffer_state.push_entry(SequencedEntry::new_from_sequence(

@@ -596,6 +596,7 @@ where
             Arc::clone(self.shared.application.object_store()),
             server_id,
             Arc::clone(self.shared.application.metric_registry()),
+            true,
         )
         .await
         .map_err(|e| Box::new(e) as _)
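This call site creates the catalog from scratch, so it passes `true` for the new `skip_replay` parameter: a freshly created catalog has nothing to replay, and no replay plan needs to be built for it.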


@@ -72,6 +72,7 @@ impl TestDbBuilder {
             server_id,
             Arc::clone(&metrics_registry),
             true,
+            false,
         )
         .await
         .unwrap();
@@ -112,7 +113,7 @@ impl TestDbBuilder {
         TestDb {
             metric_registry: metrics::TestMetricRegistry::new(metrics_registry),
             db: Db::new(database_to_commit, Arc::new(JobRegistry::new())),
-            replay_plan: replay_plan.expect("did not skip replay"),
+            replay_plan: replay_plan.expect("did not skip replay"),
         }
     }
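`TestDbBuilder` always passes `skip_replay = false` (the `false` added above), so the plan returned by `load_or_create_preserved_catalog` is guaranteed to be `Some`; the `expect("did not skip replay")` documents that invariant rather than handling a real failure case.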