feat: server functionality to recover DB by skipping replay
parent
caa47cf640
commit
e8bc7ee909
|
@ -60,6 +60,12 @@ pub enum Error {
|
|||
db_name: String,
|
||||
source: Box<parquet_file::catalog::Error>,
|
||||
},
|
||||
|
||||
#[snafu(display("failed to skip replay for database ({}): {}", db_name, source))]
|
||||
SkipReplay {
|
||||
db_name: String,
|
||||
source: Box<InitError>,
|
||||
},
|
||||
}
|
||||
|
||||
/// A `Database` represents a single configured IOx database - i.e. an entity with a corresponding
|
||||
|
@ -248,6 +254,50 @@ impl Database {
|
|||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Recover from a ReplayError by skipping replay
|
||||
pub fn skip_replay(&self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
|
||||
let db_name = &self.shared.config.name;
|
||||
let (mut current_state, handle) = {
|
||||
let state = self.shared.state.read();
|
||||
let current_state = match &**state {
|
||||
DatabaseState::ReplayError(rules_loaded, _) => rules_loaded.clone(),
|
||||
_ => {
|
||||
return Err(Error::InvalidState {
|
||||
db_name: db_name.to_string(),
|
||||
state: state.state_code(),
|
||||
transition: "SkipReplay".to_string(),
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
let handle = state.try_freeze().ok_or(Error::TransitionInProgress {
|
||||
db_name: db_name.to_string(),
|
||||
state: state.state_code(),
|
||||
})?;
|
||||
|
||||
(current_state, handle)
|
||||
};
|
||||
|
||||
let shared = Arc::clone(&self.shared);
|
||||
|
||||
Ok(async move {
|
||||
let db_name = &shared.config.name;
|
||||
current_state.replay_plan = Arc::new(None);
|
||||
let current_state = current_state
|
||||
.advance()
|
||||
.await
|
||||
.map_err(Box::new)
|
||||
.context(SkipReplay { db_name })?;
|
||||
|
||||
{
|
||||
let mut state = shared.state.write();
|
||||
*state.unfreeze(handle) = DatabaseState::Initialized(current_state);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// State shared with the `Database` background worker
|
||||
|
@ -677,8 +727,13 @@ pub(super) async fn persist_database_rules(
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use chrono::Utc;
|
||||
use data_types::database_rules::{PartitionTemplate, TemplatePart, WriteBufferConnection};
|
||||
use entry::{test_helpers::lp_to_entries, Sequence, SequencedEntry};
|
||||
use write_buffer::{config::WriteBufferConfigFactory, mock::MockBufferSharedState};
|
||||
|
||||
use super::*;
|
||||
use std::num::NonZeroU32;
|
||||
use std::{convert::TryFrom, num::NonZeroU32, time::Instant};
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_shutdown_waits_for_jobs() {
|
||||
|
@ -735,4 +790,152 @@ mod tests {
|
|||
// Shouldn't have waited for server tracker to finish
|
||||
assert!(!server_dummy_job.is_complete());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skip_replay() {
|
||||
// create write buffer
|
||||
let state = MockBufferSharedState::empty_with_n_sequencers(1);
|
||||
let partition_template = PartitionTemplate {
|
||||
parts: vec![TemplatePart::Column("partition_by".to_string())],
|
||||
};
|
||||
let entry_a = lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
|
||||
.pop()
|
||||
.unwrap();
|
||||
let entry_b = lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template)
|
||||
.pop()
|
||||
.unwrap();
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 10),
|
||||
Utc::now(),
|
||||
entry_a,
|
||||
));
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 11),
|
||||
Utc::now(),
|
||||
entry_b,
|
||||
));
|
||||
|
||||
// setup application
|
||||
let mut factory = WriteBufferConfigFactory::new();
|
||||
factory.register_mock("my_mock".to_string(), state.clone());
|
||||
let application = Arc::new(ApplicationState::with_write_buffer_factory(
|
||||
Arc::new(ObjectStore::new_in_memory()),
|
||||
Arc::new(factory),
|
||||
None,
|
||||
));
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
|
||||
// setup DB
|
||||
let db_name = DatabaseName::new("test_db").unwrap();
|
||||
let rules = DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
partition_template: partition_template.clone(),
|
||||
lifecycle_rules: data_types::database_rules::LifecycleRules {
|
||||
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
|
||||
..Default::default()
|
||||
},
|
||||
routing_rules: None,
|
||||
worker_cleanup_avg_sleep: Duration::from_secs(2),
|
||||
write_buffer_connection: Some(WriteBufferConnection::Reading(
|
||||
"mock://my_mock".to_string(),
|
||||
)),
|
||||
};
|
||||
let store_prefix = application.object_store().new_path();
|
||||
Database::create(Arc::clone(&application), &store_prefix, rules, server_id)
|
||||
.await
|
||||
.unwrap();
|
||||
let db_config = DatabaseConfig {
|
||||
name: db_name,
|
||||
server_id,
|
||||
store_prefix,
|
||||
wipe_catalog_on_error: false,
|
||||
skip_replay: false,
|
||||
};
|
||||
let database = Database::new(Arc::clone(&application), db_config.clone());
|
||||
database.wait_for_init().await.unwrap();
|
||||
|
||||
// wait for ingest
|
||||
let db = database.initialized_db().unwrap();
|
||||
let t_0 = Instant::now();
|
||||
loop {
|
||||
// use later partition here so that we can implicitely wait for both entries
|
||||
if db.partition_summary("table_1", "partition_by_b").is_some() {
|
||||
break;
|
||||
}
|
||||
|
||||
assert!(t_0.elapsed() < Duration::from_secs(10));
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// partition a was forgotten, partition b is still persisted
|
||||
assert!(db.partition_summary("table_1", "partition_by_a").is_some());
|
||||
|
||||
// persist one partition
|
||||
db.persist_partition(
|
||||
"table_1",
|
||||
"partition_by_b",
|
||||
Instant::now() + Duration::from_secs(2),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// shutdown first database
|
||||
database.shutdown();
|
||||
database.join().await.unwrap();
|
||||
|
||||
// break write buffer by removing entries
|
||||
state.clear_messages(0);
|
||||
let entry_c = lp_to_entries("table_1,partition_by=c foo=3 30", &partition_template)
|
||||
.pop()
|
||||
.unwrap();
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 12),
|
||||
Utc::now(),
|
||||
entry_c,
|
||||
));
|
||||
|
||||
// boot actual test database
|
||||
let database = Database::new(Arc::clone(&application), db_config.clone());
|
||||
|
||||
// db is broken
|
||||
let err = database.wait_for_init().await.unwrap_err();
|
||||
assert!(matches!(err.as_ref(), InitError::Replay { .. }));
|
||||
|
||||
// skip replay
|
||||
database.skip_replay().unwrap().await.unwrap();
|
||||
database.wait_for_init().await.unwrap();
|
||||
|
||||
// wait for ingest
|
||||
let entry_d = lp_to_entries("table_1,partition_by=d foo=4 40", &partition_template)
|
||||
.pop()
|
||||
.unwrap();
|
||||
state.push_entry(SequencedEntry::new_from_sequence(
|
||||
Sequence::new(0, 13),
|
||||
Utc::now(),
|
||||
entry_d,
|
||||
));
|
||||
let db = database.initialized_db().unwrap();
|
||||
let t_0 = Instant::now();
|
||||
loop {
|
||||
if db.partition_summary("table_1", "partition_by_d").is_some() {
|
||||
break;
|
||||
}
|
||||
|
||||
assert!(t_0.elapsed() < Duration::from_secs(10));
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// partition a was forgotten, partition b is still persisted, partition c was skipped
|
||||
assert!(db.partition_summary("table_1", "partition_by_a").is_none());
|
||||
assert!(db.partition_summary("table_1", "partition_by_b").is_some());
|
||||
assert!(db.partition_summary("table_1", "partition_by_c").is_none());
|
||||
|
||||
// cannot skip when database is initialized
|
||||
let res = database.skip_replay();
|
||||
assert!(matches!(res, Err(Error::InvalidState { .. })));
|
||||
|
||||
// clean up
|
||||
database.shutdown();
|
||||
database.join().await.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue