chore: refactor splitting files by level (#6857)
* chore: refactor splitting files by level * fix: docs
pull/24376/head
parent
2b36692488
commit
f1561676de
|
@ -2,7 +2,7 @@ use std::{collections::VecDeque, fmt::Display};
|
|||
|
||||
use data_types::{CompactionLevel, ParquetFile};
|
||||
|
||||
use crate::file_group::FilesTimeRange;
|
||||
use crate::file_group::{split_by_level, FilesTimeRange};
|
||||
|
||||
use super::FilesSplit;
|
||||
|
||||
|
@ -67,18 +67,9 @@ impl FilesSplit for TargetLevelNonOverlapSplit {
|
|||
let num_files = files.len();
|
||||
|
||||
// Split files into levels
|
||||
let mut target_level_files = Vec::with_capacity(num_files);
|
||||
let mut prev_level_files = Vec::with_capacity(num_files);
|
||||
let prev_level = target_level.prev();
|
||||
for file in files {
|
||||
if file.compaction_level == target_level {
|
||||
target_level_files.push(file);
|
||||
} else if file.compaction_level == prev_level {
|
||||
prev_level_files.push(file);
|
||||
} else {
|
||||
panic!("Unexpected compaction level: {}", file.compaction_level);
|
||||
}
|
||||
}
|
||||
let (mut target_level_files, prev_level_files) =
|
||||
split_by_level(files, target_level, prev_level);
|
||||
|
||||
// compute time range of prev_level_files
|
||||
let prev_level_range = if let Some(r) = FilesTimeRange::try_new(&prev_level_files) {
|
||||
|
@ -161,7 +152,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Unexpected compaction level: CompactionLevel::L2")]
|
||||
#[should_panic(
|
||||
expected = "Unexpected compaction level. Expected CompactionLevel::L1 or CompactionLevel::L0 but got CompactionLevel::L2."
|
||||
)]
|
||||
fn test_unexpected_compaction_level_2() {
|
||||
let files = create_overlapped_files();
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
|
@ -170,7 +163,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Unexpected compaction level: CompactionLevel::L0")]
|
||||
#[should_panic(
|
||||
expected = "Unexpected compaction level. Expected CompactionLevel::L2 or CompactionLevel::L1 but got CompactionLevel::L0."
|
||||
)]
|
||||
fn test_unexpected_compaction_level_0() {
|
||||
let files = create_overlapped_files();
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::fmt::Display;
|
|||
use data_types::{CompactionLevel, ParquetFile};
|
||||
|
||||
use super::FilesSplit;
|
||||
use crate::file_group::{overlaps_in_time, FilesTimeRange};
|
||||
use crate::file_group::{overlaps_in_time, split_by_level, FilesTimeRange};
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Split files into `[files_to_compact]` and `[files_to_upgrade]`
|
||||
|
@ -72,18 +72,9 @@ impl FilesSplit for TargetLevelUpgradeSplit {
|
|||
let mut files_to_compact = Vec::with_capacity(files.len());
|
||||
|
||||
// Split files into levels
|
||||
let mut target_level_files = Vec::with_capacity(files.len());
|
||||
let mut prev_level_files = Vec::with_capacity(files.len());
|
||||
let prev_level = target_level.prev();
|
||||
for file in files {
|
||||
if file.compaction_level == target_level {
|
||||
target_level_files.push(file);
|
||||
} else if file.compaction_level == prev_level {
|
||||
prev_level_files.push(file);
|
||||
} else {
|
||||
panic!("Unexpected compaction level: {}", file.compaction_level);
|
||||
}
|
||||
}
|
||||
let (target_level_files, mut prev_level_files) =
|
||||
split_by_level(files, target_level, prev_level);
|
||||
|
||||
// compute time range of target_level_files, if any
|
||||
let target_time_range = FilesTimeRange::try_new(&target_level_files);
|
||||
|
@ -178,7 +169,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Unexpected compaction level: CompactionLevel::L2")]
|
||||
#[should_panic(
|
||||
expected = "Unexpected compaction level. Expected CompactionLevel::L1 or CompactionLevel::L0 but got CompactionLevel::L2."
|
||||
)]
|
||||
fn test_unexpected_compaction_level_2() {
|
||||
let files = create_overlapped_files();
|
||||
let split = TargetLevelUpgradeSplit::new(MAX_SIZE);
|
||||
|
@ -187,7 +180,9 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "Unexpected compaction level: CompactionLevel::L0")]
|
||||
#[should_panic(
|
||||
expected = "Unexpected compaction level. Expected CompactionLevel::L2 or CompactionLevel::L1 but got CompactionLevel::L0."
|
||||
)]
|
||||
fn test_unexpected_compaction_level_0() {
|
||||
let files = create_overlapped_files();
|
||||
let split = TargetLevelUpgradeSplit::new(MAX_SIZE);
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Utilities for working with groups of [`ParquetFile`]
|
||||
|
||||
use data_types::{ParquetFile, Timestamp};
|
||||
use data_types::{CompactionLevel, ParquetFile, Timestamp};
|
||||
use std::cmp::{max, min};
|
||||
|
||||
/// Represent the min/max time range for a group of [`ParquetFile`]s
|
||||
|
@ -43,6 +43,31 @@ pub fn overlaps_in_time<'a>(
|
|||
files.into_iter().any(|f| f.overlaps(file))
|
||||
}
|
||||
|
||||
/// Divide a list of [`ParquetFile`]s into two lists based on the two levels
|
||||
///
|
||||
/// Panics if there any level other than the two levels is present
|
||||
pub fn split_by_level(
|
||||
files: impl IntoIterator<Item = ParquetFile>,
|
||||
first_level: CompactionLevel,
|
||||
second_level: CompactionLevel,
|
||||
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
|
||||
// Split files into levels
|
||||
files
|
||||
.into_iter()
|
||||
.partition(|file| {
|
||||
let file_level = file.compaction_level;
|
||||
if file_level == first_level {
|
||||
true
|
||||
} else if file_level == second_level {
|
||||
false
|
||||
} else {
|
||||
panic!(
|
||||
"Unexpected compaction level. Expected {first_level} or {second_level} but got {file_level}."
|
||||
);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
|
Loading…
Reference in New Issue