From e8bc7ee909033061966b06d7899b52e5cb08ae76 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 12 Aug 2021 14:18:38 +0200 Subject: [PATCH] feat: server functionality to recover DB by skipping replay --- server/src/database.rs | 205 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 1 deletion(-) diff --git a/server/src/database.rs b/server/src/database.rs index fcb6df16be..732887824c 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -60,6 +60,12 @@ pub enum Error { db_name: String, source: Box, }, + + #[snafu(display("failed to skip replay for database ({}): {}", db_name, source))] + SkipReplay { + db_name: String, + source: Box, + }, } /// A `Database` represents a single configured IOx database - i.e. an entity with a corresponding @@ -248,6 +254,50 @@ impl Database { Ok(()) }) } + + /// Recover from a ReplayError by skipping replay + pub fn skip_replay(&self) -> Result>, Error> { + let db_name = &self.shared.config.name; + let (mut current_state, handle) = { + let state = self.shared.state.read(); + let current_state = match &**state { + DatabaseState::ReplayError(rules_loaded, _) => rules_loaded.clone(), + _ => { + return Err(Error::InvalidState { + db_name: db_name.to_string(), + state: state.state_code(), + transition: "SkipReplay".to_string(), + }) + } + }; + + let handle = state.try_freeze().ok_or(Error::TransitionInProgress { + db_name: db_name.to_string(), + state: state.state_code(), + })?; + + (current_state, handle) + }; + + let shared = Arc::clone(&self.shared); + + Ok(async move { + let db_name = &shared.config.name; + current_state.replay_plan = Arc::new(None); + let current_state = current_state + .advance() + .await + .map_err(Box::new) + .context(SkipReplay { db_name })?; + + { + let mut state = shared.state.write(); + *state.unfreeze(handle) = DatabaseState::Initialized(current_state); + } + + Ok(()) + }) + } } /// State shared with the `Database` background worker @@ -677,8 +727,13 @@ pub(super) async fn persist_database_rules( #[cfg(test)] mod tests { + use chrono::Utc; + use data_types::database_rules::{PartitionTemplate, TemplatePart, WriteBufferConnection}; + use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry}; + use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState}; + use super::*; - use std::num::NonZeroU32; + use std::{convert::TryFrom, num::NonZeroU32, time::Instant}; #[tokio::test] async fn database_shutdown_waits_for_jobs() { @@ -735,4 +790,152 @@ mod tests { // Shouldn't have waited for server tracker to finish assert!(!server_dummy_job.is_complete()); } + + #[tokio::test] + async fn skip_replay() { + // create write buffer + let state = MockBufferSharedState::empty_with_n_sequencers(1); + let partition_template = PartitionTemplate { + parts: vec![TemplatePart::Column("partition_by".to_string())], + }; + let entry_a = lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template) + .pop() + .unwrap(); + let entry_b = lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template) + .pop() + .unwrap(); + state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 10), + Utc::now(), + entry_a, + )); + state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 11), + Utc::now(), + entry_b, + )); + + // setup application + let mut factory = WriteBufferConfigFactory::new(); + factory.register_mock("my_mock".to_string(), state.clone()); + let application = Arc::new(ApplicationState::with_write_buffer_factory( + Arc::new(ObjectStore::new_in_memory()), + Arc::new(factory), + None, + )); + let server_id = ServerId::try_from(1).unwrap(); + + // setup DB + let db_name = DatabaseName::new("test_db").unwrap(); + let rules = DatabaseRules { + name: db_name.clone(), + partition_template: partition_template.clone(), + lifecycle_rules: data_types::database_rules::LifecycleRules { + late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(), + ..Default::default() + }, + routing_rules: None, + worker_cleanup_avg_sleep: Duration::from_secs(2), + write_buffer_connection: Some(WriteBufferConnection::Reading( + "mock://my_mock".to_string(), + )), + }; + let store_prefix = application.object_store().new_path(); + Database::create(Arc::clone(&application), &store_prefix, rules, server_id) + .await + .unwrap(); + let db_config = DatabaseConfig { + name: db_name, + server_id, + store_prefix, + wipe_catalog_on_error: false, + skip_replay: false, + }; + let database = Database::new(Arc::clone(&application), db_config.clone()); + database.wait_for_init().await.unwrap(); + + // wait for ingest + let db = database.initialized_db().unwrap(); + let t_0 = Instant::now(); + loop { + // use later partition here so that we can implicitely wait for both entries + if db.partition_summary("table_1", "partition_by_b").is_some() { + break; + } + + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // partition a was forgotten, partition b is still persisted + assert!(db.partition_summary("table_1", "partition_by_a").is_some()); + + // persist one partition + db.persist_partition( + "table_1", + "partition_by_b", + Instant::now() + Duration::from_secs(2), + ) + .await + .unwrap(); + + // shutdown first database + database.shutdown(); + database.join().await.unwrap(); + + // break write buffer by removing entries + state.clear_messages(0); + let entry_c = lp_to_entries("table_1,partition_by=c foo=3 30", &partition_template) + .pop() + .unwrap(); + state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 12), + Utc::now(), + entry_c, + )); + + // boot actual test database + let database = Database::new(Arc::clone(&application), db_config.clone()); + + // db is broken + let err = database.wait_for_init().await.unwrap_err(); + assert!(matches!(err.as_ref(), InitError::Replay { .. })); + + // skip replay + database.skip_replay().unwrap().await.unwrap(); + database.wait_for_init().await.unwrap(); + + // wait for ingest + let entry_d = lp_to_entries("table_1,partition_by=d foo=4 40", &partition_template) + .pop() + .unwrap(); + state.push_entry(SequencedEntry::new_from_sequence( + Sequence::new(0, 13), + Utc::now(), + entry_d, + )); + let db = database.initialized_db().unwrap(); + let t_0 = Instant::now(); + loop { + if db.partition_summary("table_1", "partition_by_d").is_some() { + break; + } + + assert!(t_0.elapsed() < Duration::from_secs(10)); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + // partition a was forgotten, partition b is still persisted, partition c was skipped + assert!(db.partition_summary("table_1", "partition_by_a").is_none()); + assert!(db.partition_summary("table_1", "partition_by_b").is_some()); + assert!(db.partition_summary("table_1", "partition_by_c").is_none()); + + // cannot skip when database is initialized + let res = database.skip_replay(); + assert!(matches!(res, Err(Error::InvalidState { .. }))); + + // clean up + database.shutdown(); + database.join().await.unwrap(); + } }