fix: do not break chunk ordering during persistence

Fixes #2729.
pull/24376/head
Marco Neumann 2021-10-08 15:17:15 +02:00
parent 7bd32a4916
commit 8185feddb9
2 changed files with 118 additions and 68 deletions
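
The gist of the fix: chunks are considered for persistence in ascending chunk order, and a chunk whose data lies entirely after the flush timestamp is no longer skipped outright. It is parked in a gap list instead and pulled into the persistence job as soon as a later chunk (in order) does need persisting, so the persisted set always forms a closed interval in the chunk order; a chunk with an outstanding lifecycle action still stops the scan rather than being jumped over. Below is a simplified sketch of that selection logic, using hypothetical stand-in types (Chunk, Action) and a free function select, and omitting the storage-eligibility check of the real select_persistable_chunks:

    #[derive(Clone, Copy, PartialEq)]
    enum Action {
        Compacting,
        Other,
    }

    #[derive(Clone)]
    struct Chunk {
        min_timestamp: i64,     // oldest row in the chunk
        action: Option<Action>, // outstanding lifecycle action, if any
    }

    /// `chunks` must already be sorted by ascending chunk order.
    /// Err(stall) means an outstanding action blocks persistence; `stall` is
    /// true if that action is a compaction worth waiting for.
    fn select(chunks: &[Chunk], flush_ts: i64) -> Result<Vec<Chunk>, bool> {
        let mut selected = Vec::new();
        let mut gap = Vec::new();

        for chunk in chunks {
            if chunk.min_timestamp > flush_ts {
                // nothing to flush here yet, but remember the chunk in case a
                // later chunk forces us to close the order gap
                if chunk.action.is_none() {
                    gap.push(chunk.clone());
                }
                continue;
            }
            if let Some(action) = chunk.action {
                // never jump over a chunk with an outstanding action
                return Err(action == Action::Compacting);
            }
            // persist this chunk together with every parked chunk before it
            selected.append(&mut gap);
            selected.push(chunk.clone());
        }

        // chunks left in `gap` sit at the tail of the order, so skipping them
        // does not create a hole
        Ok(selected)
    }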

View File

@@ -1,6 +1,6 @@
use crate::{
LifecycleChunk, LifecycleDb, LifecyclePartition, LockableChunk, LockablePartition,
PersistHandle,
LifecycleChunk, LifecycleDb, LifecyclePartition, LifecycleWriteGuard, LockableChunk,
LockablePartition, PersistHandle,
};
use chrono::{DateTime, Utc};
use data_types::{
@@ -387,51 +387,12 @@ where
}
};
let mut to_persist = Vec::new();
for chunk in &chunks {
let chunk = chunk.read();
trace!(%db_name, %partition, chunk=%chunk.addr(), "considering chunk for persistence");
// Check if chunk is eligible for persistence
match chunk.storage() {
ChunkStorage::OpenMutableBuffer
| ChunkStorage::ClosedMutableBuffer
| ChunkStorage::ReadBuffer => {}
ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
debug!(%db_name, %partition, chunk=%chunk.addr(), storage=?chunk.storage(),
"chunk not eligible due to storage");
continue;
}
}
// Chunk's data is entirely after the time we are flushing
// up to, and thus there is no reason to include it in the
// plan
if chunk.min_timestamp() > persist_handle.timestamp() {
// Can safely ignore chunk
debug!(%db_name, %partition, chunk=%chunk.addr(),
"chunk does not contain data eligible for persistence");
continue;
}
// If the chunk has an outstanding lifecycle action
if let Some(action) = chunk.lifecycle_action() {
// see if we should stall, since the outstanding action is
// preventing us from persisting
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
// NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
let chunks = match select_persistable_chunks(&chunks, db_name, persist_handle.timestamp()) {
Ok(chunks) => chunks,
Err(stall) => {
return stall;
}
to_persist.push(chunk);
}
let chunks = to_persist
.into_iter()
.map(|chunk| chunk.upgrade())
.collect();
};
let tracker = LockablePartition::persist_chunks(partition, chunks, persist_handle)
.expect("failed to persist chunks")
@@ -650,6 +611,73 @@ fn sort_free_candidates<P>(candidates: &mut Vec<FreeCandidate<'_, P>>) {
})
}
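/// Select chunks (sorted by chunk order) that should be persisted up to `flush_ts`.
///
/// Chunks whose data lies entirely after `flush_ts` are only included when a later chunk in the order needs
/// persisting, so the returned set contains no gaps in the chunk order. Returns `Err(stall)` when a chunk that
/// would have to be persisted has an outstanding lifecycle action; `stall` is `true` if that action is a
/// compaction worth waiting for.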
pub fn select_persistable_chunks<'a, P, D>(
chunks: &'a [D],
db_name: &DatabaseName<'static>,
flush_ts: DateTime<Utc>,
) -> Result<Vec<LifecycleWriteGuard<'a, P, D>>, bool>
where
D: LockableChunk<Chunk = P>,
P: LifecycleChunk,
{
let mut to_persist = Vec::new();
let mut to_persist_gap = Vec::new();
for chunk in chunks {
let chunk = chunk.read();
trace!(%db_name, chunk=%chunk.addr(), "considering chunk for persistence");
// Check if chunk is eligible for persistence
match chunk.storage() {
ChunkStorage::OpenMutableBuffer
| ChunkStorage::ClosedMutableBuffer
| ChunkStorage::ReadBuffer => {}
ChunkStorage::ReadBufferAndObjectStore | ChunkStorage::ObjectStoreOnly => {
debug!(%db_name, chunk=%chunk.addr(), storage=?chunk.storage(),
"chunk not eligible due to storage");
continue;
}
}
// Chunk's data is entirely after the time we are flushing
// up to, and thus there is no reason to include it in the
// plan
if chunk.min_timestamp() > flush_ts {
// Ignore chunk for now, but we might need it later to close chunk order gaps
debug!(%db_name, chunk=%chunk.addr(),
"chunk does not contain data eligible for persistence");
if chunk.lifecycle_action().is_none() {
to_persist_gap.push(chunk);
}
continue;
}
// If the chunk has an outstanding lifecycle action
if let Some(action) = chunk.lifecycle_action() {
// see if we should stall, since the outstanding action is
// preventing us from persisting
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
info!(%db_name, ?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
// NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
return Err(stall);
}
// persist this chunk and the gap
to_persist.append(&mut to_persist_gap);
to_persist.push(chunk);
}
// At this point `to_persist_gap` might be non-empty. This is fine since these are only chunks at the end of the
// order-based list, so it's not really a gap.
let chunks = to_persist
.into_iter()
.map(|chunk| chunk.upgrade())
.collect();
Ok(chunks)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1712,13 +1740,35 @@ mod tests {
.with_min_timestamp(from_secs(21))
.with_action(ChunkLifecycleAction::Compacting),
TestChunk::new(ChunkId::new_test(14), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(21)),
.with_min_timestamp(from_secs(21))
.with_order(ChunkOrder::new(10).unwrap()),
TestChunk::new(ChunkId::new_test(15), 0, ChunkStorage::ClosedMutableBuffer)
.with_min_timestamp(from_secs(10)),
TestChunk::new(ChunkId::new_test(16), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5)),
])
.with_persistence(1_000, now, from_secs(20)),
// Checks that we include chunks in a closed "order"-based interval.
// Note that the chunks here are ordered in reverse to check if the lifecycle policy really uses the chunk
// order during iteration.
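// With the flush timestamp at 20 seconds this walks orders 1 through 5 and selects chunks 28, 27, 26 and 25
// (28 and 26 only to close the order gaps), leaving chunk 24 at the tail unselected, which matches the
// Persist event asserted below.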
TestPartition::new(vec![
TestChunk::new(ChunkId::new_test(24), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(5).unwrap()),
TestChunk::new(ChunkId::new_test(25), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5))
.with_order(ChunkOrder::new(4).unwrap()),
TestChunk::new(ChunkId::new_test(26), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(3).unwrap()),
TestChunk::new(ChunkId::new_test(27), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(5))
.with_order(ChunkOrder::new(2).unwrap()),
TestChunk::new(ChunkId::new_test(28), 0, ChunkStorage::ReadBuffer)
.with_min_timestamp(from_secs(25))
.with_order(ChunkOrder::new(1).unwrap()),
])
.with_persistence(1_000, now, from_secs(20)),
];
let db = TestDb::from_partitions(rules, partitions);
@@ -1735,7 +1785,16 @@ mod tests {
MoverEvents::Persist(vec![ChunkId::new_test(15), ChunkId::new_test(16)]),
// 17 is the resulting chunk from the persist split above
// This is "quirk" of TestPartition operations being instantaneous
MoverEvents::Compact(vec![ChunkId::new_test(14), ChunkId::new_test(17)])
MoverEvents::Compact(vec![ChunkId::new_test(17), ChunkId::new_test(14)]),
MoverEvents::Persist(vec![
ChunkId::new_test(28),
ChunkId::new_test(27),
ChunkId::new_test(26),
ChunkId::new_test(25)
]),
// 29 is the resulting chunk from the persist split above
// This is "quirk" of TestPartition operations being instantaneous
MoverEvents::Compact(vec![ChunkId::new_test(29), ChunkId::new_test(24)]),
]
);
}

View File

@@ -12,6 +12,7 @@ use std::{
time::Duration,
};
use ::lifecycle::select_persistable_chunks;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use parking_lot::{Mutex, RwLock};
@@ -686,6 +687,8 @@ impl Db {
partition_key: &str,
force: bool,
) -> Result<Option<Arc<DbChunk>>> {
let db_name = self.rules.read().name.clone();
// Use explicit scope to ensure the async generator doesn't
// assume the locks might have to live across the `await`
let fut = {
@@ -708,28 +711,16 @@ impl Db {
partition_key,
})?;
// get chunks for persistence; bail out after the first chunk that cannot be persisted due to lifecycle reasons
let chunks = chunks
.iter()
.filter_map(|chunk| {
let chunk = chunk.read();
if matches!(chunk.stage(), ChunkStage::Persisted { .. })
|| (chunk.min_timestamp() > flush_handle.timestamp())
{
None
} else {
Some(chunk)
let chunks =
match select_persistable_chunks(&chunks, &db_name, flush_handle.timestamp()) {
Ok(chunks) => chunks,
Err(_) => {
return Err(Error::CannotFlushPartition {
table_name: table_name.to_string(),
partition_key: partition_key.to_string(),
});
}
})
.map(|chunk| match chunk.lifecycle_action() {
Some(_) => CannotFlushPartition {
table_name,
partition_key,
}
.fail(),
None => Ok(chunk.upgrade()),
})
.collect::<Result<Vec<_>, _>>()?;
};
let (_, fut) = lifecycle::persist_chunks(partition, chunks, flush_handle)
.context(LifecycleError)?;