feat: teach compactor to compact smaller number of files (#4671)

* refactor: split compact_partition into two functions to handle concurrency better

* feat: limit number of files to compact

* test: add test for limit num files

* chore: fix clippy

* feat: split group if over max size

* fix: split the overlapped group to limit size or file num

* chore: reduce config values

* test: add tests and clearer comments for the split_overlapped_groups and test_limit_size_and_num_files

* chore: more comments

* chore: cleanup
pull/24376/head
Nga Tran 2022-05-25 15:54:34 -04:00 committed by GitHub
parent 31d1b37d73
commit 6cc767efcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 507 additions and 142 deletions

View File

@ -36,22 +36,31 @@ pub struct CompactorConfig {
/// The compactor will limit the number of simultaneous compaction jobs based on the
/// size of the input files to be compacted. This number should be less than 1/10th
/// of the available memory to ensure compactions have
/// enough space to run. Default is 100,000,000 (100MB).
/// enough space to run. Default is 10,000,000 (10MB).
#[clap(
long = "--compaction-concurrent-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
default_value = "100000000"
default_value = "10000000"
)]
pub max_concurrent_compaction_size_bytes: i64,
/// The compactor will compact overlapped files no matter how large they are.
/// For non-overlapped and contiguous files, compactor will also compact them into
/// a larger file of max size defined by the config value.
/// Default is 100,000,000 (100MB).
/// Default is 5,000,000 (5MB)
#[clap(
long = "--compaction-max-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_MAX_SIZE_BYTES",
default_value = "100000000"
default_value = "5000000"
)]
pub compaction_max_size_bytes: i64,
/// Limit the number of files per compaction
/// Default is 10
#[clap(
long = "--compaction-max-file-count",
env = "INFLUXDB_IOX_COMPACTION_MAX_FILE_COUNT",
default_value = "10"
)]
pub compaction_max_file_count: i64,
}

View File

