Merge pull request #2774 from influxdata/crepererum/issue2729

fix: do not break chunk ordering during persistence
pull/24376/head
kodiakhq[bot] 2021-10-11 15:19:48 +00:00 committed by GitHub
commit ea6ba1eedc
2 changed files with 131 additions and 69 deletions

View File

@@ -1,6 +1,6 @@
use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle,
LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle,
};
use chrono::{DateTime, Utc};
use data_types::{
@@ -387,51 +387,12 @@ where
}
};
let mut to_persist = Vec::new();
for chunk in &chunks {
let chunk = chunk.read();
trace!(%db_name, %partition, chunk=%chunk.addr(), "considering chunk for persistence");
// Check if chunk is eligible for persistence
match chunk.storage() {
ChunkStorage::OpenMutableBuffer
| ChunkStorage::ClosedMutableBuffer
| ChunkStorage::ReadBuffer => {}
ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
debug!(%db_name, %partition, chunk=%chunk.addr(), storage=?chunk.storage(),
"chunk not eligible due to storage");
continue;
}
}
// Chunk's data is entirely after the time we are flushing
// up to, and thus there is no reason to include it in the
// plan
if chunk.min_timestamp() > persist_handle.timestamp() {
// Can safely ignore chunk
debug!(%db_name, %partition, chunk=%chunk.addr(),
"chunk does not contain data eligible for persistence");
continue;
}
// If the chunk has an outstanding lifecycle action
if let Some(action) = chunk.lifecycle_action() {
// see if we should stall compaction because it is
// preventing us from persisting
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
return stall;
}
to_persist.push(chunk);
}
let chunks = to_persist
.into_iter()
.map(|chunk| chunk.upgrade())
.collect();
let chunks = match select_persistable_chunks(&chunks, persist_handle.timestamp()) {
Ok(chunks) => chunks,
Err(stall) => {
return stall;
}
};
let tracker = LockablePartition::persist_chunks(partition, chunks, persist_handle)
.expect("failed to persist chunks")
@@ -650,6 +611,88 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
})
}
/// Select persistable chunks.
///
/// # Error Handling
/// This can fail if chunks that should be persisted have an active lifecycle action. In that case an `Err(bool)` is
/// returned.
///
/// If the error boolean is `true`, compaction is currently blocking persistence and you should stall compaction (i.e.
/// prevent new compaction jobs from starting) so that persistence can proceed.
///
/// If the error boolean is `false`, there are other active lifecycle actions preventing persistence (e.g. a persistence
/// job that is already running).
pub fn select_persistable_chunks<P, D>(
chunks: &[D],
flush_ts: DateTime<Utc>,
) -> Result<Vec<LifecycleWriteGuard<'_, P, D>>, bool>
where
D: LockableChunk<Chunk = P>,
P: LifecycleChunk,
{
let mut to_persist = Vec::with_capacity(chunks.len());
let mut to_persist_gap = Vec::with_capacity(chunks.len());
for chunk in chunks {
let chunk = chunk.read();
trace!(chunk=%chunk.addr(), "considering chunk for persistence");
// Check if chunk is eligible for persistence
match chunk.storage() {
ChunkStorage::OpenMutableBuffer
| ChunkStorage::ClosedMutableBuffer
| ChunkStorage::ReadBuffer => {}
ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
debug!(
chunk=%chunk.addr(),
storage=?chunk.storage(),
"chunk not eligible due to storage",
);
continue;
}
}
// Chunk's data is entirely after the time we are flushing
// up to, and thus there is no reason to include it in the
// plan
if chunk.min_timestamp() > flush_ts {
// Ignore chunk for now, but we might need it later to close chunk order gaps
debug!(
chunk=%chunk.addr(),
"chunk does not contain data eligible for persistence",
);
if chunk.lifecycle_action().is_none() {
to_persist_gap.push(chunk);
}
continue;
}
// If the chunk has an outstanding lifecycle action
if let Some(action) = chunk.lifecycle_action() {
// see if we should stall compaction because it is
// preventing us from persisting
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
info!(?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
// NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
return Err(stall);
}
// persist this chunk and the gap
to_persist.append(&mut to_persist_gap);
to_persist.push(chunk);
}
// At this point `to_persist_gap` might be non-empty. This is fine since these are only chunks at the end of the
// order-based list, so it's not really a gap.
let chunks = to_persist
.into_iter()
.map(|chunk| chunk.upgrade())
.collect();
Ok(chunks)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1712,13 +1755,35 @@ mod tests {
.with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(ChunkId::new_test(14), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)),
.with_min_timestamp(from_secs(21))
.with_order(ChunkOrder::new(10).unwrap()),
TestChunk::new(ChunkId::new_test(15), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(ChunkId::new_test(16), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
// Checks that we include chunks in a closed "order"-based interval.
// Note that the chunks here are ordered in reverse to check if the lifecycle policy really uses the chunk
// order during iteration.
TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(24), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(5).unwrap()),
TestChunk::new(ChunkId::new_test(25), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5))
.with_order(ChunkOrder::new(4).unwrap()),
TestChunk::new(ChunkId::new_test(26), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(3).unwrap()),
TestChunk::new(ChunkId::new_test(27), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5))
.with_order(ChunkOrder::new(2).unwrap()),
TestChunk::new(ChunkId::new_test(28), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(1).unwrap()),
])
.with_persistence(1_000, now, from_secs(20)),
];
let db = TestDb::from_partitions(rules, partitions);
@@ -1735,7 +1800,16 @@ mod tests {
MoverEvents::Persist(vec![ChunkId::new_test(15), ChunkId::new_test(16)]),
// 17 is the resulting chunk from the persist split above
// This is "quirk" of TestPartition operations being instantaneous
MoverEvents::Compact(vec![ChunkId::new_test(14), ChunkId::new_test(17)])
MoverEvents::Compact(vec![ChunkId::new_test(17), ChunkId::new_test(14)]),
MoverEvents::Persist(vec![
ChunkId::new_test(28),
ChunkId::new_test(27),
ChunkId::new_test(26),
ChunkId::new_test(25)
]),
// 29 is the resulting chunk from the persist split above
// This is "quirk" of TestPartition operations being instantaneous
MoverEvents::Compact(vec![ChunkId::new_test(29), ChunkId::new_test(24)]),
]
);
}
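The core of the change is easier to see outside the trait machinery. Below is a minimal, self-contained sketch of the gap-preserving selection that `select_persistable_chunks` implements, using hypothetical stand-in types (`Chunk`, `select_persistable`, and `has_lifecycle_action` are illustration-only names, not the IOx lifecycle API): chunks are visited in `order`; a chunk whose data lies entirely after the flush timestamp is buffered rather than skipped, and it is added to the persist set only once a later persistable chunk forces its inclusion, so the persisted chunks always form a contiguous run in chunk order. A blocked chunk still causes an early exit, mirroring the `Err(stall)` contract documented above.

// Minimal sketch of the gap-preserving selection; simplified stand-in types,
// not the actual IOx lifecycle traits.
struct Chunk {
    id: u32,
    order: u32,
    min_timestamp: i64,
    has_lifecycle_action: bool,
}

/// Returns the chunk IDs to persist, or an error if a chunk that must be
/// persisted is blocked by an outstanding lifecycle action (the real function
/// additionally reports whether that blocker is a compaction).
fn select_persistable(chunks: &[Chunk], flush_ts: i64) -> Result<Vec<u32>, ()> {
    // Work on chunks sorted by `order`, as the lifecycle policy does.
    let mut sorted: Vec<&Chunk> = chunks.iter().collect();
    sorted.sort_by_key(|c| c.order);

    let mut to_persist = Vec::new();
    let mut gap = Vec::new();
    for chunk in sorted {
        if chunk.min_timestamp > flush_ts {
            // Not needed for this flush, but remember it so that persisting a
            // later (by order) chunk does not leave a hole in the sequence.
            if !chunk.has_lifecycle_action {
                gap.push(chunk.id);
            }
            continue;
        }
        if chunk.has_lifecycle_action {
            // Early exit: never jump over a blocked chunk, otherwise the
            // persisted chunks would no longer be contiguous in `order`.
            return Err(());
        }
        // Persist this chunk plus everything buffered in the gap before it.
        to_persist.append(&mut gap);
        to_persist.push(chunk.id);
    }
    // Whatever is left in `gap` sits at the tail of the order sequence, so
    // dropping it does not create a hole.
    Ok(to_persist)
}

fn main() {
    // Mirrors the spirit of the new test partition: orders 1..=5 given in
    // reverse, flush timestamp 20.
    let chunks = vec![
        Chunk { id: 24, order: 5, min_timestamp: 25, has_lifecycle_action: false },
        Chunk { id: 25, order: 4, min_timestamp: 5, has_lifecycle_action: false },
        Chunk { id: 26, order: 3, min_timestamp: 25, has_lifecycle_action: false },
        Chunk { id: 27, order: 2, min_timestamp: 5, has_lifecycle_action: false },
        Chunk { id: 28, order: 1, min_timestamp: 25, has_lifecycle_action: false },
    ];
    // Chunk 26 is included even though its data lies after the flush
    // timestamp, because chunk 25 (later in order) has to be persisted;
    // chunk 24 stays behind because nothing after it needs persisting.
    assert_eq!(select_persistable(&chunks, 20), Ok(vec![28, 27, 26, 25]));
}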

View File

@@ -12,6 +12,7 @@ use std::{
time::Duration,
};
use ::lifecycle::select_persistable_chunks;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use parking_lot::{Mutex, RwLock};
@@ -708,28 +709,15 @@ impl Db {
partition_key,
})?;
// get chunks for persistence, break after first chunk that cannot be persisted due to lifecycle reasons
let chunks = chunks
.iter()
.filter_map(|chunk| {
let chunk = chunk.read();
if matches!(chunk.stage(), ChunkStage::Persisted { .. })
|| (chunk.min_timestamp() > flush_handle.timestamp())
{
None
} else {
Some(chunk)
}
})
.map(|chunk| match chunk.lifecycle_action() {
Some(_) => CannotFlushPartition {
table_name,
partition_key,
}
.fail(),
None => Ok(chunk.upgrade()),
})
.collect::<Result<Vec<_>, _>>()?;
let chunks = match select_persistable_chunks(&chunks, flush_handle.timestamp()) {
Ok(chunks) => chunks,
Err(_) => {
return Err(Error::CannotFlushPartition {
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
});
}
};
let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
.context(LifecycleError)?;