feat: reintroduce compactor first file in partition exception (#6176)

* feat: compactor ignores max file count for first file

chore: typo in comment in compactor

* feat: restore special first file in partition compaction logic; add limit

* fix: calculation in compaction max file count

chore: clippy

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Luke Bond 2022-11-18 15:58:59 +00:00 committed by GitHub
parent 1a68da02ef
commit 7c813c170a
17 changed files with 366 additions and 25 deletions
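In short, the commit reintroduces an exception: the very first L0 file in a partition, together with the L1 files it overlaps, may exceed `max_num_compacting_files` as long as that group stays within the new `max_num_compacting_files_first_in_partition` limit; once that group is taken, no further files are added past the regular limit. A minimal sketch of that rule, using illustrative names (`over_file_limit`, `already_selected`, `candidate_group`) rather than the actual compactor API:

// Sketch only: restates the selection rule described by this commit.
fn over_file_limit(
    already_selected: usize,         // files already chosen for this partition
    candidate_group: usize,          // next L0 file plus its overlapping L1 files
    max_num_compacting_files: usize, // regular per-partition limit
    max_first_in_partition: usize,   // higher limit, first group only
) -> bool {
    if already_selected == 0 {
        // First file in the partition: only the special, higher limit applies,
        // so a large first group of overlaps can still be compacted.
        candidate_group > max_first_in_partition
    } else {
        // Once anything has been selected, the regular limit applies again,
        // so no further groups are added past it.
        already_selected + candidate_group > max_num_compacting_files
    }
}

If the first group fits this special limit but not the memory budget, the partition is still skipped as over budget, as one of the new tests below checks.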

View File

@ -167,6 +167,18 @@ macro_rules! gen_compactor_config {
)]
pub max_num_compacting_files: usize,
/// Max number of files to compact for a partition in which the first file and its
/// overlaps alone push the file count over `max_num_compacting_files`.
/// It's a special case of `max_num_compacting_files` with a higher value that applies
/// only to the first file (and its overlaps) in a partition.
#[clap(
long = "compaction-max-num-compacting-files-first-in-partition",
env = "INFLUXDB_IOX_COMPACTION_MAX_COMPACTING_FILES_FIRST_IN_PARTITION",
default_value = "40",
action
)]
pub max_num_compacting_files_first_in_partition: usize,
/// Number of minutes without a write to a partition before it is considered cold
/// and thus a candidate for compaction
#[clap(
@ -223,6 +235,7 @@ impl CompactorOnceConfig {
min_num_rows_allocated_per_record_batch_to_datafusion_plan: self
.min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files: self.max_num_compacting_files,
max_num_compacting_files_first_in_partition: self.max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2,
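Worked numbers under the values used in this change (the new flag defaults to 40, and 20 is used for `max_num_compacting_files` throughout the test configs below); a small, self-contained check of the relationship, not code from the repository:

fn main() {
    // Limits as they appear in this commit: clap default 40 for the new flag,
    // 20 for max_num_compacting_files in the test configs.
    let max_num_compacting_files: usize = 20;
    let max_num_compacting_files_first_in_partition: usize = 40;

    // A first L0 file with 24 overlapping L1 files: 25 candidate files in total.
    let first_group = 1 + 24;

    // Over the general limit, but still compacted because nothing has been
    // selected for this partition yet and the group fits the special limit.
    assert!(first_group > max_num_compacting_files);
    assert!(first_group <= max_num_compacting_files_first_in_partition);
}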

View File

@ -295,6 +295,7 @@ mod tests {
level_1,
compactor.config.memory_budget_bytes,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
compactor.config.max_desired_file_size_bytes,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
@ -490,6 +491,7 @@ mod tests {
level_1,
compactor.config.memory_budget_bytes,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
compactor.config.max_desired_file_size_bytes,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
@ -583,6 +585,7 @@ mod tests {
level_2,
compactor.config.memory_budget_bytes,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
compactor.config.max_desired_file_size_bytes,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
@ -703,6 +706,7 @@ mod tests {
memory_budget_bytes: 100_000_000,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 1,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,

View File

@ -693,6 +693,7 @@ pub mod tests {
memory_budget_bytes: 10 * 1024 * 1024,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,

View File

@ -157,6 +157,12 @@ pub struct CompactorConfig {
/// per partition.
pub max_num_compacting_files: usize,
/// Max number of files to compact for a partition in which the first file and its
/// overlaps alone push the file count over `max_num_compacting_files`.
/// It's a special case of `max_num_compacting_files` with a higher value that applies
/// only to the first file (and its overlaps) in a partition.
pub max_num_compacting_files_first_in_partition: usize,
/// Minutes without any new data before a partition is considered cold
pub minutes_without_new_writes_to_be_cold: u64,
@ -312,7 +318,15 @@ mod tests {
let mut repos = compactor.catalog.repositories().await;
repos
.partitions()
.record_skipped_compaction(partition.partition.id, "Not today", 3, 2, 100_000, 100)
.record_skipped_compaction(
partition.partition.id,
"Not today",
3,
2,
4,
100_000,
100,
)
.await
.unwrap()
}
@ -348,7 +362,15 @@ mod tests {
let mut repos = compactor.catalog.repositories().await;
repos
.partitions()
.record_skipped_compaction(partition.partition.id, "Not today", 3, 2, 100_000, 100)
.record_skipped_compaction(
partition.partition.id,
"Not today",
3,
2,
4,
100_000,
100,
)
.await
.unwrap();
}

View File

@ -540,6 +540,7 @@ mod tests {
memory_budget_bytes: 10 * 1024 * 1024,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,

View File

@ -136,6 +136,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
level_n_plus_1,
remaining_budget_bytes,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
compactor.config.max_desired_file_size_bytes,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
@ -169,6 +170,8 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
num_files,
budget_bytes,
file_num_limit = compactor.config.max_num_compacting_files,
file_num_limit_first_file =
compactor.config.max_num_compacting_files_first_in_partition,
memory_budget_bytes = compactor.config.memory_budget_bytes,
"skipped; over limit of number of files"
);
@ -178,6 +181,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
"over limit of num_files",
num_files,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
budget_bytes,
compactor.config.memory_budget_bytes,
)
@ -202,6 +206,8 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
memory_budget_bytes = compactor.config.memory_budget_bytes,
?num_files,
limit_num_files = compactor.config.max_num_compacting_files,
limit_num_files_first_in_partition =
compactor.config.max_num_compacting_files_first_in_partition,
"skipped; over memory budget"
);
record_skipped_compaction(
@ -210,6 +216,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
"over memory budget",
num_files,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
needed_bytes,
compactor.config.memory_budget_bytes,
)
@ -263,12 +270,14 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
}
}
#[allow(clippy::too_many_arguments)]
async fn record_skipped_compaction(
partition_id: PartitionId,
compactor: Arc<Compactor>,
reason: &str,
num_files: usize,
limit_num_files: usize,
limit_num_files_first_in_partition: usize,
estimated_bytes: u64,
limit_bytes: u64,
) {
@ -280,6 +289,7 @@ async fn record_skipped_compaction(
reason,
num_files,
limit_num_files,
limit_num_files_first_in_partition,
estimated_bytes,
limit_bytes,
)
@ -564,6 +574,7 @@ pub mod tests {
memory_budget_bytes: budget,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 2,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
@ -945,6 +956,7 @@ pub mod tests {
memory_budget_bytes: 100_000_000,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1,
hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2,
@ -1068,6 +1080,7 @@ pub mod tests {
level_1,
compactor.config.memory_budget_bytes,
compactor.config.max_num_compacting_files,
compactor.config.max_num_compacting_files_first_in_partition,
compactor.config.max_desired_file_size_bytes,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,

View File

@ -56,6 +56,8 @@ pub(crate) fn filter_parquet_files(
// stop considering level N files when the total files selected for compaction so far exceeds
// this value
max_num_files: usize,
// for the first file in a partition and its overlaps we have a separate max_num_files
max_num_files_first_in_partition: usize,
// max desired output file size in bytes
max_desired_file_size_bytes: u64,
// Gauge for the number of Parquet file candidates
@ -68,6 +70,7 @@ pub(crate) fn filter_parquet_files(
level_n_plus_1,
max_bytes,
max_num_files,
max_num_files_first_in_partition,
max_desired_file_size_bytes,
parquet_file_candidate_gauge,
parquet_file_candidate_bytes,
@ -79,6 +82,7 @@ pub(crate) fn filter_parquet_files(
}
}
#[allow(clippy::too_many_arguments)]
fn filter_parquet_files_inner(
// Level N files, sorted
level_n: Vec<CompactorParquetFile>,
@ -90,6 +94,8 @@ fn filter_parquet_files_inner(
// stop considering level N files when the total files selected for compaction so far exceeds
// this value
max_num_files: usize,
// for the first file in a partition and its overlaps we have a separate max_num_files
max_num_files_first_in_partition: usize,
// max desired output file size in bytes
max_desired_file_size_bytes: u64,
// Gauge for the number of Parquet file candidates
@ -187,8 +193,16 @@ fn filter_parquet_files_inner(
// Over limit of num files
// At this stage files_to_return only includes LN+1 files. To get both returning LN+1 and
// LN files, we need to consider both files_to_return and level_n_to_return
// The file limit only applies after the first file, because we want to avoid the situation
// where a file (and its overlapping files) will never get processed. In this scenario,
// however, we do not proceed to pick up any more files other than this first one (and its
// overlaps)
if files_to_return.len() + level_n_to_return.len() + 1 /* LN file */ + overlaps.len()
> max_num_files
&& !(files_to_return.is_empty() && level_n_to_return.is_empty())
|| (files_to_return.is_empty()
&& level_n_to_return.is_empty()
&& 1 /* LN file */ + overlaps.len() > max_num_files_first_in_partition)
{
if files_to_return.is_empty() && level_n_to_return.is_empty() {
// Cannot compact this partition because its first set of overlapped files
@ -222,8 +236,10 @@ fn filter_parquet_files_inner(
// Only compact the level 1 files under the desired file size
break;
} else {
// Still under budget and under limit of number of files.
// We do not add memory budget for output stream yet becasue it will change when we add
// Still under budget and under limit of number of files, OR
// it's the first LN file and it is under max_num_files_first_in_partition.
//
// We do not add memory budget for output stream yet because it will change when we add
// more files
total_estimated_budget += estimated_processing_file_bytes;
ln_estimated_budget.push(ln_estimated_file_bytes);
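The tests further down compute their expected budgets from the same quantities this loop accumulates. A hedged restatement of that arithmetic as the test expectations use it (each input file held in memory at roughly twice its on-disk size, plus a fixed allowance per input and output stream); the function name is illustrative, and whether production code uses exactly this formula is an assumption:

// Sketch only: mirrors how the tests below build expected_estimated_budget.
fn estimated_budget_bytes(
    input_file_sizes: &[u64],   // on-disk sizes of the selected files
    num_output_streams: u64,    // usually 1 unless the output is split by size
    estimated_stream_bytes: u64,
) -> u64 {
    // Each input file is assumed to cost about twice its file size in memory...
    let in_memory: u64 = input_file_sizes.iter().map(|size| 2 * size).sum();
    // ...plus one stream allowance per input file and per output stream.
    let streams =
        (input_file_sizes.len() as u64 + num_output_streams) * estimated_stream_bytes;
    in_memory + streams
}

For example, with two input files and one output stream this gives 2 * (2 * file_size_bytes) + 3 * ESTIMATED_STREAM_BYTES, matching the "two input streams, one output stream" expectations in the tests.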
@ -517,6 +533,7 @@ mod tests {
level_1,
MEMORY_BUDGET,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -540,6 +557,7 @@ mod tests {
level_1,
0,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -570,6 +588,7 @@ mod tests {
level_1,
0,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
// output size smaller than input size to force return 2 output streams
(file_size_bytes - 1) as u64,
&files_metric,
@ -601,6 +620,7 @@ mod tests {
level_1,
0,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
// max desired output size ~1/3 input size to force returning 3 output streams
(file_size_bytes / 3 + 1) as u64,
&files_metric,
@ -631,6 +651,7 @@ mod tests {
level_1,
1000,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -646,7 +667,7 @@ mod tests {
}
#[test]
fn file_num_0_returns_over_file_limit() {
fn file_num_0_over_file_limit_but_proceeds() {
let level_0 = vec![ParquetFileBuilder::level_0().id(1).build()];
let level_1 = vec![];
let (files_metric, bytes_metric) = metrics();
@ -659,28 +680,145 @@ mod tests {
level_0,
level_1,
MEMORY_BUDGET,
0,
0, // note max file count is zero
1, // however we are not over the special limit for the first file in the partition
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
);
// we will process this despite > max file count because it's the first
assert!(
matches!(
&filter_result,
FilterResult::Proceed { files, budget_bytes }
if files.len() == 1
&& files.clone().pop().unwrap().id().get() == 1
&& *budget_bytes == expected_estimated_budget
),
"Match failed, got: {filter_result:?}"
);
}
#[test]
fn file_num_0_over_file_limit_for_first_in_partition() {
let level_0 = vec![
ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build(),
// second file that shouldn't get picked up for processing
ParquetFileBuilder::level_0().id(2).build(),
];
let level_1 = vec![
// all overlap the L0 file
ParquetFileBuilder::level_1()
.id(102)
.min_time(150)
.max_time(250)
.build(),
ParquetFileBuilder::level_1()
.id(103)
.min_time(250)
.max_time(350)
.build(),
ParquetFileBuilder::level_1()
.id(104)
.min_time(270)
.max_time(280)
.build(),
];
let (files_metric, bytes_metric) = metrics();
let bytes_stored_4_files_in_memory = 4 * (2 * level_0[0].file_size_bytes()) as u64;
let expected_estimated_budget = bytes_stored_4_files_in_memory + 5 * ESTIMATED_STREAM_BYTES;
let filter_result = filter_parquet_files_inner(
level_0,
level_1,
MEMORY_BUDGET,
1, // this isn't a factor in this test
2, // we are over this limit for the first file in partition
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
);
// we will process this despite > max file count because it's the first
assert_eq!(
filter_result,
FilterResult::OverLimitFileNum {
num_files: 1,
budget_bytes: expected_estimated_budget // one input stream, one output stream
num_files: 4,
budget_bytes: expected_estimated_budget,
}
);
}
#[test]
fn file_num_limit_1_return_over_file_limit() {
let level_0 = vec![ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build()];
fn file_num_0_over_file_limit_but_proceeds_but_no_more() {
let level_0 = vec![
ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build(),
// second file that shouldn't get picked up for processing
ParquetFileBuilder::level_0().id(2).build(),
];
let level_1 = vec![
// overlaps with L0 id 1
ParquetFileBuilder::level_1()
.id(102)
.min_time(250)
.max_time(350)
.build(),
];
let (files_metric, bytes_metric) = metrics();
let bytes_stored_2_files_in_memory = 2 * (2 * level_0[0].file_size_bytes()) as u64;
let expected_estimated_budget = bytes_stored_2_files_in_memory + 3 * ESTIMATED_STREAM_BYTES; // two input streams, one output stream
let filter_result = filter_parquet_files_inner(
level_0,
level_1,
MEMORY_BUDGET,
0, // note max file count is zero
2, // ...but given it's the first file in the partition we use this limit
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
);
// L0 file and its overlapping files get picked despite > max file count because it's the
// first file
assert!(
matches!(
&filter_result,
FilterResult::Proceed { files, budget_bytes }
if files.len() == 2
// L0 file + its overlapping L1 file but not the next L0 file even though we
// have the memory for it
&& files.iter().map(|f| f.id().get()).collect::<Vec<_>>() == [102, 1]
&& *budget_bytes == expected_estimated_budget
),
"Match failed, got: {filter_result:?}"
);
}
#[test]
fn file_num_limit_1_over_file_limit_but_proceeds_but_no_more() {
// in this test it's the L1 file that pushes us over max file count, but it still proceeds
// because it overlaps the first L0 file
let level_0 = vec![
ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build(),
// second file that shouldn't get picked up for processing
ParquetFileBuilder::level_0().id(2).build(),
];
let level_1 = vec![
// Completely contains the level 0 times
ParquetFileBuilder::level_1()
@ -699,6 +837,114 @@ mod tests {
level_1,
MEMORY_BUDGET,
1,
2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
);
assert!(
matches!(
&filter_result,
FilterResult::Proceed { files, budget_bytes }
if files.len() == 2
&& files.iter().map(|f| f.id().get()).collect::<Vec<_>>() == [102, 1]
&& *budget_bytes == expected_estimated_budget
),
"Match failed, got: {filter_result:?}"
);
}
#[test]
fn file_num_limit_second_l1_over_file_limit_but_proceeds_but_no_more() {
// in this test it's a second L1 file that pushes us over max file count, but it still
// proceeds because it overlaps the first L0 file
let level_0 = vec![
ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build(),
// second file that shouldn't get picked up for processing
ParquetFileBuilder::level_0().id(2).build(),
];
let level_1 = vec![
// both overlap the L0 file
ParquetFileBuilder::level_1()
.id(102)
.min_time(150)
.max_time(250)
.build(),
ParquetFileBuilder::level_1()
.id(103)
.min_time(250)
.max_time(350)
.build(),
];
let (files_metric, bytes_metric) = metrics();
let bytes_stored_3_files_in_memory = 3 * (2 * level_0[0].file_size_bytes()) as u64;
let expected_estimated_budget = bytes_stored_3_files_in_memory + 4 * ESTIMATED_STREAM_BYTES; // three input streams, one output stream
let filter_result = filter_parquet_files_inner(
level_0,
level_1,
MEMORY_BUDGET,
2,
3,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
);
assert!(
matches!(
&filter_result,
FilterResult::Proceed { files, budget_bytes }
if files.len() == 3
&& files.iter().map(|f| f.id().get()).collect::<Vec<_>>() == [102, 103, 1]
&& *budget_bytes == expected_estimated_budget
),
"Match failed, got: {filter_result:?}"
);
}
#[test]
fn file_num_limit_first_over_file_limit_but_insufficient_budget() {
// in this test the first file and its overlaps push us over the file limit, which usually
// means they're included anyway but in this case we don't have the memory budget
let level_0 = vec![ParquetFileBuilder::level_0()
.id(1)
.min_time(200)
.max_time(300)
.build()];
let level_1 = vec![
// both overlap the L0 file
ParquetFileBuilder::level_1()
.id(102)
.min_time(150)
.max_time(250)
.build(),
ParquetFileBuilder::level_1()
.id(103)
.min_time(250)
.max_time(350)
.build(),
];
let (files_metric, bytes_metric) = metrics();
let bytes_stored_3_files_in_memory = 3 * (2 * level_0[0].file_size_bytes()) as u64;
let expected_estimated_budget = bytes_stored_3_files_in_memory + 4 * ESTIMATED_STREAM_BYTES; // three input streams, one output stream
let filter_result = filter_parquet_files_inner(
level_0,
level_1,
// enough for 2 files and one output stream
(ESTIMATED_STREAM_BYTES * 3) + (FILE_SIZE_BYTES as u64 * 2 * 2) + 1,
// max file count not a factor here
1,
// it's under the limit
3,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -706,9 +952,9 @@ mod tests {
assert_eq!(
filter_result,
FilterResult::OverLimitFileNum {
num_files: 2,
budget_bytes: expected_estimated_budget
FilterResult::OverBudget {
budget_bytes: expected_estimated_budget,
num_files: 3
}
);
}
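For reference, a sketch of the result type these assertions match on; the variants and field names mirror the assertions in this file, but the field types (and the element type of `files`, simplified here to plain ids) are assumptions rather than the actual definition:

// Sketch only: approximates the FilterResult variants exercised by these tests.
#[derive(Debug, PartialEq)]
enum FilterResult {
    // Compact the selected files; budget_bytes is the estimated memory needed.
    Proceed { files: Vec<i64>, budget_bytes: u64 },
    // The first file and its overlaps exceed even the first-in-partition limit,
    // so the partition is recorded as skipped ("over limit of num_files").
    OverLimitFileNum { num_files: usize, budget_bytes: u64 },
    // The selected files do not fit the remaining memory budget.
    OverBudget { budget_bytes: u64, num_files: usize },
}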
@ -766,6 +1012,7 @@ mod tests {
level_1.clone(),
MEMORY_BUDGET,
2,
4,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -789,6 +1036,7 @@ mod tests {
level_1,
MEMORY_BUDGET,
2,
4,
// max output size is just over 1/3 of (10 + 10) which is size of first L0 (id=1) and second L1 (id=102)
8,
&files_metric,
@ -859,6 +1107,7 @@ mod tests {
level_1,
MEMORY_BUDGET,
10,
20,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -932,6 +1181,7 @@ mod tests {
level_1,
MEMORY_BUDGET,
3,
6,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -987,6 +1237,7 @@ mod tests {
level_1,
MEMORY_BUDGET,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -1093,6 +1344,7 @@ mod tests {
// enough for 3 files and one output stream
(ESTIMATED_STREAM_BYTES * 4) + (FILE_SIZE_BYTES as u64 * 3 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -1126,7 +1378,7 @@ mod tests {
}
);
// Test 2: increase udget to fit all files
// Test 2: increase budget to fit all files
let (files_metric, bytes_metric) = metrics();
let bytes_stored_6_files_in_memory = 6 * (2 * level_0[0].file_size_bytes()) as u64;
@ -1139,6 +1391,7 @@ mod tests {
// level 1 files get returned. Enough for 6 files/input streams and one output stream.
(ESTIMATED_STREAM_BYTES * 7) + (FILE_SIZE_BYTES as u64 * 6 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -1261,6 +1514,7 @@ mod tests {
// enough for 3 files and one output stream
(ESTIMATED_STREAM_BYTES * 4) + (FILE_SIZE_BYTES as u64 * 3 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -1306,6 +1560,7 @@ mod tests {
// level 2 files get returned. Enough for 3 files and one output stream.
(ESTIMATED_STREAM_BYTES * 7) + (FILE_SIZE_BYTES as u64 * 6 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
DEFAULT_MAX_OUTPUT_SIZE,
&files_metric,
&bytes_metric,
@ -1364,6 +1619,7 @@ mod tests {
// enough to not limit based on this criteria
(ESTIMATED_STREAM_BYTES * 4) + (FILE_SIZE_BYTES as u64 * 3 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
// Limit to only 1 of the 2 files based on file size
FILE_SIZE_BYTES as u64 + 1,
&files_metric,
@ -1408,6 +1664,7 @@ mod tests {
// enough to not limit based on this criteria
(ESTIMATED_STREAM_BYTES * 4) + (FILE_SIZE_BYTES as u64 * 3 * 2) + 1,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
// Limit such that the first file is already over the limit
FILE_SIZE_BYTES as u64 - 1,
&files_metric,
@ -1458,6 +1715,7 @@ mod tests {
// Don't limit based on this criteria
u64::MAX,
FILE_NUM_LIMIT,
FILE_NUM_LIMIT * 2,
// Limit to only 2 of the 3 files based on file size
2 * FILE_SIZE_BYTES as u64 + 1,
&files_metric,

View File

@ -947,6 +947,8 @@ pub struct SkippedCompaction {
pub num_files: i64,
/// limit on num files
pub limit_num_files: i64,
/// limit on num files for the first file in a partition
pub limit_num_files_first_in_partition: i64,
}
/// Data object for a tombstone.

View File

@ -35,6 +35,10 @@ message SkippedCompaction {
// compaction was skipped.
int64 limit_num_files = 5;
// The compactor's limit on the number of files at the beginning of a partition at the time
// this compaction was skipped.
optional int64 limit_num_files_first_in_partition = 8;
// The number of bytes of memory estimated to be needed to compact this partition at the time
// this compaction was skipped.
int64 estimated_bytes = 6;

View File

@ -11,6 +11,7 @@ impl From<SkippedCompaction> for proto::SkippedCompaction {
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition,
} = skipped_compaction;
Self {
@ -21,6 +22,7 @@ impl From<SkippedCompaction> for proto::SkippedCompaction {
limit_bytes,
num_files,
limit_num_files,
limit_num_files_first_in_partition: Some(limit_num_files_first_in_partition),
}
}
}

View File

@ -426,6 +426,7 @@ impl Config {
memory_budget_bytes: 300_000,
min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100,
max_num_compacting_files: 20,
max_num_compacting_files_first_in_partition: 40,
minutes_without_new_writes_to_be_cold: 10,
hot_compaction_hours_threshold_1: 4,
hot_compaction_hours_threshold_2: 24,

View File

@ -0,0 +1,2 @@
ALTER TABLE IF EXISTS skipped_compactions
ADD COLUMN IF NOT EXISTS limit_num_files_first_in_partition BIGINT DEFAULT NULL;

View File

@ -453,12 +453,14 @@ pub trait PartitionRepo: Send + Sync {
/// Record an instance of a partition being selected for compaction but compaction was not
/// completed for the specified reason.
#[allow(clippy::too_many_arguments)]
async fn record_skipped_compaction(
&mut self,
partition_id: PartitionId,
reason: &str,
num_files: usize,
limit_num_files: usize,
limit_num_files_first_in_partition: usize,
estimated_bytes: u64,
limit_bytes: u64,
) -> Result<()>;
@ -1602,7 +1604,7 @@ pub(crate) mod test_helpers {
);
repos
.partitions()
.record_skipped_compaction(other_partition.id, "I am le tired", 1, 2, 10, 20)
.record_skipped_compaction(other_partition.id, "I am le tired", 1, 2, 4, 10, 20)
.await
.unwrap();
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
@ -1619,7 +1621,7 @@ pub(crate) mod test_helpers {
// again, but race conditions and all that)
repos
.partitions()
.record_skipped_compaction(other_partition.id, "I'm on fire", 11, 12, 110, 120)
.record_skipped_compaction(other_partition.id, "I'm on fire", 11, 12, 24, 110, 120)
.await
.unwrap();
let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
@ -3203,7 +3205,15 @@ pub(crate) mod test_helpers {
// The compactor skipped compacting partition_2
repos
.partitions()
.record_skipped_compaction(partition_2.id, "Not feeling up to it today", 1, 2, 10, 20)
.record_skipped_compaction(
partition_2.id,
"Not feeling up to it today",
1,
2,
4,
10,
20,
)
.await
.unwrap();
@ -3676,7 +3686,7 @@ pub(crate) mod test_helpers {
// The compactor skipped compacting another_partition
repos
.partitions()
.record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 10, 20)
.record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 4, 10, 20)
.await
.unwrap();

View File

@ -839,6 +839,7 @@ impl PartitionRepo for MemTxn {
reason: &str,
num_files: usize,
limit_num_files: usize,
limit_num_files_first_in_partition: usize,
estimated_bytes: u64,
limit_bytes: u64,
) -> Result<()> {
@ -856,6 +857,7 @@ impl PartitionRepo for MemTxn {
s.skipped_at = skipped_at;
s.num_files = num_files as i64;
s.limit_num_files = limit_num_files as i64;
s.limit_num_files_first_in_partition = limit_num_files_first_in_partition as i64;
s.estimated_bytes = estimated_bytes as i64;
s.limit_bytes = limit_bytes as i64;
}
@ -865,6 +867,7 @@ impl PartitionRepo for MemTxn {
skipped_at,
num_files: num_files as i64,
limit_num_files: limit_num_files as i64,
limit_num_files_first_in_partition: limit_num_files_first_in_partition as i64,
estimated_bytes: estimated_bytes as i64,
limit_bytes: limit_bytes as i64,
}),

View File

@ -246,7 +246,7 @@ decorate!(
"partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_update_sort_key" = update_sort_key(&mut self, partition_id: PartitionId, sort_key: &[&str]) -> Result<Partition>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize,estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;

View File

@ -1273,21 +1273,23 @@ RETURNING *;
reason: &str,
num_files: usize,
limit_num_files: usize,
limit_num_files_first_in_partition: usize,
estimated_bytes: u64,
limit_bytes: u64,
) -> Result<()> {
sqlx::query(
r#"
INSERT INTO skipped_compactions
( partition_id, reason, num_files, limit_num_files, estimated_bytes, limit_bytes, skipped_at )
( partition_id, reason, num_files, limit_num_files, limit_num_files_first_in_partition, estimated_bytes, limit_bytes, skipped_at )
VALUES
( $1, $2, $3, $4, $5, $6, extract(epoch from NOW()) )
( $1, $2, $3, $4, $5, $6, $7, extract(epoch from NOW()) )
ON CONFLICT ( partition_id )
DO UPDATE
SET
reason = EXCLUDED.reason,
num_files = EXCLUDED.num_files,
limit_num_files = EXCLUDED.limit_num_files,
limit_num_files_first_in_partition = EXCLUDED.limit_num_files_first_in_partition,
estimated_bytes = EXCLUDED.estimated_bytes,
limit_bytes = EXCLUDED.limit_bytes,
skipped_at = EXCLUDED.skipped_at;
@ -1297,6 +1299,7 @@ skipped_at = EXCLUDED.skipped_at;
.bind(reason)
.bind(num_files as i64)
.bind(limit_num_files as i64)
.bind(limit_num_files_first_in_partition as i64)
.bind(estimated_bytes as i64)
.bind(limit_bytes as i64)
.execute(&mut self.inner)

View File

@ -210,6 +210,7 @@ pub async fn build_compactor_from_config(
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,
@ -226,6 +227,7 @@ pub async fn build_compactor_from_config(
memory_budget_bytes,
min_num_rows_allocated_per_record_batch_to_datafusion_plan,
max_num_compacting_files,
max_num_compacting_files_first_in_partition,
minutes_without_new_writes_to_be_cold,
hot_compaction_hours_threshold_1,
hot_compaction_hours_threshold_2,