From 696ebdc4db04f5a7b22bde405428f9e285bd5604 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 3 Jun 2021 16:46:33 +0100 Subject: [PATCH] feat: recover failed lifecycle actions (#1099) (#1592) * feat: recover failed lifecycle actions (#1099) * chore: review feedback * chore: fix logical conflicts --- server/src/db.rs | 2 +- server/src/db/catalog.rs | 12 ++++ server/src/db/catalog/chunk.rs | 31 ++++++--- server/src/db/lifecycle.rs | 112 ++++++++++++++++++++++++--------- tracker/src/task.rs | 5 ++ 5 files changed, 126 insertions(+), 36 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index f3ad58301e..30000a55bd 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -522,7 +522,7 @@ impl Db { partition_key, table_name, chunk_id, - action: lifecycle_action.name(), + action: lifecycle_action.metadata().name(), } .fail(); } diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 219039983a..81208a0219 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -96,6 +96,18 @@ pub enum Error { actual: String, }, + #[snafu(display( + "Cannot clear a lifecycle action '{}' for chunk {}:{} that is still running", + action, + partition_key, + chunk_id + ))] + IncompleteLifecycleAction { + partition_key: String, + chunk_id: u32, + action: String, + }, + #[snafu(display( "Cannot add an empty chunk to the catalog {}:{}:{}", partition_key, diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 25b159d75b..4f43e53932 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -325,8 +325,8 @@ impl Chunk { &self.stage } - pub fn lifecycle_action(&self) -> Option<&ChunkLifecycleAction> { - self.lifecycle_action.as_ref().map(|x| x.metadata()) + pub fn lifecycle_action(&self) -> Option<&TaskTracker> { + self.lifecycle_action.as_ref() } pub fn time_of_first_write(&self) -> Option> { @@ -759,6 +759,23 @@ impl Chunk { self.lifecycle_action = None; Ok(()) } + + /// Abort the current lifecycle action if any + /// + /// Returns an error if the lifecycle action is still running + pub fn clear_lifecycle_action(&mut self) -> Result<()> { + if let Some(tracker) = &self.lifecycle_action { + if !tracker.is_complete() { + return Err(Error::IncompleteLifecycleAction { + partition_key: self.partition_key.to_string(), + chunk_id: self.id, + action: tracker.metadata().name().to_string(), + }); + } + self.lifecycle_action = None + } + Ok(()) + } } #[cfg(test)] @@ -830,15 +847,15 @@ mod tests { let registration = TaskRegistration::new(); // no action to begin with - assert_eq!(chunk.lifecycle_action(), None); + assert!(chunk.lifecycle_action().is_none()); // set some action chunk .set_lifecycle_action(ChunkLifecycleAction::Moving, ®istration) .unwrap(); assert_eq!( - chunk.lifecycle_action(), - Some(&ChunkLifecycleAction::Moving) + *chunk.lifecycle_action().unwrap().metadata(), + ChunkLifecycleAction::Moving ); // setting an action while there is one running fails @@ -860,8 +877,8 @@ mod tests { .set_lifecycle_action(ChunkLifecycleAction::Compacting, ®istration) .unwrap(); assert_eq!( - chunk.lifecycle_action(), - Some(&ChunkLifecycleAction::Compacting) + *chunk.lifecycle_action().unwrap().metadata(), + ChunkLifecycleAction::Compacting ); } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index 7ac04e435f..23db4beaf5 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -15,9 +15,11 @@ use super::{ use data_types::database_rules::SortOrder; use futures::future::BoxFuture; use std::collections::HashSet; -use std::time::Duration; +use std::time::{Duration, Instant}; pub const DEFAULT_LIFECYCLE_BACKOFF: Duration = Duration::from_secs(1); +/// Number of seconds to wait before retying a failed lifecycle action +pub const LIFECYCLE_ACTION_BACKOFF: Duration = Duration::from_secs(10); /// Handles the lifecycle of chunks within a Db pub struct LifecycleManager { @@ -47,7 +49,7 @@ impl LifecycleManager { /// /// Returns a future that resolves when this method should be called next pub fn check_for_work(&mut self) -> BoxFuture<'static, ()> { - ChunkMover::check_for_work(self, Utc::now()) + ChunkMover::check_for_work(self, Utc::now(), Instant::now()) } } @@ -98,7 +100,11 @@ trait ChunkMover { /// The core policy logic /// /// Returns a future that resolves when this method should be called next - fn check_for_work(&mut self, now: DateTime) -> BoxFuture<'static, ()> { + fn check_for_work( + &mut self, + now: DateTime, + now_instant: Instant, + ) -> BoxFuture<'static, ()> { let rules = self.rules(); let chunks = self.chunks(&rules.sort_order); @@ -127,7 +133,17 @@ trait ChunkMover { let would_move = can_move(&rules, &*chunk_guard, now); let would_write = write_tracker.is_none() && rules.persist; - if chunk_guard.lifecycle_action().is_some() { + if let Some(lifecycle_action) = chunk_guard.lifecycle_action() { + if lifecycle_action.is_complete() + && now_instant.duration_since(lifecycle_action.start_instant()) + >= LIFECYCLE_ACTION_BACKOFF + { + std::mem::drop(chunk_guard); + chunk + .write() + .clear_lifecycle_action() + .expect("failed to clear lifecycle action"); + } continue; } @@ -171,8 +187,6 @@ trait ChunkMover { // TODO: unload read buffer (https://github.com/influxdata/influxdb_iox/issues/1400) _ => {} } - - // TODO: Find and recover cancelled move jobs (#1099) } if let Some(soft_limit) = rules.buffer_size_soft { @@ -379,7 +393,7 @@ mod tests { convert::TryFrom, num::{NonZeroU32, NonZeroUsize}, }; - use tracker::TaskRegistry; + use tracker::{TaskRegistration, TaskRegistry}; fn from_secs(secs: i64) -> DateTime { DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(secs, 0), Utc) @@ -635,7 +649,7 @@ mod tests { ]; let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(40)); + mover.check_for_work(from_secs(40), Instant::now()); assert_eq!(mover.events, vec![]); } @@ -652,24 +666,24 @@ mod tests { ]; let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(9)); + mover.check_for_work(from_secs(9), Instant::now()); assert_eq!(mover.events, vec![]); - mover.check_for_work(from_secs(11)); + mover.check_for_work(from_secs(11), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(2)]); - mover.check_for_work(from_secs(12)); + mover.check_for_work(from_secs(12), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(2)]); // Should move in the order of chunks in the case of multiple candidates - mover.check_for_work(from_secs(20)); + mover.check_for_work(from_secs(20), Instant::now()); assert_eq!( mover.events, vec![MoverEvents::Move(2), MoverEvents::Move(0)] ); - mover.check_for_work(from_secs(20)); + mover.check_for_work(from_secs(20), Instant::now()); assert_eq!( mover.events, @@ -695,13 +709,13 @@ mod tests { let (tracker, registration) = registry.register(()); mover.move_tracker = Some(tracker); - mover.check_for_work(from_secs(80)); + mover.check_for_work(from_secs(80), Instant::now()); assert_eq!(mover.events, vec![]); std::mem::drop(registration); - mover.check_for_work(from_secs(80)); + mover.check_for_work(from_secs(80), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(0)]); } @@ -721,12 +735,12 @@ mod tests { // of check_for_work had started a background move task mover.move_tracker = Some(tracker); - let future = mover.check_for_work(from_secs(0)); + let future = mover.check_for_work(from_secs(0), Instant::now()); tokio::time::timeout(Duration::from_millis(1), future) .await .expect_err("expected timeout"); - let future = mover.check_for_work(from_secs(0)); + let future = mover.check_for_work(from_secs(0), Instant::now()); std::mem::drop(registration); tokio::time::timeout(Duration::from_millis(1), future) .await @@ -750,13 +764,13 @@ mod tests { // Expect to move chunk_id=1 first, despite it coming second in // the order, because chunk_id=0 will only become old enough at t=100 - mover.check_for_work(from_secs(80)); + mover.check_for_work(from_secs(80), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(1)]); - mover.check_for_work(from_secs(90)); + mover.check_for_work(from_secs(90), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(1)]); - mover.check_for_work(from_secs(110)); + mover.check_for_work(from_secs(110), Instant::now()); assert_eq!( mover.events, @@ -784,7 +798,7 @@ mod tests { let mut mover = DummyMover::new(rules.clone(), chunks); - mover.check_for_work(from_secs(10)); + mover.check_for_work(from_secs(10), Instant::now()); assert_eq!(mover.events, vec![]); let chunks = vec![ @@ -801,7 +815,7 @@ mod tests { let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(10)); + mover.check_for_work(from_secs(10), Instant::now()); assert_eq!( mover.events, vec![MoverEvents::Drop(2), MoverEvents::Drop(4)] @@ -827,7 +841,7 @@ mod tests { let mut mover = DummyMover::new(rules.clone(), chunks); - mover.check_for_work(from_secs(10)); + mover.check_for_work(from_secs(10), Instant::now()); assert_eq!(mover.events, vec![]); let chunks = vec![ @@ -844,7 +858,7 @@ mod tests { let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(10)); + mover.check_for_work(from_secs(10), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Drop(4)]); } @@ -860,7 +874,7 @@ mod tests { let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(10)); + mover.check_for_work(from_secs(10), Instant::now()); assert_eq!(mover.events, vec![]); } @@ -887,7 +901,7 @@ mod tests { let mut mover = DummyMover::new(rules, chunks); - mover.check_for_work(from_secs(0)); + mover.check_for_work(from_secs(0), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Write(1)]); } @@ -904,13 +918,55 @@ mod tests { let mut mover = DummyMover::new(rules, chunks); // Initially can't move - mover.check_for_work(from_secs(80)); + mover.check_for_work(from_secs(80), Instant::now()); assert_eq!(mover.events, vec![]); mover.chunks[0].write().freeze().unwrap(); // As soon as closed can move - mover.check_for_work(from_secs(80)); + mover.check_for_work(from_secs(80), Instant::now()); assert_eq!(mover.events, vec![MoverEvents::Move(0)]); } + + #[test] + fn test_recovers_lifecycle_action() { + let rules = LifecycleRules::default(); + let chunks = vec![new_chunk(0, None, None)]; + let mut mover = DummyMover::new(rules, chunks); + + let chunk = Arc::clone(&mover.chunks[0]); + chunk.write().freeze().unwrap(); + + let r0 = TaskRegistration::default(); + let tracker = { + let mut chunk = chunk.write(); + chunk.set_moving(&r0).unwrap(); + chunk.lifecycle_action().unwrap().clone() + }; + + // Shouldn't do anything + mover.check_for_work(from_secs(0), tracker.start_instant()); + assert!(chunk.read().lifecycle_action().is_some()); + + // Shouldn't do anything as job hasn't finished + mover.check_for_work( + from_secs(0), + tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF, + ); + assert!(chunk.read().lifecycle_action().is_some()); + + // "Finish" job + std::mem::drop(r0); + + // Shouldn't do anything as insufficient time passed + mover.check_for_work(from_secs(0), tracker.start_instant()); + assert!(chunk.read().lifecycle_action().is_some()); + + // Should clear job + mover.check_for_work( + from_secs(0), + tracker.start_instant() + LIFECYCLE_ACTION_BACKOFF, + ); + assert!(chunk.read().lifecycle_action().is_none()); + } } diff --git a/tracker/src/task.rs b/tracker/src/task.rs index 9f2800c9bc..28e8b70c70 100644 --- a/tracker/src/task.rs +++ b/tracker/src/task.rs @@ -269,6 +269,11 @@ impl TaskTracker { } } + /// Returns the instant the tracker was created + pub fn start_instant(&self) -> Instant { + self.state.start_instant + } + /// Returns if this tracker has been cancelled pub fn is_cancelled(&self) -> bool { self.state.cancel_token.is_cancelled()