fix: `persist_row_threshold` limits the out chunk row count

`persist_row_threshold` should limit the rows of the post-compaction
output chunk (and hence the sum of rows over the input chunks), not
the number of rows of each individual input chunk.

Fixes #1874.
pull/24376/head
Marco Neumann 2021-07-06 15:17:56 +02:00
parent 76d2c317a5
commit 677314f52f
1 changed files with 19 additions and 9 deletions

View File

@ -187,7 +187,7 @@ where
rules: &LifecycleRules,
now: DateTime<Utc>,
) {
let row_threshold = rules.persist_row_threshold.get();
let mut rows_left = rules.persist_row_threshold.get();
// TODO: Encapsulate locking into a CatalogTransaction type
let partition = partition.read();
@ -217,9 +217,11 @@ where
to_compact.push(chunk);
}
ChunkStorage::ReadBuffer => {
if chunk.row_count() >= row_threshold {
let row_count = chunk.row_count();
if row_count >= rows_left {
continue;
}
rows_left -= row_count;
to_compact.push(chunk);
}
_ => {}
@ -1250,12 +1252,16 @@ mod tests {
]),
TestPartition::new(vec![
// closed => can compact
TestChunk::new(14, Some(0), Some(20), ChunkStorage::ReadBuffer),
// closed => can compact
TestChunk::new(15, Some(0), Some(20), ChunkStorage::ReadBuffer),
// too many rows => ignore
TestChunk::new(16, Some(0), Some(20), ChunkStorage::ReadBuffer)
TestChunk::new(14, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many individual rows => ignore
TestChunk::new(15, Some(0), Some(20), ChunkStorage::ReadBuffer)
.with_row_count(1_000),
// closed => can compact
TestChunk::new(16, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job
TestChunk::new(17, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
// too many total rows => next compaction job
TestChunk::new(18, Some(0), Some(20), ChunkStorage::ReadBuffer).with_row_count(400),
]),
];
@ -1270,9 +1276,13 @@ mod tests {
MoverEvents::Compact(vec![3, 4, 5]),
MoverEvents::Compact(vec![8, 9]),
MoverEvents::Compact(vec![12]),
MoverEvents::Compact(vec![14, 15]),
]
MoverEvents::Compact(vec![14, 16]),
],
);
db.events.write().clear();
lifecycle.check_for_work(now, Instant::now());
assert_eq!(*db.events.read(), vec![MoverEvents::Compact(vec![17, 18])]);
}
#[test]