@ -124,6 +124,36 @@ pub enum Error {
/// A specialized `Error` for Compactor Data errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Data of parquet files to compact and upgrade
pub struct CompactAndUpgrade {
    // Sequencer ID of all files in this struct
    sequencer_id: Option<SequencerId>,
    // Each group will be compacted into one file
    groups_to_compact: Vec<GroupWithTombstones>,
    // Level-0 files to be upgraded to level 1 without compaction
    files_to_upgrade: Vec<ParquetFileId>,
}
impl CompactAndUpgrade {
    /// Create an empty plan, optionally tagged with the sequencer the files belong to
    fn new(sequencer_id: Option<SequencerId>) -> Self {
        Self {
            sequencer_id,
            groups_to_compact: vec![],
            files_to_upgrade: vec![],
        }
    }

    /// Return true if there are files to compact and/or upgrade
    pub fn compactable(&self) -> bool {
        !self.groups_to_compact.is_empty() || !self.files_to_upgrade.is_empty()
    }

    /// Return sequencer ID
    pub fn sequencer_id(&self) -> Option<SequencerId> {
        self.sequencer_id
    }
}
/// Data needed to run a compactor
#[derive(Debug)]
pub struct Compactor {
@ -148,10 +178,6 @@ pub struct Compactor {
/// Counter for the number of files compacted
compaction_counter: Metric<U64Counter>,
/// Counter for the actual number of files compacted
/// compaction_actual_counter = compaction_counter - some large-enough non-overlapped files
compaction_actual_counter: Metric<U64Counter>,
/// Counter for level promotion from level 0 to 1
/// These are large enough and non-overlapped file
level_promotion_counter: Metric<U64Counter>,
@ -187,11 +213,6 @@ impl Compactor {
"counter for the number of files compacted",
);
let compaction_actual_counter = registry.register_metric(
"compactor_actual_compacted_files_total",
"counter for the number of actual files compacted",
);
let compaction_output_counter = registry.register_metric(
"compactor_output_compacted_files_total",
"counter for the number of output compacted files",
@ -229,7 +250,7 @@ impl Compactor {
backoff_config,
config,
compaction_counter,
compaction_actual_counter,
// compaction_actual_counter,
compaction_output_counter,
level_promotion_counter,
compaction_candidate_gauge,
@ -339,16 +360,18 @@ impl Compactor {
Ok(partition.sort_key())
}
/// Runs compaction in a partition resolving any tombstones and compacting data so that parquet
/// files will be non-overlapping in time.
pub async fn compact_partition(
/// Group files to be compacted together and level-0 files that will get upgraded
/// for a given partition.
/// The number of compacting files per group will be limited by their total size and number of files
pub async fn groups_to_compact_and_files_to_upgrade(
&self,
partition_id: PartitionId,
compaction_max_size_bytes: i64,
) -> Result<()> {
info!("compacting partition {}", partition_id);
let start_time = self.time_provider.now();
compaction_max_size_bytes: i64, // max size of files to get compacted
compaction_max_file_count: i64, // max number of files to get compacted
) -> Result<CompactAndUpgrade> {
let mut compact_and_upgrade = CompactAndUpgrade::new(None);
// List all valid (not soft deleted) files of the partition
let parquet_files = self
.catalog
.repositories()
@ -358,9 +381,63 @@ impl Compactor {
.await
.context(ListParquetFilesSnafu)?;
if parquet_files.is_empty() {
return Ok(compact_and_upgrade);
}
compact_and_upgrade.sequencer_id = Some(parquet_files[0].sequencer_id);
// Group overlapped files
// Each group will be limited by their size and number of files
let overlapped_file_groups = Self::overlapped_groups(
parquet_files,
compaction_max_size_bytes,
compaction_max_file_count,
);
// Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
// If their
let compact_file_groups = Self::group_small_contiguous_groups(
overlapped_file_groups,
compaction_max_size_bytes,
compaction_max_file_count,
);
// Attach appropriate tombstones to each file
let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?;
info!("compacting {} groups", groups_with_tombstones.len());
// File groups to compact and files to upgrade
for group in groups_with_tombstones {
// Only one file without tombstones, no need to compact.
if group.parquet_files.len() == 1 && group.tombstones.is_empty() {
// If it is level 0, upgrade it since it is non-overlapping
if group.parquet_files[0].data.compaction_level == INITIAL_COMPACTION_LEVEL {
compact_and_upgrade
.files_to_upgrade
.push(group.parquet_files[0].parquet_file_id())
}
} else {
compact_and_upgrade.groups_to_compact.push(group);
}
}
Ok(compact_and_upgrade)
}
/// Runs compaction in a partition resolving any tombstones and compacting data so that parquet
/// files will be non-overlapping in time.
pub async fn compact_partition(
&self,
partition_id: PartitionId,
compact_and_upgrade: CompactAndUpgrade,
) -> Result<()> {
if !compact_and_upgrade.compactable() {
return Ok(());
}
info!("compacting partition {}", partition_id);
let start_time = self.time_provider.now();
let sort_key_from_catalog = self
.sort_key_from_catalog(partition_id)
.await?
@ -369,39 +446,20 @@ impl Compactor {
// old catalogs)
.expect("Partition sort key should have been available in the catalog");
let sequencer_id = parquet_files[0].sequencer_id;
let file_count = parquet_files.len();
let mut actual_compacted_file_count = file_count;
// Group overlapped files
let overlapped_file_groups = Self::overlapped_groups(parquet_files);
// Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
let compact_file_groups =
Self::group_small_contiguous_groups(overlapped_file_groups, compaction_max_size_bytes);
// Attach appropriate tombstones to each file
let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?;
info!("compacting {} groups", groups_with_tombstones.len());
let mut file_count = 0;
// Compact, persist, and update catalog accordingly for each overlapped file
let mut tombstones = BTreeMap::new();
let mut upgrade_level_list: Vec<ParquetFileId> = vec![];
let mut output_file_count = 0;
for group in groups_with_tombstones {
let sequencer_id = compact_and_upgrade
.sequencer_id()
.expect("Should have sequencer ID");
for group in compact_and_upgrade.groups_to_compact {
file_count += group.parquet_files.len();
// keep tombstone ids
tombstones = Self::union_tombstones(tombstones, &group);
// Only one file without tombstones, no need to compact.
if group.parquet_files.len() == 1 && group.tombstones.is_empty() {
actual_compacted_file_count -= 1;
// If it is level 0, upgrade it since it is non-overlapping
if group.parquet_files[0].data.compaction_level == INITIAL_COMPACTION_LEVEL {
upgrade_level_list.push(group.parquet_files[0].parquet_file_id());
}
continue;
}
// Collect all the parquet file IDs, to be able to set their catalog records to be
// deleted. These should already be unique, no need to dedupe.
let original_parquet_file_ids: Vec<_> =
@ -478,12 +536,13 @@ impl Compactor {
self.remove_fully_processed_tombstones(tombstones).await?;
// Upgrade old level-0 to level 1
self.update_to_level_1(&upgrade_level_list).await?;
self.update_to_level_1(&compact_and_upgrade.files_to_upgrade)
.await?;
let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]);
if !upgrade_level_list.is_empty() {
if !compact_and_upgrade.files_to_upgrade.is_empty() {
let promotion_counter = self.level_promotion_counter.recorder(attributes.clone());
promotion_counter.inc(upgrade_level_list.len() as u64);
promotion_counter.inc(compact_and_upgrade.files_to_upgrade.len() as u64);
}
if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) {
@ -494,9 +553,6 @@ impl Compactor {
let compaction_counter = self.compaction_counter.recorder(attributes.clone());
compaction_counter.inc(file_count as u64);
let compaction_actual_counter = self.compaction_actual_counter.recorder(attributes.clone());
compaction_actual_counter.inc(actual_compacted_file_count as u64);
let compaction_output_counter = self.compaction_output_counter.recorder(attributes);
compaction_output_counter.inc(output_file_count as u64);
@ -507,6 +563,7 @@ impl Compactor {
fn group_small_contiguous_groups(
mut file_groups: Vec<GroupWithMinTimeAndSize>,
compaction_max_size_bytes: i64,
compaction_max_file_count: i64,
) -> Vec<Vec<ParquetFileWithMetadata>> {
let mut groups = Vec::with_capacity(file_groups.len());
if file_groups.is_empty() {
@ -518,9 +575,14 @@ impl Compactor {
let mut current_group = vec![];
let mut current_size = 0;
let mut current_num_files = 0;
for g in file_groups {
if current_size + g.total_file_size_bytes < compaction_max_size_bytes {
if current_size + g.total_file_size_bytes < compaction_max_size_bytes
&& current_num_files + g.parquet_files.len()
< compaction_max_file_count.try_into().unwrap()
{
// Group this one with the current_group
current_num_files += g.parquet_files.len();
current_group.extend(g.parquet_files);
current_size += g.total_file_size_bytes;
} else {
@ -528,7 +590,7 @@ impl Compactor {
if !current_group.is_empty() {
groups.push(current_group);
}
current_num_files = g.parquet_files.len();
current_group = g.parquet_files;
current_size = g.total_file_size_bytes;
}
@ -748,15 +810,72 @@ impl Compactor {
Ok(())
}
// Split overlapped groups into smaller groups if there are too many files in each group or
// their total size is too large. The files are sorted by their min time before splitting so files
// are guaranteed to be overlapped in each new group
fn split_overlapped_groups(
    groups: &mut Vec<Vec<ParquetFileWithMetadata>>,
    max_size_bytes: i64,
    max_file_count: i64,
) -> Vec<Vec<ParquetFileWithMetadata>> {
    let mut overlapped_groups: Vec<Vec<ParquetFileWithMetadata>> =
        Vec::with_capacity(groups.len() * 2);
    let max_count = max_file_count.try_into().unwrap();
    for group in groups {
        let total_size_bytes: i64 = group.iter().map(|f| f.file_size_bytes).sum();
        if group.len() <= max_count && total_size_bytes <= max_size_bytes {
            // Group already respects both limits; keep it unchanged
            overlapped_groups.push(group.to_vec());
        } else {
            // Over a limit: sort by min_time, then repeatedly carve a
            // limit-respecting chunk off the front until nothing remains
            group.sort_by_key(|f| f.min_time);
            while !group.is_empty() {
                // limit file num
                let mut count = max_count;
                // limit total file size
                let mut size = 0;
                for (i, item) in group.iter().enumerate() {
                    if i >= max_count {
                        // File-count limit reached before the size limit
                        count = max_count;
                        break;
                    }
                    size += item.file_size_bytes;
                    if size >= max_size_bytes {
                        // Size limit reached; include the file that crossed it so a
                        // single oversized file still forms a group of its own
                        count = i + 1;
                        break;
                    }
                }
                // Fewer files remain than the computed chunk size
                if count > group.len() {
                    count = group.len();
                }
                // split_off leaves the first `count` files in `group` (the chunk to
                // emit) and returns the remainder for the next iteration
                let group_new = group.split_off(count);
                overlapped_groups.push(group.to_vec());
                *group = group_new;
            }
        }
    }
    overlapped_groups
}
// Given a list of parquet files that come from the same Table Partition, group files together
// if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
// If there are too many files in an overlapped group, the group will be split to ensure each
// group contains a limited number of files
fn overlapped_groups(
parquet_files: Vec<ParquetFileWithMetadata>,
max_size_bytes: i64,
max_file_count: i64,
) -> Vec<GroupWithMinTimeAndSize> {
// group overlap files
let overlapped_groups =
let mut overlapped_groups =
group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks");
// split overlapped groups into smaller groups if they include too many files
let overlapped_groups =
Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count);
// Compute min time and total size for each overlapped group
let mut groups_with_min_time_and_size = Vec::with_capacity(overlapped_groups.len());
for group in overlapped_groups {
@ -984,6 +1103,8 @@ mod tests {
// Simulate unique ID generation
static NEXT_ID: AtomicI64 = AtomicI64::new(0);
static TEST_MAX_SIZE_BYTES: i64 = 100000;
static TEST_MAX_FILE_COUNT: i64 = 10;
#[tokio::test]
// This is integration test to verify all pieces are put together correctly
@ -1038,17 +1159,23 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
compactor
.compact_partition(
let compact_and_upgrade = compactor
.groups_to_compact_and_files_to_upgrade(
partition.partition.id,
compactor.config.compaction_max_size_bytes(),
compactor.config.compaction_max_file_count(),
)
.await
.unwrap();
compactor
.compact_partition(partition.partition.id, compact_and_upgrade)
.await
.unwrap();
// should have 2 non-deleted level_0 files. The original file was marked deleted and not counted
let mut files = catalog
.list_by_table_not_to_delete_with_metadata(table.table.id)
@ -1177,7 +1304,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -1260,13 +1387,18 @@ mod tests {
// ------------------------------------------------
// Compact
compactor
.compact_partition(
let compact_and_upgrade = compactor
.groups_to_compact_and_files_to_upgrade(
partition.partition.id,
compactor.config.compaction_max_size_bytes(),
compactor.config.compaction_max_file_count(),
)
.await
.unwrap();
compactor
.compact_partition(partition.partition.id, compact_and_upgrade)
.await
.unwrap();
// Should have 3 non-soft-deleted files: pf1 not compacted and stay, and 2 newly created after compacting pf2, pf3, pf4
let mut files = catalog
@ -1373,7 +1505,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -1488,7 +1620,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -1609,7 +1741,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -1804,7 +1936,11 @@ mod tests {
let pf1 = arbitrary_parquet_file(1, 2);
let pf2 = arbitrary_parquet_file(3, 4);
let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
let groups = Compactor::overlapped_groups(
vec![pf1.clone(), pf2.clone()],
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
// They should be 2 groups
assert_eq!(groups.len(), 2, "There should have been two group");
@ -1819,7 +1955,11 @@ mod tests {
let pf1 = arbitrary_parquet_file(1, 3);
let pf2 = arbitrary_parquet_file(2, 4);
let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
let groups = Compactor::overlapped_groups(
vec![pf1.clone(), pf2.clone()],
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
// They should be in one group (order not guaranteed)
assert_eq!(groups.len(), 1, "There should have only been one group");
@ -1857,7 +1997,8 @@ mod tests {
partial_overlap.clone(),
];
let mut groups = Compactor::overlapped_groups(all);
let mut groups =
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
dbg!(&groups);
assert_eq!(groups.len(), 3);
@ -1918,7 +2059,11 @@ mod tests {
let pf1 = arbitrary_parquet_file_with_size(1, 2, 100);
let pf2 = arbitrary_parquet_file_with_size(3, 4, 200);
let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
let overlapped_groups = Compactor::overlapped_groups(
vec![pf1.clone(), pf2.clone()],
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
// 2 overlapped groups
assert_eq!(overlapped_groups.len(), 2);
let g1 = GroupWithMinTimeAndSize {
@ -1936,13 +2081,162 @@ mod tests {
// Group them by size
let compaction_max_size_bytes = 100000;
let groups =
Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes);
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
compaction_max_size_bytes,
TEST_MAX_FILE_COUNT,
);
// 2 small groups should be grouped in one
assert_eq!(groups.len(), 1);
assert_eq!(groups, vec![vec![pf1, pf2]]);
}
// This is a specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files
// Keep all the variables the same names in both tests for us to follow them easily
#[test]
fn test_split_overlapped_groups() {
    let compaction_max_size_bytes = 100000;

    // oldest overlapped and very small
    let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400);
    let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500);
    let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400);
    let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
    let oldest_overlapped_group = vec![
        overlaps_many.clone(),
        contained_completely_within.clone(),
        max_equals_min.clone(),
        min_equals_max.clone(),
    ];

    // newest files and very large
    let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group
    let newest_overlapped_group = vec![alone.clone()];

    // small files in the middle
    let another = arbitrary_parquet_file_with_size(13, 15, 1000);
    let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
    let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()];

    let mut overlapped_groups = vec![
        oldest_overlapped_group,
        newest_overlapped_group,
        middle_overlapped_group,
    ];

    let max_size_bytes = 1000;
    let max_file_count = 2;

    // Three input groups but will produce 5 output ones because of limit in size and file count.
    // Note that the 3 input groups each includes overlapped files but the groups do not overlap.
    // The function split_overlapped_groups is to split each overlapped group. It does not merge any groups.
    let groups = Compactor::split_overlapped_groups(
        &mut overlapped_groups,
        max_size_bytes,
        max_file_count,
    );

    // must be 5
    assert_eq!(groups.len(), 5);

    // oldest_overlapped_group was split into groups[0] and groups[1] due to file count limit
    assert_eq!(groups[0].len(), 2); // reach limit file count
    assert!(groups[0].contains(&max_equals_min)); // min_time = 3
    assert!(groups[0].contains(&overlaps_many)); // min_time = 5
    assert_eq!(groups[1].len(), 2); // reach limit file count
    assert!(groups[1].contains(&contained_completely_within)); // min_time = 6
    assert!(groups[1].contains(&min_equals_max)); // min_time = 10

    // newest_overlapped_group stays the same length one file and the corresponding output is groups[2]
    assert_eq!(groups[2].len(), 1); // reach limit file size
    assert!(groups[2].contains(&alone));

    // middle_overlapped_group was split into groups[3] and groups[4] due to size limit
    assert_eq!(groups[3].len(), 1); // reach limit file size
    assert!(groups[3].contains(&another)); // min_time = 13
    assert_eq!(groups[4].len(), 1); // reach limit file size
    assert!(groups[4].contains(&partial_overlap)); // min_time = 14
}
// This tests
// 1. overlapped_groups which focuses on the detail of both its children:
//    1.a. group_potential_duplicates that groups files into overlapped groups
//    1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit
// 2. group_small_contiguous_groups that merges non-overlapped groups into a larger one if they meet size and file limit
#[test]
fn test_limit_size_and_num_files() {
    let compaction_max_size_bytes = 100000;

    // oldest overlapped and very small
    let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400);
    let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500);
    let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400);
    let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);

    // newest files and very large
    let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group

    // small files in the middle
    let another = arbitrary_parquet_file_with_size(13, 15, 1000);
    let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);

    // Given a bunch of files in an arbitrary order,
    let all = vec![
        min_equals_max.clone(),
        overlaps_many.clone(),
        alone.clone(),
        another.clone(),
        max_equals_min.clone(),
        contained_completely_within.clone(),
        partial_overlap.clone(),
    ];

    // Group into overlapped groups
    let max_size_bytes = 1000;
    let max_file_count = 2;
    let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
    // Must be 5
    assert_eq!(overlapped_groups.len(), 5);
    assert_eq!(overlapped_groups[0].parquet_files.len(), 2); // reach limit file count
    assert_eq!(overlapped_groups[1].parquet_files.len(), 2); // reach limit file count
    assert_eq!(overlapped_groups[2].parquet_files.len(), 1); // reach limit file size
    assert_eq!(overlapped_groups[3].parquet_files.len(), 1); // reach limit file size
    assert_eq!(overlapped_groups[4].parquet_files.len(), 1); // reach limit file size

    // Group further into group by size and file count limit
    // Due to the merge with correct time range, this function has to sort the groups hence output data will be in time order
    let groups = Compactor::group_small_contiguous_groups(
        overlapped_groups,
        compaction_max_size_bytes,
        max_file_count,
    );

    // Still 5 groups. Nothing is merged due to the limit of size and file num
    assert_eq!(groups.len(), 5);

    assert_eq!(groups[0].len(), 2); // reach file num limit
    assert!(groups[0].contains(&max_equals_min)); // min_time = 3
    assert!(groups[0].contains(&overlaps_many)); // min_time = 5

    assert_eq!(groups[1].len(), 2); // reach file num limit
    assert!(groups[1].contains(&contained_completely_within)); // min_time = 6
    assert!(groups[1].contains(&min_equals_max)); // min_time = 10

    assert_eq!(groups[2].len(), 1); // reach size limit
    assert!(groups[2].contains(&another)); // min_time = 13

    assert_eq!(groups[3].len(), 1); // reach size limit
    assert!(groups[3].contains(&partial_overlap)); // min_time = 14

    assert_eq!(groups[4].len(), 1); // reach size limit
    assert!(groups[4].contains(&alone));
}
// This tests
// 1. overlapped_groups which focuses on the detail of both its children:
// 1.a. group_potential_duplicates that groups files into overlapped groups
// 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit
// 2. group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit
#[test]
fn test_group_small_contiguous_overlapped_groups_no_group() {
let compaction_max_size_bytes = 100000;
@ -1951,7 +2245,11 @@ mod tests {
let pf1 = arbitrary_parquet_file_with_size(1, 2, 100);
let pf2 = arbitrary_parquet_file_with_size(3, 4, compaction_max_size_bytes); // too large to group
let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]);
let overlapped_groups = Compactor::overlapped_groups(
vec![pf1.clone(), pf2.clone()],
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
// 2 overlapped groups
assert_eq!(overlapped_groups.len(), 2);
let g1 = GroupWithMinTimeAndSize {
@ -1968,13 +2266,21 @@ mod tests {
assert_eq!(overlapped_groups, vec![g1, g2]);
// Group them by size
let groups =
Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes);
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
compaction_max_size_bytes,
TEST_MAX_FILE_COUNT,
);
// Files too big to group further
assert_eq!(groups.len(), 2);
assert_eq!(groups, vec![vec![pf1], vec![pf2]]);
}
// This tests
// 1. overlapped_groups which focuses on the detail of both its children:
// 1.a. group_potential_duplicates that groups files into overlapped groups
// 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit
// 2. group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit
#[test]
fn test_group_small_contiguous_overlapped_groups_many_files() {
let compaction_max_size_bytes = 100000;
@ -2004,12 +2310,16 @@ mod tests {
];
// Group into overlapped groups
let overlapped_groups = Compactor::overlapped_groups(all);
let overlapped_groups =
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
assert_eq!(overlapped_groups.len(), 3);
// group further into group by size
let groups =
Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes);
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
compaction_max_size_bytes,
TEST_MAX_FILE_COUNT,
);
// should be 2 groups
assert_eq!(groups.len(), 2);
// first group includes 6 oldest files in 2 overlapped groups
@ -2025,22 +2335,24 @@ mod tests {
assert!(groups[1].contains(&alone));
}
// This tests
// 1. overlapped_groups which focuses on the detail of both its children:
// 1.a. group_potential_duplicates that groups files into overlapped groups
// 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit
// 2. group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit
#[test]
fn test_group_small_contiguous_overlapped_groups_many_files_too_large() {
let compaction_max_size_bytes = 100000;
// oldest overlapped and large
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300);
let max_equals_min =
arbitrary_parquet_file_with_size(3, 5, compaction_max_size_bytes + 400); // too large to group
let max_equals_min = arbitrary_parquet_file_with_size(3, 5, TEST_MAX_SIZE_BYTES + 400); // too large to group
let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
// newest files and large
let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group
let alone = arbitrary_parquet_file_with_size(30, 35, TEST_MAX_SIZE_BYTES); // too large to group
// files in the middle and also large
let another = arbitrary_parquet_file_with_size(13, 15, compaction_max_size_bytes + 100); // too large to group
let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
// Given a bunch of files in an arbitrary order
@ -2055,33 +2367,39 @@ mod tests {
];
// Group into overlapped groups
let overlapped_groups = Compactor::overlapped_groups(all);
assert_eq!(overlapped_groups.len(), 3);
let overlapped_groups =
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
assert_eq!(overlapped_groups.len(), 5);
// group further into group by size
let groups =
Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes);
// Still 3 groups because they are all large
assert_eq!(groups.len(), 3);
// first group includes 4 oldest files
assert_eq!(groups[0].len(), 4);
assert!(groups[0].contains(&overlaps_many));
assert!(groups[0].contains(&contained_completely_within));
assert!(groups[0].contains(&max_equals_min));
assert!(groups[0].contains(&min_equals_max));
// second group
assert_eq!(groups[1].len(), 2);
assert!(groups[1].contains(&another));
assert!(groups[1].contains(&partial_overlap));
// third group
// 5 input groups and 5 output groups because they are too large to group further
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
assert_eq!(groups.len(), 5);
// first group includes oldest and large file
assert_eq!(groups[0].len(), 1);
assert!(groups[0].contains(&max_equals_min)); // min_time = 3
// second group
assert_eq!(groups[1].len(), 3);
assert!(groups[1].contains(&overlaps_many)); // min_time = 5
assert!(groups[1].contains(&contained_completely_within)); // min_time = 6
assert!(groups[1].contains(&min_equals_max)); // min_time = 10
// third group
assert_eq!(groups[2].len(), 1);
assert!(groups[2].contains(&alone));
assert!(groups[2].contains(&another)); // min_time = 13
// forth group
assert_eq!(groups[3].len(), 1); // min_time = 14
assert!(groups[3].contains(&partial_overlap));
// fifth group
assert_eq!(groups[4].len(), 1);
assert!(groups[4].contains(&alone)); // min_time = 30
}
#[test]
fn test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() {
let compaction_max_size_bytes = 100000;
// oldest overlapped and very small
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300);
@ -2092,7 +2410,7 @@ mod tests {
let alone = arbitrary_parquet_file_with_size(30, 35, 200);
// large files in the middle
let another = arbitrary_parquet_file_with_size(13, 15, compaction_max_size_bytes + 100); // too large to group
let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
// Given a bunch of files in an arbitrary order
@ -2107,27 +2425,32 @@ mod tests {
];
// Group into overlapped groups
let overlapped_groups = Compactor::overlapped_groups(all);
assert_eq!(overlapped_groups.len(), 3);
let overlapped_groups =
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
assert_eq!(overlapped_groups.len(), 4);
// group further into group by size
let groups =
Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes);
// Nothing will be group further because the middle one is large
// 4 input groups but 3 output groups
// The last 2 groups will be grouped together because they are small
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
TEST_MAX_SIZE_BYTES,
TEST_MAX_FILE_COUNT,
);
assert_eq!(groups.len(), 3);
// first group includes 4 oldest files
assert_eq!(groups[0].len(), 4);
assert!(groups[0].contains(&overlaps_many));
assert!(groups[0].contains(&contained_completely_within));
assert!(groups[0].contains(&max_equals_min));
assert!(groups[0].contains(&min_equals_max));
// second group
assert_eq!(groups[1].len(), 2);
assert!(groups[1].contains(&another));
assert!(groups[1].contains(&partial_overlap));
// third group
assert_eq!(groups[2].len(), 1);
assert!(groups[2].contains(&alone));
assert!(groups[0].contains(&max_equals_min)); // min_time = 3
assert!(groups[0].contains(&overlaps_many)); // min_time = 5
assert!(groups[0].contains(&contained_completely_within)); // min_time = 6
assert!(groups[0].contains(&min_equals_max)); // min_time = 10
// second group
assert_eq!(groups[1].len(), 1);
assert!(groups[1].contains(&another)); // min_time = 13
// third group
assert_eq!(groups[2].len(), 2);
assert!(groups[2].contains(&partial_overlap)); // min_time = 14
assert!(groups[2].contains(&alone)); // min_time = 30
}
#[tokio::test]
@ -2141,7 +2464,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -2314,7 +2637,7 @@ mod tests {
// Given a bunch of files in a particular order to exercise the algorithm:
let all = vec![one, two, three];
let groups = Compactor::overlapped_groups(all);
let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
dbg!(&groups);
// All should be in the same group.
@ -2328,7 +2651,7 @@ mod tests {
// Given a bunch of files in a particular order to exercise the algorithm:
let all = vec![one, two, three, four];
let groups = Compactor::overlapped_groups(all);
let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
dbg!(&groups);
// All should be in the same group.
@ -2346,7 +2669,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -2673,7 +2996,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);
@ -2747,7 +3070,7 @@ mod tests {
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
CompactorConfig::new(90, 100000, 100000),
CompactorConfig::new(90, 100000, 100000, 10),
Arc::new(metric::Registry::new()),
);

View File

@ -112,6 +112,8 @@ pub struct CompactorConfig {
/// For non-overlapped and contiguous files, compactor will also compact them into
/// a larger file of max size defined by this config value.
compaction_max_size_bytes: i64,
/// Limit the number of files to compact into one file
compaction_max_file_count: i64,
}
impl CompactorConfig {
@ -120,6 +122,7 @@ impl CompactorConfig {
split_percentage: i64,
max_concurrent_compaction_size_bytes: i64,
compaction_max_size_bytes: i64,
compaction_max_file_count: i64,
) -> Self {
assert!(split_percentage > 0 && split_percentage <= 100);
@ -127,6 +130,7 @@ impl CompactorConfig {
split_percentage,
max_concurrent_compaction_size_bytes,
compaction_max_size_bytes,
compaction_max_file_count,
}
}
@ -150,6 +154,11 @@ impl CompactorConfig {
pub fn compaction_max_size_bytes(&self) -> i64 {
self.compaction_max_size_bytes
}
/// Max number of files to compact at a time
pub fn compaction_max_file_count(&self) -> i64 {
self.compaction_max_file_count
}
}
/// Checks for candidate partitions to compact and spawns tokio tasks to compact as many
@ -170,28 +179,50 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
let mut used_size = 0;
let max_size = compactor.config.max_concurrent_compaction_size_bytes();
let max_file_size = compactor.config.compaction_max_size_bytes();
let max_file_count = compactor.config.compaction_max_file_count();
let mut handles = vec![];
for c in candidates {
let compactor = Arc::clone(&compactor);
let handle = tokio::task::spawn(async move {
debug!(candidate=?c, "compacting candidate");
if let Err(e) = compactor
.compact_partition(c.partition_id, max_file_size)
.await
{
let compact_and_upgrade = compactor
.groups_to_compact_and_files_to_upgrade(
c.partition_id,
max_file_size,
max_file_count,
)
.await;
match compact_and_upgrade {
Err(e) => {
warn!(
"compaction on partition {} failed with: {:?}",
"groups file to compact and upgrade on partition {} failed with: {:?}",
c.partition_id, e
);
}
debug!(candidate=?c, "compaction complete");
});
used_size += c.file_size_bytes;
handles.push(handle);
if used_size > max_size {
debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit");
break;
Ok(compact_and_upgrade) => {
if compact_and_upgrade.compactable() {
let handle = tokio::task::spawn(async move {
debug!(candidate=?c, "compacting candidate");
let res = compactor
.compact_partition(c.partition_id, compact_and_upgrade)
.await;
if let Err(e) = res {
warn!(
"compaction on partition {} failed with: {:?}",
c.partition_id, e
);
}
debug!(candidate=?c, "compaction complete");
});
used_size += c.file_size_bytes;
handles.push(handle);
if used_size > max_size {
debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit");
break;
}
}
}
}
}

View File

@ -360,6 +360,7 @@ impl Config {
split_percentage: 90,
max_concurrent_compaction_size_bytes: 100000,
compaction_max_size_bytes: 100000,
compaction_max_file_count: 10,
};
let querier_config = QuerierConfig {

View File

@ -167,6 +167,7 @@ pub async fn create_compactor_server_type(
compactor_config.split_percentage,
compactor_config.max_concurrent_compaction_size_bytes,
compactor_config.compaction_max_size_bytes,
compactor_config.compaction_max_file_count,
);
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
sequencers,