feat: recover failed lifecycle actions (#1099) (#1592)

* feat: recover failed lifecycle actions (#1099)

* chore: review feedback

* chore: fix logical conflicts
pull/24376/head
Raphael Taylor-Davies 2021-06-03 16:46:33 +01:00 committed by GitHub
parent 5037f5e23c
commit 696ebdc4db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 36 deletions

View File

@ -522,7 +522,7 @@ impl Db {
partition_key,
table_name,
chunk_id,
action: lifecycle_action.name(),
action: lifecycle_action.metadata().name(),
}
.fail();
}

View File

@ -96,6 +96,18 @@ pub enum Error {
actual: String,
},
#[snafu(display(
"Cannot clear a lifecycle action '{}' for chunk {}:{} that is still running",
action,
partition_key,
chunk_id
))]
IncompleteLifecycleAction {
partition_key: String,
chunk_id: u32,
action: String,
},
#[snafu(display(
"Cannot add an empty chunk to the catalog {}:{}:{}",
partition_key,

View File

@ -325,8 +325,8 @@ impl Chunk {
&self.stage
}
pub fn lifecycle_action(&self) -> Option<&ChunkLifecycleAction> {
self.lifecycle_action.as_ref().map(|x| x.metadata())
pub fn lifecycle_action(&self) -> Option<&TaskTracker<ChunkLifecycleAction>> {
self.lifecycle_action.as_ref()
}
pub fn time_of_first_write(&self) -> Option<DateTime<Utc>> {
@ -759,6 +759,23 @@ impl Chunk {
self.lifecycle_action = None;
Ok(())
}
/// Abort the current lifecycle action if any
///
/// Returns an error if the lifecycle action is still running
pub fn clear_lifecycle_action(&mut self) -> Result<()> {
if let Some(tracker) = &self.lifecycle_action {
if !tracker.is_complete() {
return Err(Error::IncompleteLifecycleAction {
partition_key: self.partition_key.to_string(),
chunk_id: self.id,
action: tracker.metadata().name().to_string(),
});
}
self.lifecycle_action = None
}
Ok(())
}
}
#[cfg(test)]
@ -830,15 +847,15 @@ mod tests {
let registration = TaskRegistration::new();
// no action to begin with
assert_eq!(chunk.lifecycle_action(), None);
assert!(chunk.lifecycle_action().is_none());
// set some action
chunk
.set_lifecycle_action(ChunkLifecycleAction::Moving, &registration)
.unwrap();
assert_eq!(
chunk.lifecycle_action(),
Some(&ChunkLifecycleAction::Moving)
*chunk.lifecycle_action().unwrap().metadata(),
ChunkLifecycleAction::Moving
);
// setting an action while there is one running fails
@ -860,8 +877,8 @@ mod tests {
.set_lifecycle_action(ChunkLifecycleAction::Compacting, &registration)
.unwrap();
assert_eq!(
chunk.lifecycle_action(),
Some(&ChunkLifecycleAction::Compacting)
*chunk.lifecycle_action().unwrap().metadata(),
ChunkLifecycleAction::Compacting
);
}

View File

@ -15,9 +15,11 @@ use super::{
use data_types::database_rules::SortOrder;
use futures::future::BoxFuture;
use std::collections::HashSet;
use std::time::Duration;
use std::time::{Duration, Instant};
pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1);
/// Number of seconds to wait before retying a failed lifecycle action
pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10);
/// Handles the lifecycle of chunks within a Db
pub struct LifecycleManager {
@ -47,7 +49,7 @@ impl LifecycleManager {
///
/// Returns a future that resolves when this method should be called next
pub fn check_for_work(&mut self) -> BoxFuture<'static, ()> {
ChunkMover::check_for_work(self, Utc::now())
ChunkMover::check_for_work(self, Utc::now(), Instant::now())
}
}
@ -98,7 +100,11 @@ trait ChunkMover {
/// The core policy logic
///
/// Returns a future that resolves when this method should be called next
fn check_for_work(&mut self, now: DateTime<Utc>) -> BoxFuture<'static, ()> {
fn check_for_work(
&mut self,
now: DateTime<Utc>,
now_instant: Instant,
) -> BoxFuture<'static, ()> {
let rules = self.rules();
let chunks = self.chunks(&rules.sort_order);
@ -127,7 +133,17 @@ trait ChunkMover {
let would_move = can_move(&rules, &*chunk_guard, now);
let would_write = write_tracker.is_none() && rules.persist;
if chunk_guard.lifecycle_action().is_some() {
if let Some(lifecycle_action) = chunk_guard.lifecycle_action() {
if lifecycle_action.is_complete()
&& now_instant.duration_since(lifecycle_action.start_instant())
>= LIFECYCLE_ACTION_BACKOFF
{
std::mem::drop(chunk_guard);
chunk
.write()
.clear_lifecycle_action()
.expect("failed to clear lifecycle action");
}
continue;
}
@ -171,8 +187,6 @@ trait ChunkMover {
// TODO: unload read buffer (https://github.com/influxdata/influxdb_iox/issues/1400)
_ => {}
}
// TODO: Find and recover cancelled move jobs (#1099)
}
if let Some(soft_limit) = rules.buffer_size_soft {
@ -379,7 +393,7 @@ mod tests {
convert::TryFrom,
num::{NonZeroU32, NonZeroUsize},
};
use tracker::TaskRegistry;
use tracker::{TaskRegistration, TaskRegistry};
fn from_secs(secs: i64) -> DateTime<Utc> {
DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc)
@ -635,7 +649,7 @@ mod tests {
];
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(40));
mover.check_for_work(from_secs(40), Instant::now());
assert_eq!(mover.events, vec![]);
}
@ -652,24 +666,24 @@ mod tests {
];
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(9));
mover.check_for_work(from_secs(9), Instant::now());
assert_eq!(mover.events, vec![]);
mover.check_for_work(from_secs(11));
mover.check_for_work(from_secs(11), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(2)]);
mover.check_for_work(from_secs(12));
mover.check_for_work(from_secs(12), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(2)]);
// Should move in the order of chunks in the case of multiple candidates
mover.check_for_work(from_secs(20));
mover.check_for_work(from_secs(20), Instant::now());
assert_eq!(
mover.events,
vec![MoverEvents::Move(2), MoverEvents::Move(0)]
);
mover.check_for_work(from_secs(20));
mover.check_for_work(from_secs(20), Instant::now());
assert_eq!(
mover.events,
@ -695,13 +709,13 @@ mod tests {
let (tracker, registration) = registry.register(());
mover.move_tracker = Some(tracker);
mover.check_for_work(from_secs(80));
mover.check_for_work(from_secs(80), Instant::now());
assert_eq!(mover.events, vec![]);
std::mem::drop(registration);
mover.check_for_work(from_secs(80));
mover.check_for_work(from_secs(80), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(0)]);
}
@ -721,12 +735,12 @@ mod tests {
// of check_for_work had started a background move task
mover.move_tracker = Some(tracker);
let future = mover.check_for_work(from_secs(0));
let future = mover.check_for_work(from_secs(0), Instant::now());
tokio::time::timeout(Duration::from_millis(1), future)
.await
.expect_err("expected timeout");
let future = mover.check_for_work(from_secs(0));
let future = mover.check_for_work(from_secs(0), Instant::now());
std::mem::drop(registration);
tokio::time::timeout(Duration::from_millis(1), future)
.await
@ -750,13 +764,13 @@ mod tests {
// Expect to move chunk_id=1 first, despite it coming second in
// the order, because chunk_id=0 will only become old enough at t=100
mover.check_for_work(from_secs(80));
mover.check_for_work(from_secs(80), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(1)]);
mover.check_for_work(from_secs(90));
mover.check_for_work(from_secs(90), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(1)]);
mover.check_for_work(from_secs(110));
mover.check_for_work(from_secs(110), Instant::now());
assert_eq!(
mover.events,
@ -784,7 +798,7 @@ mod tests {
let mut mover = DummyMover::new(rules.clone(), chunks);
mover.check_for_work(from_secs(10));
mover.check_for_work(from_secs(10), Instant::now());
assert_eq!(mover.events, vec![]);
let chunks = vec![
@ -801,7 +815,7 @@ mod tests {
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(10));
mover.check_for_work(from_secs(10), Instant::now());
assert_eq!(
mover.events,
vec![MoverEvents::Drop(2), MoverEvents::Drop(4)]
@ -827,7 +841,7 @@ mod tests {
let mut mover = DummyMover::new(rules.clone(), chunks);
mover.check_for_work(from_secs(10));
mover.check_for_work(from_secs(10), Instant::now());
assert_eq!(mover.events, vec![]);
let chunks = vec![
@ -844,7 +858,7 @@ mod tests {
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(10));
mover.check_for_work(from_secs(10), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Drop(4)]);
}
@ -860,7 +874,7 @@ mod tests {
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(10));
mover.check_for_work(from_secs(10), Instant::now());
assert_eq!(mover.events, vec![]);
}
@ -887,7 +901,7 @@ mod tests {
let mut mover = DummyMover::new(rules, chunks);
mover.check_for_work(from_secs(0));
mover.check_for_work(from_secs(0), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Write(1)]);
}
@ -904,13 +918,55 @@ mod tests {
let mut mover = DummyMover::new(rules, chunks);
// Initially can't move
mover.check_for_work(from_secs(80));
mover.check_for_work(from_secs(80), Instant::now());
assert_eq!(mover.events, vec![]);
mover.chunks[0].write().freeze().unwrap();
// As soon as closed can move
mover.check_for_work(from_secs(80));
mover.check_for_work(from_secs(80), Instant::now());
assert_eq!(mover.events, vec![MoverEvents::Move(0)]);
}
#[test]
fn test_recovers_lifecycle_action() {
let rules = LifecycleRules::default();
let chunks = vec![new_chunk(0, None, None)];
let mut mover = DummyMover::new(rules, chunks);
let chunk = Arc::clone(&mover.chunks[0]);
chunk.write().freeze().unwrap();
let r0 = TaskRegistration::default();
let tracker = {
let mut chunk = chunk.write();
chunk.set_moving(&r0).unwrap();
chunk.lifecycle_action().unwrap().clone()
};
// Shouldn't do anything
mover.check_for_work(from_secs(0), tracker.start_instant());
assert!(chunk.read().lifecycle_action().is_some());
// Shouldn't do anything as job hasn't finished
mover.check_for_work(
from_secs(0),
tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF,
);
assert!(chunk.read().lifecycle_action().is_some());
// "Finish" job
std::mem::drop(r0);
// Shouldn't do anything as insufficient time passed
mover.check_for_work(from_secs(0), tracker.start_instant());
assert!(chunk.read().lifecycle_action().is_some());
// Should clear job
mover.check_for_work(
from_secs(0),
tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF,
);
assert!(chunk.read().lifecycle_action().is_none());
}
}

View File

@ -269,6 +269,11 @@ impl<T> TaskTracker<T> {
}
}
/// Returns the instant the tracker was created
pub fn start_instant(&self) -> Instant {
self.state.start_instant
}
/// Returns if this tracker has been cancelled
pub fn is_cancelled(&self) -> bool {
self.state.cancel_token.is_cancelled()