feat: add flag to skip replay

Closes #2169.
pull/24376/head
Marco Neumann 2021-08-02 18:14:19 +02:00
parent bb8021d9fd
commit c912e91c95
7 changed files with 214 additions and 22 deletions

View File

@ -737,9 +737,20 @@ impl Db {
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(&self, replay_plan: &ReplayPlan) -> Result<()> {
use crate::db::replay::perform_replay;
perform_replay(self, replay_plan).await.context(ReplayError)
///
/// When `skip_and_seek` is set then no real replay will be performed. Instead the write buffer streams will be set
/// to the current high watermark and normal playback will continue from there.
pub async fn perform_replay(
&self,
replay_plan: &ReplayPlan,
skip_and_seek: bool,
) -> Result<()> {
use crate::db::replay::{perform_replay, seek_to_end};
if skip_and_seek {
seek_to_end(self).await.context(ReplayError)
} else {
perform_replay(self, replay_plan).await.context(ReplayError)
}
}
/// Background worker function

View File

@ -65,6 +65,34 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Seek to latest known sequence.
///
/// This can be used when no replay is wanted.
pub async fn seek_to_end(db: &Db) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer {
let mut write_buffer = write_buffer
.try_lock()
.expect("no streams should exist at this point");
let mut watermarks = vec![];
for (sequencer_id, stream) in write_buffer.streams() {
let watermark = (stream.fetch_high_watermark)()
.await
.context(SeekError { sequencer_id })?;
watermarks.push((sequencer_id, watermark));
}
for (sequencer_id, watermark) in watermarks {
write_buffer
.seek(sequencer_id, watermark)
.await
.context(SeekError { sequencer_id })?;
}
}
Ok(())
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
if let Some(WriteBufferConfig::Reading(write_buffer)) = &db.write_buffer {
@ -342,7 +370,7 @@ mod tests {
Step::Replay => {
test_db
.db
.perform_replay(&test_db.replay_plan)
.perform_replay(&test_db.replay_plan, false)
.await
.unwrap();
}
@ -1226,7 +1254,7 @@ mod tests {
let replay_plan = replay_planner.build().unwrap();
// replay fails
let res = db.perform_replay(&replay_plan).await;
let res = db.perform_replay(&replay_plan, false).await;
assert_contains!(
res.unwrap_err().to_string(),
"Replay plan references unknown sequencer"
@ -1274,10 +1302,105 @@ mod tests {
let replay_plan = replay_planner.build().unwrap();
// replay fails
let res = db.perform_replay(&replay_plan).await;
let res = db.perform_replay(&replay_plan, false).await;
assert_contains!(
res.unwrap_err().to_string(),
"Cannot replay: For sequencer 0 expected to find sequence 1 but replay jumped to 2"
);
}
#[tokio::test]
async fn seek_to_end_works() {
// setup watermarks:
// 0 -> 3 + 1 = 4
// 1 -> 1 + 1 = 2
// 2 -> no content = 0
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(3);
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 0),
Utc::now(),
lp_to_entry("cpu bar=0 0"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 3),
Utc::now(),
lp_to_entry("cpu bar=3 3"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 1),
Utc::now(),
lp_to_entry("cpu bar=11 11"),
));
let write_buffer = MockBufferForReading::new(write_buffer_state.clone());
// create DB
let test_db = TestDb::builder()
.write_buffer(WriteBufferConfig::Reading(Arc::new(
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
)))
.build()
.await;
let db = &test_db.db;
// seek
db.perform_replay(&test_db.replay_plan, true).await.unwrap();
// add more data
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(0, 4),
Utc::now(),
lp_to_entry("cpu bar=4 4"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(1, 9),
Utc::now(),
lp_to_entry("cpu bar=19 19"),
));
write_buffer_state.push_entry(SequencedEntry::new_from_sequence(
Sequence::new(2, 0),
Utc::now(),
lp_to_entry("cpu bar=20 20"),
));
// start background worker
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// wait until checks pass
let checks = vec![Check::Query(
"select * from cpu order by time",
vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 4 | 1970-01-01T00:00:00.000000004Z |",
"| 19 | 1970-01-01T00:00:00.000000019Z |",
"| 20 | 1970-01-01T00:00:00.000000020Z |",
"+-----+--------------------------------+",
],
)];
let t_0 = Instant::now();
loop {
println!("Try checks...");
if ReplayTest::eval_checks(&checks, false, &test_db).await {
break;
}
if t_0.elapsed() >= Duration::from_secs(10) {
println!("Running into timeout...");
// try to produce nice assertion message
ReplayTest::eval_checks(&checks, true, &test_db).await;
println!("being lucky, assertion passed on last try.");
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
// stop background worker
shutdown.cancel();
join_handle.await.unwrap();
}
}

View File

@ -96,6 +96,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub(crate) async fn initialize_server(
config: Arc<Config>,
wipe_on_error: bool,
skip_replay_and_seek_instead: bool,
) -> Result<Vec<(DatabaseName<'static>, Result<()>)>> {
let root = config.root_path();
@ -120,8 +121,14 @@ pub(crate) async fn initialize_server(
.ok()?;
Some(async move {
let result =
initialize_database(config, root, db_name.clone(), wipe_on_error).await;
let result = initialize_database(
config,
root,
db_name.clone(),
wipe_on_error,
skip_replay_and_seek_instead,
)
.await;
(db_name, result)
})
})
@ -135,6 +142,7 @@ async fn initialize_database(
root: Path,
db_name: DatabaseName<'static>,
wipe_on_error: bool,
skip_replay_and_seek_instead: bool,
) -> Result<()> {
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let mut handle = config
@ -142,7 +150,13 @@ async fn initialize_database(
.map_err(Box::new)
.context(InitDbError)?;
match try_advance_database_init_process_until_complete(&mut handle, &root, wipe_on_error).await
match try_advance_database_init_process_until_complete(
&mut handle,
&root,
wipe_on_error,
skip_replay_and_seek_instead,
)
.await
{
Ok(true) => {
// finished init and keep DB
@ -216,7 +230,7 @@ pub(crate) async fn wipe_preserved_catalog_and_maybe_recover(
let root = config.root_path();
let result =
try_advance_database_init_process_until_complete(&mut handle, &root, true).await;
try_advance_database_init_process_until_complete(&mut handle, &root, true, true).await;
// Commit changes even if failed
handle.commit();
@ -248,9 +262,17 @@ async fn try_advance_database_init_process_until_complete(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
skip_replay_and_seek_instead: bool,
) -> Result<bool> {
loop {
match try_advance_database_init_process(handle, root, wipe_on_error).await? {
match try_advance_database_init_process(
handle,
root,
wipe_on_error,
skip_replay_and_seek_instead,
)
.await?
{
InitProgress::Unfinished => {}
InitProgress::Done => {
return Ok(true);
@ -267,6 +289,7 @@ async fn try_advance_database_init_process(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
skip_replay_and_seek_instead: bool,
) -> Result<InitProgress> {
match handle.state_code() {
DatabaseStateCode::Known => {
@ -326,7 +349,7 @@ async fn try_advance_database_init_process(
let replay_plan = handle
.replay_plan()
.expect("replay plan should exist in this state");
db.perform_replay(&replay_plan)
db.perform_replay(&replay_plan, skip_replay_and_seek_instead)
.await
.map_err(Box::new)
.context(ReplayError)?;

View File

@ -302,6 +302,7 @@ pub struct ServerConfig {
remote_template: Option<RemoteTemplate>,
wipe_catalog_on_error: bool,
skip_replay_and_seek_instead: bool,
}
impl ServerConfig {
@ -310,6 +311,7 @@ impl ServerConfig {
object_store: Arc<ObjectStore>,
metric_registry: Arc<MetricRegistry>,
remote_template: Option<RemoteTemplate>,
skip_replay_and_seek_instead: bool,
) -> Self {
Self {
num_worker_threads: None,
@ -317,6 +319,7 @@ impl ServerConfig {
metric_registry,
remote_template,
wipe_catalog_on_error: true,
skip_replay_and_seek_instead,
}
}
@ -453,11 +456,15 @@ pub struct Server<M: ConnectionManager> {
#[derive(Debug)]
enum ServerStage {
/// Server has started but doesn't have a server id yet
Startup { wipe_catalog_on_error: bool },
Startup {
wipe_catalog_on_error: bool,
skip_replay_and_seek_instead: bool,
},
/// Server can be initialized
InitReady {
wipe_catalog_on_error: bool,
skip_replay_and_seek_instead: bool,
config: Arc<Config>,
last_error: Option<Arc<init::Error>>,
},
@ -503,6 +510,7 @@ where
metric_registry,
remote_template,
wipe_catalog_on_error,
skip_replay_and_seek_instead,
} = config;
let num_worker_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
@ -518,6 +526,7 @@ where
resolver: RwLock::new(Resolver::new(remote_template)),
stage: Arc::new(RwLock::new(ServerStage::Startup {
wipe_catalog_on_error,
skip_replay_and_seek_instead,
})),
}
}
@ -531,9 +540,11 @@ where
match &mut *stage {
ServerStage::Startup {
wipe_catalog_on_error,
skip_replay_and_seek_instead,
} => {
*stage = ServerStage::InitReady {
wipe_catalog_on_error: *wipe_catalog_on_error,
skip_replay_and_seek_instead: *skip_replay_and_seek_instead,
config: Arc::new(Config::new(
Arc::clone(&self.jobs),
Arc::clone(&self.store),
@ -703,17 +714,19 @@ where
/// It will be a no-op if the configs are already loaded and the server is ready.
pub async fn maybe_initialize_server(&self) {
// Explicit scope to help async generator
let (wipe_catalog_on_error, config) = {
let (wipe_catalog_on_error, skip_replay_and_seek_instead, config) = {
let state = self.stage.upgradable_read();
match &*state {
ServerStage::InitReady {
wipe_catalog_on_error,
skip_replay_and_seek_instead,
config,
last_error,
} => {
let config = Arc::clone(config);
let last_error = last_error.clone();
let wipe_catalog_on_error = *wipe_catalog_on_error;
let skip_replay_and_seek_instead = *skip_replay_and_seek_instead;
// Mark the server as initializing and drop lock
@ -723,13 +736,18 @@ where
wipe_catalog_on_error,
last_error,
};
(wipe_catalog_on_error, config)
(wipe_catalog_on_error, skip_replay_and_seek_instead, config)
}
_ => return,
}
};
let init_result = init::initialize_server(Arc::clone(&config), wipe_catalog_on_error).await;
let init_result = init::initialize_server(
Arc::clone(&config),
wipe_catalog_on_error,
skip_replay_and_seek_instead,
)
.await;
let new_stage = match init_result {
// Success -> move to next stage
Ok(results) => {
@ -747,6 +765,7 @@ where
error!(%err, "error during server init");
ServerStage::InitReady {
wipe_catalog_on_error,
skip_replay_and_seek_instead,
config,
last_error: Some(Arc::new(err)),
}
@ -1295,7 +1314,8 @@ mod tests {
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(&registry));
(
test_registry,
ServerConfig::new(Arc::new(object_store), registry, None).with_num_worker_threads(1),
ServerConfig::new(Arc::new(object_store), registry, None, false)
.with_num_worker_threads(1),
)
}
@ -1386,8 +1406,9 @@ mod tests {
store.list_with_delimiter(&store.new_path()).await.unwrap();
let manager = TestConnectionManager::new();
let config2 = ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None)
.with_num_worker_threads(1);
let config2 =
ServerConfig::new(store, Arc::new(MetricRegistry::new()), Option::None, false)
.with_num_worker_threads(1);
let server2 = Server::new(manager, config2);
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
server2.maybe_initialize_server().await;

View File

@ -239,6 +239,10 @@ Possible values (case insensitive):
default_value = "10485760" // 10 MiB
)]
pub max_http_request_size: usize,
/// Skip replaying the write buffer and seek to high watermark instead.
#[structopt(long = "--skip-replay", env = "IOX_SKIP_REPLAY")]
pub skip_replay_and_seek_instead: bool,
}
pub async fn command(config: Config) -> Result<()> {

View File

@ -138,7 +138,12 @@ pub async fn main(config: Config) -> Result<()> {
let object_storage = Arc::new(object_store);
let metric_registry = Arc::new(metrics::MetricRegistry::new());
let remote_template = config.remote_template.map(RemoteTemplate::new);
let server_config = AppServerConfig::new(object_storage, metric_registry, remote_template);
let server_config = AppServerConfig::new(
object_storage,
metric_registry,
remote_template,
config.skip_replay_and_seek_instead,
);
let server_config = if let Some(n) = config.num_worker_threads {
info!(

View File

@ -899,8 +899,13 @@ mod tests {
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(&registry));
(
test_registry,
AppServerConfig::new(Arc::new(ObjectStore::new_in_memory()), registry, None)
.with_num_worker_threads(1),
AppServerConfig::new(
Arc::new(ObjectStore::new_in_memory()),
registry,
None,
false,
)
.with_num_worker_threads(1),
)
}