From 8185feddb9c2d8f7ed27e1eee8eadea4e52c4736 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Fri, 8 Oct 2021 15:17:15 +0200
Subject: [PATCH 1/5] fix: do not break chunk ordering during persistence

Fixes #2729.
---
 lifecycle/src/policy.rs | 153 ++++++++++++++++++++++++++++------------
 server/src/db.rs        |  33 ++++-----
 2 files changed, 118 insertions(+), 68 deletions(-)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index b83b37c5bd..e6e4a93444 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -1,6 +1,6 @@
 use crate::{
-    LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
-    PersistHandle,
+    LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk,
+    LockablePartition, PersistHandle,
 };
 use chrono::{DateTime, Utc};
 use data_types::{
@@ -387,51 +387,12 @@ where
             }
         };
 
-        let mut to_persist = Vec::new();
-        for chunk in &chunks {
-            let chunk = chunk.read();
-            trace!(%db_name, %partition, chunk=%chunk.addr(), "considering chunk for persistence");
-
-            // Check if chunk is eligible for persistence
-            match chunk.storage() {
-                ChunkStorage::OpenMutableBuffer
-                | ChunkStorage::ClosedMutableBuffer
-                | ChunkStorage::ReadBuffer => {}
-                ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
-                    debug!(%db_name, %partition, chunk=%chunk.addr(), storage=?chunk.storage(),
-                           "chunk not eligible due to storage");
-                    continue;
-                }
-            }
-
-            // Chunk's data is entirely after the time we are flushing
-            // up to, and thus there is no reason to include it in the
-            // plan
-            if chunk.min_timestamp() > persist_handle.timestamp() {
-                // Can safely ignore chunk
-                debug!(%db_name, %partition, chunk=%chunk.addr(),
-                       "chunk does not contain data eligible for persistence");
-                continue;
-            }
-
-            // If the chunk has an outstanding lifecycle action
-            if let Some(action) = chunk.lifecycle_action() {
-                // see if we should stall compaction, since it is
-                // preventing us from persisting
-                let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
-                info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
-
-                // NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
+        let chunks = match select_persistable_chunks(&chunks, db_name, persist_handle.timestamp()) {
+            Ok(chunks) => chunks,
+            Err(stall) => {
                 return stall;
             }
-
-            to_persist.push(chunk);
-        }
-
-        let chunks = to_persist
-            .into_iter()
-            .map(|chunk| chunk.upgrade())
-            .collect();
+        };
 
         let tracker = LockablePartition::persist_chunks(partition, chunks, persist_handle)
            .expect("failed to persist chunks")
@@ -650,6 +611,73 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<P>>) {
     })
 }
 
+pub fn select_persistable_chunks<'a, P, D>(
+    chunks: &'a [D],
+    db_name: &DatabaseName<'static>,
+    flush_ts: DateTime<Utc>,
+) -> Result<Vec<LifecycleWriteGuard<'a, P, D>>, bool>
+where
+    D: LockableChunk<Chunk = P>,
+    P: LifecycleChunk,
+{
+    let mut to_persist = Vec::new();
+    let mut to_persist_gap = Vec::new();
+
+    for chunk in chunks {
+        let chunk = chunk.read();
+        trace!(%db_name, chunk=%chunk.addr(), "considering chunk for persistence");
+
+        // Check if chunk is eligible for persistence
+        match chunk.storage() {
+            ChunkStorage::OpenMutableBuffer
+            | ChunkStorage::ClosedMutableBuffer
+            | ChunkStorage::ReadBuffer => {}
+            ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
+                debug!(%db_name, chunk=%chunk.addr(), storage=?chunk.storage(),
+                       "chunk not eligible due to storage");
+                continue;
+            }
+        }
+
+        // Chunk's data is entirely after the time we are flushing
+        // up to, and thus there is no reason to include it in the
+        // plan
+        if chunk.min_timestamp() > flush_ts {
+            // Ignore chunk for now, but we might need it later to closer chunk order gaps
+            debug!(%db_name, chunk=%chunk.addr(),
+                   "chunk does not contain data eligible for persistence");
+            if chunk.lifecycle_action().is_none() {
+                to_persist_gap.push(chunk);
+            }
+            continue;
+        }
+
+        // If the chunk has an outstanding lifecycle action
+        if let Some(action) = chunk.lifecycle_action() {
+            // see if we should stall compaction, since it is
+            // preventing us from persisting
+            let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
+            info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
+
+            // NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
+            return Err(stall);
+        }
+
+        // persist this chunk and the gap
+        to_persist.append(&mut to_persist_gap);
+        to_persist.push(chunk);
+    }
+
+    // At this point `to_persist_gap` might be non-empty. This is fine since these are only chunks at the end of the
+    // order-based list, so it's not really a gap.
+
+    let chunks = to_persist
+        .into_iter()
+        .map(|chunk| chunk.upgrade())
+        .collect();
+    Ok(chunks)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1712,13 +1740,35 @@ mod tests {
                     .with_min_timestamp(from_secs(21))
                     .with_action(ChunkLifecycleAction::Compacting),
                 TestChunk::new(ChunkId::new_test(14), 0, ChunkStorage::ClosedMutableBuffer)
-                    .with_min_timestamp(from_secs(21)),
+                    .with_min_timestamp(from_secs(21))
+                    .with_order(ChunkOrder::new(10).unwrap()),
                 TestChunk::new(ChunkId::new_test(15), 0, ChunkStorage::ClosedMutableBuffer)
                     .with_min_timestamp(from_secs(10)),
                 TestChunk::new(ChunkId::new_test(16), 0, ChunkStorage::ReadBuffer)
                     .with_min_timestamp(from_secs(5)),
             ])
             .with_persistence(1_000, now, from_secs(20)),
+            // Checks that we include chunks in a closed "order"-based interval.
+            // Note that the chunks here are ordered in reverse to check if the lifecycle policy really uses the chunk
+            // order during iteration.
+            TestPartition::new(vec![
+                TestChunk::new(ChunkId::new_test(24), 0, ChunkStorage::ReadBuffer)
+                    .with_min_timestamp(from_secs(25))
+                    .with_order(ChunkOrder::new(5).unwrap()),
+                TestChunk::new(ChunkId::new_test(25), 0, ChunkStorage::ReadBuffer)
+                    .with_min_timestamp(from_secs(5))
+                    .with_order(ChunkOrder::new(4).unwrap()),
+                TestChunk::new(ChunkId::new_test(26), 0, ChunkStorage::ReadBuffer)
+                    .with_min_timestamp(from_secs(25))
+                    .with_order(ChunkOrder::new(3).unwrap()),
+                TestChunk::new(ChunkId::new_test(27), 0, ChunkStorage::ReadBuffer)
+                    .with_min_timestamp(from_secs(5))
+                    .with_order(ChunkOrder::new(2).unwrap()),
+                TestChunk::new(ChunkId::new_test(28), 0, ChunkStorage::ReadBuffer)
+                    .with_min_timestamp(from_secs(25))
+                    .with_order(ChunkOrder::new(1).unwrap()),
+            ])
+            .with_persistence(1_000, now, from_secs(20)),
         ];
 
         let db = TestDb::from_partitions(rules, partitions);
@@ -1735,7 +1785,16 @@ mod tests {
                 MoverEvents::Persist(vec![ChunkId::new_test(15), ChunkId::new_test(16)]),
                 // 17 is the resulting chunk from the persist split above
                 // This is a "quirk" of TestPartition operations being instantaneous
-                MoverEvents::Compact(vec![ChunkId::new_test(14), ChunkId::new_test(17)])
+                MoverEvents::Compact(vec![ChunkId::new_test(17), ChunkId::new_test(14)]),
+                MoverEvents::Persist(vec![
+                    ChunkId::new_test(28),
+                    ChunkId::new_test(27),
+                    ChunkId::new_test(26),
+                    ChunkId::new_test(25)
+                ]),
+                // 29 is the resulting chunk from the persist split above
+                // This is a "quirk" of TestPartition operations being instantaneous
+                MoverEvents::Compact(vec![ChunkId::new_test(29), ChunkId::new_test(24)]),
             ]
         );
     }
diff --git a/server/src/db.rs b/server/src/db.rs
index e7df0e1017..1dc7923308 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -12,6 +12,7 @@ use std::{
     time::Duration,
 };
 
+use ::lifecycle::select_persistable_chunks;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
 use parking_lot::{Mutex, RwLock};
@@ -686,6 +687,8 @@ impl Db {
         partition_key: &str,
         force: bool,
     ) -> Result<Option<TaskTracker<Job>>> {
+        let db_name = self.rules.read().name.clone();
+
         // Use explicit scope to ensure the async generator doesn't
         // assume the locks have to possibly live across the `await`
         let fut = {
@@ -708,28 +711,16 @@ impl Db {
                 partition_key,
             })?;
 
-            // get chunks for persistence, break after first chunk that cannot be persisted due to lifecycle reasons
-            let chunks = chunks
-                .iter()
-                .filter_map(|chunk| {
-                    let chunk = chunk.read();
-                    if matches!(chunk.stage(), ChunkStage::Persisted { .. })
-                        || (chunk.min_timestamp() > flush_handle.timestamp())
-                    {
-                        None
-                    } else {
-                        Some(chunk)
-                    }
-                })
-                .map(|chunk| match chunk.lifecycle_action() {
-                    Some(_) => CannotFlushPartition {
-                        table_name,
-                        partition_key,
-                    }
-                    .fail(),
-                    None => Ok(chunk.upgrade()),
-                })
-                .collect::<Result<Vec<_>, _>>()?;
+            let chunks =
+                match select_persistable_chunks(&chunks, &db_name, flush_handle.timestamp()) {
+                    Ok(chunks) => chunks,
+                    Err(_) => {
+                        return Err(Error::CannotFlushPartition {
+                            table_name: table_name.to_string(),
+                            partition_key: partition_key.to_string(),
+                        });
+                    }
+                };
 
             let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
                 .context(LifecycleError)?;
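
Note on PATCH 1: the new selection loop keeps persistence from "jumping" over chunks when they are visited in `ChunkOrder`. A chunk whose data lies entirely after the flush timestamp is parked in a gap buffer and only pulled into the persist set once a later (higher-order) chunk still has to be persisted; a chunk that needs persisting but has an outstanding lifecycle action aborts the selection instead of being skipped. The standalone sketch below illustrates that idea with simplified, hypothetical types (`SketchChunk`, `min_ts`, `busy` are illustrative stand-ins, not part of the IOx lifecycle API); it models the technique only and is not the code from the patch.

    // Minimal model of the gap-closing selection from PATCH 1.
    #[derive(Debug, Clone)]
    struct SketchChunk {
        id: u32,
        order: u32,  // persistence must follow this order without gaps
        min_ts: i64, // smallest row timestamp in the chunk
        busy: bool,  // stand-in for an outstanding lifecycle action
    }

    /// Pick the chunks to persist up to `flush_ts`, walking in `order`.
    /// Chunks with only newer data are parked in `gap` and included only if a
    /// later chunk must be persisted, so no order value is ever skipped.
    fn select_sketch(chunks: &[SketchChunk], flush_ts: i64) -> Result<Vec<SketchChunk>, bool> {
        let mut to_persist = Vec::with_capacity(chunks.len());
        let mut gap = Vec::with_capacity(chunks.len());

        let mut sorted = chunks.to_vec();
        sorted.sort_by_key(|c| c.order);

        for chunk in sorted {
            if chunk.min_ts > flush_ts {
                // Nothing to flush here, but remember the chunk in case a
                // later chunk forces us to close the order gap.
                if !chunk.busy {
                    gap.push(chunk);
                }
                continue;
            }
            if chunk.busy {
                // Stop instead of jumping over a chunk that cannot be persisted
                // right now (the real code reports whether the blocking action
                // is a compaction).
                return Err(true);
            }
            // This chunk must be persisted, so every parked lower-order chunk
            // has to come along with it.
            to_persist.append(&mut gap);
            to_persist.push(chunk);
        }

        // Anything left in `gap` sits at the end of the order-based list,
        // so leaving it out does not create a gap.
        Ok(to_persist)
    }

    fn main() {
        let chunks = vec![
            SketchChunk { id: 28, order: 1, min_ts: 25, busy: false },
            SketchChunk { id: 27, order: 2, min_ts: 5, busy: false },
            SketchChunk { id: 26, order: 3, min_ts: 25, busy: false },
            SketchChunk { id: 25, order: 4, min_ts: 5, busy: false },
            SketchChunk { id: 24, order: 5, min_ts: 25, busy: false },
        ];
        // Flushing up to t=20: chunks 28 and 26 only hold newer data, but they
        // are pulled in to keep the order contiguous; 24 is left out.
        let ids: Vec<u32> = select_sketch(&chunks, 20)
            .unwrap()
            .iter()
            .map(|c| c.id)
            .collect();
        assert_eq!(ids, vec![28, 27, 26, 25]);
        println!("selected: {:?}", ids);
    }

The example mirrors the new `TestPartition` case added above: chunk IDs 24-28 carry reversed chunk orders, and persistence selects 28, 27, 26 and 25 while leaving 24 (the highest order, newest data) behind.
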
From 3561dfa16c0f178090c1c858960ced68f8e1e112 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 11 Oct 2021 16:46:55 +0200
Subject: [PATCH 2/5] refactor: reduce vector re-allocations

Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com>
---
 lifecycle/src/policy.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index e6e4a93444..50735131af 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -620,8 +620,8 @@ where
     D: LockableChunk<Chunk = P>,
     P: LifecycleChunk,
 {
-    let mut to_persist = Vec::new();
-    let mut to_persist_gap = Vec::new();
+    let mut to_persist = Vec::with_capacity(chunks.len());
+    let mut to_persist_gap = Vec::with_capacity(chunks.len());
 
     for chunk in chunks {
         let chunk = chunk.read();

From 9ff2213ecb41a23275f1768f50990c515c8b90a0 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 11 Oct 2021 16:47:18 +0200
Subject: [PATCH 3/5] fix: typo

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
---
 lifecycle/src/policy.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 50735131af..cb864d322b 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -643,7 +643,7 @@ where
         // up to, and thus there is no reason to include it in the
         // plan
         if chunk.min_timestamp() > flush_ts {
-            // Ignore chunk for now, but we might need it later to closer chunk order gaps
+            // Ignore chunk for now, but we might need it later to close chunk order gaps
             debug!(%db_name, chunk=%chunk.addr(),
                   "chunk does not contain data eligible for persistence");
             if chunk.lifecycle_action().is_none() {

From 8ab3ffde8c38faa4027e0dcead1f44087da9770f Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 11 Oct 2021 16:56:49 +0200
Subject: [PATCH 4/5] docs: explain error handling for `select_persistable_chunks`

---
 lifecycle/src/policy.rs | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index cb864d322b..422ef2333d 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -611,6 +611,17 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<P>>) {
     })
 }
 
+/// Select persistable chunks.
+///
+/// # Error Handling
+/// This can fail if chunks that should be persisted have an active lifecycle action. In that case an `Err(bool)` is
+/// returned.
+///
+/// If the error boolean is `true`, compaction is currently blocking persistence and you should stall compaction (i.e.
+/// to prevent new compaction jobs from starting) to be able to proceed with persistence.
+///
+/// If the error boolean is `false`, there are other active lifecycle actions preventing persistence (e.g. a persistence
+/// job that is already running).
 pub fn select_persistable_chunks<'a, P, D>(
     chunks: &'a [D],
     db_name: &DatabaseName<'static>,
From ae0acf0024d58a4dbc88e2fb761072b40f13406d Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 11 Oct 2021 17:01:01 +0200
Subject: [PATCH 5/5] refactor: remove `db_name` param from `select_persistable_chunks`

This was only used for logging but is already part of `ChunkAddr`.
---
 lifecycle/src/policy.rs | 26 +++++++++++++++-----------
 server/src/db.rs        | 21 +++++++++------------
 2 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 422ef2333d..0b85c6de97 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -387,7 +387,7 @@ where
             }
         };
 
-        let chunks = match select_persistable_chunks(&chunks, db_name, persist_handle.timestamp()) {
+        let chunks = match select_persistable_chunks(&chunks, persist_handle.timestamp()) {
             Ok(chunks) => chunks,
             Err(stall) => {
                 return stall;
@@ -622,11 +622,10 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<P>>) {
 ///
 /// If the error boolean is `false`, there are other active lifecycle actions preventing persistence (e.g. a persistence
 /// job that is already running).
-pub fn select_persistable_chunks<'a, P, D>(
-    chunks: &'a [D],
-    db_name: &DatabaseName<'static>,
+pub fn select_persistable_chunks<P, D>(
+    chunks: &[D],
     flush_ts: DateTime<Utc>,
-) -> Result<Vec<LifecycleWriteGuard<'a, P, D>>, bool>
+) -> Result<Vec<LifecycleWriteGuard<'_, P, D>>, bool>
 where
     D: LockableChunk<Chunk = P>,
     P: LifecycleChunk,
@@ -636,7 +635,7 @@ where
 
     for chunk in chunks {
         let chunk = chunk.read();
-        trace!(%db_name, chunk=%chunk.addr(), "considering chunk for persistence");
+        trace!(chunk=%chunk.addr(), "considering chunk for persistence");
 
         // Check if chunk is eligible for persistence
         match chunk.storage() {
@@ -644,8 +643,11 @@ where
             | ChunkStorage::ClosedMutableBuffer
             | ChunkStorage::ReadBuffer => {}
             ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
-                debug!(%db_name, chunk=%chunk.addr(), storage=?chunk.storage(),
-                       "chunk not eligible due to storage");
+                debug!(
+                    chunk=%chunk.addr(),
+                    storage=?chunk.storage(),
+                    "chunk not eligible due to storage",
+                );
                 continue;
             }
         }
@@ -655,8 +657,10 @@ where
         // up to, and thus there is no reason to include it in the
         // plan
         if chunk.min_timestamp() > flush_ts {
             // Ignore chunk for now, but we might need it later to close chunk order gaps
-            debug!(%db_name, chunk=%chunk.addr(),
-                   "chunk does not contain data eligible for persistence");
+            debug!(
+                chunk=%chunk.addr(),
+                "chunk does not contain data eligible for persistence",
+            );
             if chunk.lifecycle_action().is_none() {
                 to_persist_gap.push(chunk);
             }
             continue;
@@ -668,7 +672,7 @@ where
             // see if we should stall compaction, since it is
             // preventing us from persisting
             let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
-            info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
+            info!(?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
 
             // NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
             return Err(stall);
diff --git a/server/src/db.rs b/server/src/db.rs
index 1dc7923308..31225c949e 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -687,8 +687,6 @@ impl Db {
         partition_key: &str,
         force: bool,
     ) -> Result<Option<TaskTracker<Job>>> {
-        let db_name = self.rules.read().name.clone();
-
         // Use explicit scope to ensure the async generator doesn't
         // assume the locks have to possibly live across the `await`
         let fut = {
@@ -711,16 +709,15 @@ impl Db {
                 partition_key,
             })?;
 
-            let chunks =
-                match select_persistable_chunks(&chunks, &db_name, flush_handle.timestamp()) {
-                    Ok(chunks) => chunks,
-                    Err(_) => {
-                        return Err(Error::CannotFlushPartition {
-                            table_name: table_name.to_string(),
-                            partition_key: partition_key.to_string(),
-                        });
-                    }
-                };
+            let chunks = match select_persistable_chunks(&chunks, flush_handle.timestamp()) {
+                Ok(chunks) => chunks,
+                Err(_) => {
+                    return Err(Error::CannotFlushPartition {
+                        table_name: table_name.to_string(),
+                        partition_key: partition_key.to_string(),
+                    });
+                }
+            };
 
             let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
                 .context(LifecycleError)?;
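
For callers, the `Err(bool)` contract documented in PATCH 4 splits into the two uses visible above: the background lifecycle policy treats `true` as "stall further compaction and retry persistence later", while the explicit flush path in `Db::persist_partition` simply reports that the partition cannot be flushed right now. A rough sketch of how the flag can be interpreted, with hypothetical stand-ins (`PersistOutcome`, `handle_selection` and the plain `u32` chunk IDs are illustrative, not IOx APIs):

    // How a caller might act on select_persistable_chunks' Err(bool).
    #[derive(Debug, PartialEq)]
    enum PersistOutcome {
        /// Chunks were handed to a persist job.
        Persisting(Vec<u32>),
        /// Compaction blocks persistence; stop scheduling new compactions and retry.
        StallCompaction,
        /// Another lifecycle action (e.g. a running persist) blocks this partition.
        CannotFlush,
    }

    fn handle_selection(selection: Result<Vec<u32>, bool>) -> PersistOutcome {
        match selection {
            Ok(chunk_ids) => PersistOutcome::Persisting(chunk_ids),
            // `true`: compaction is in the way -- the lifecycle policy stalls it.
            Err(true) => PersistOutcome::StallCompaction,
            // `false`: something else (e.g. an in-flight persist) is in the way --
            // an explicit flush request surfaces this as CannotFlushPartition.
            Err(false) => PersistOutcome::CannotFlush,
        }
    }

    fn main() {
        assert_eq!(
            handle_selection(Ok(vec![28, 27, 26, 25])),
            PersistOutcome::Persisting(vec![28, 27, 26, 25])
        );
        assert_eq!(handle_selection(Err(true)), PersistOutcome::StallCompaction);
        assert_eq!(handle_selection(Err(false)), PersistOutcome::CannotFlush);
    }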