fix: Use into_iter instead of VecDeque
Nothing was pushing into the VecDeque; can just sort and iterate over the collection as a Vec. In `identify_files_to_split`, the collection should be empty after the `for` loop, so it doesn't need to be pushed into the result.pull/24376/head
parent
e0e544abe5
commit
194cb9c6b8
|
@ -1,5 +1,3 @@
|
|||
use std::collections::VecDeque;
|
||||
|
||||
use data_types::{CompactionLevel, ParquetFile, Timestamp};
|
||||
|
||||
use crate::components::{
|
||||
|
@ -43,21 +41,22 @@ pub fn limit_files_to_compact(
|
|||
let split = TargetLevelSplit::new();
|
||||
let (start_level_files, mut target_level_files) = split.apply(files, start_level);
|
||||
|
||||
// Order start-level files by to group the files to commpact them correctly
|
||||
// Order start-level files to group the files to compact them correctly
|
||||
let start_level_files = order_files(start_level_files, &start_level);
|
||||
let mut start_level_files = start_level_files.iter().collect::<VecDeque<_>>();
|
||||
let mut start_level_files = start_level_files.into_iter();
|
||||
|
||||
// Go over start-level files and find overlapped files in target level
|
||||
let mut start_level_files_to_compact: Vec<ParquetFile> = Vec::new();
|
||||
let mut target_level_files_to_compact = Vec::new();
|
||||
let mut files_to_keep = Vec::new();
|
||||
let mut total_size = 0;
|
||||
while let Some(file) = start_level_files.pop_front() {
|
||||
|
||||
for file in start_level_files.by_ref() {
|
||||
// A start-level file, if compacted, must be compacted with all of its overlapped target-level files.
|
||||
// Thus compute the size needed before deciding to compact this file and its overlaps or not
|
||||
|
||||
// Time range of start_level_files_to_compact plus this file
|
||||
let (min_time, max_time) = time_range(file, &start_level_files_to_compact);
|
||||
let (min_time, max_time) = time_range(&file, &start_level_files_to_compact);
|
||||
|
||||
// Get all target-level files that overlaps with the time range and not yet in target_level_files_to_compact
|
||||
let overlapped_files: Vec<&ParquetFile> = target_level_files
|
||||
|
@ -75,13 +74,13 @@ pub fn limit_files_to_compact(
|
|||
|
||||
// If total size is under limit, add this file and its overlapped files to files_to_compact
|
||||
if total_size + size <= max_compact_size as i64 {
|
||||
start_level_files_to_compact.push(file.clone());
|
||||
start_level_files_to_compact.push(file);
|
||||
target_level_files_to_compact
|
||||
.extend(overlapped_files.into_iter().cloned().collect::<Vec<_>>());
|
||||
total_size += size;
|
||||
} else {
|
||||
// Over limit, stop here
|
||||
files_to_keep.push(file.clone());
|
||||
files_to_keep.push(file);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -90,7 +89,7 @@ pub fn limit_files_to_compact(
|
|||
target_level_files.retain(|f| !target_level_files_to_compact.iter().any(|x| x == f));
|
||||
|
||||
// All files left in start_level_files and target_level_files are kept for next round
|
||||
target_level_files.extend(start_level_files.into_iter().cloned().collect::<Vec<_>>());
|
||||
target_level_files.extend(start_level_files);
|
||||
files_to_keep.extend(target_level_files);
|
||||
|
||||
// All files in start_level_files_to_compact and target_level_files_to_compact will be compacted
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
use std::collections::VecDeque;
|
||||
|
||||
use data_types::{CompactionLevel, ParquetFile};
|
||||
use itertools::Itertools;
|
||||
use observability_deps::tracing::debug;
|
||||
|
@ -44,20 +42,17 @@ pub fn identify_files_to_split(
|
|||
// Get start-level and target-level files
|
||||
let len = files.len();
|
||||
let split = TargetLevelSplit::new();
|
||||
let (start_level_files, mut target_level_files) = split.apply(files, start_level);
|
||||
let (mut start_level_files, mut target_level_files) = split.apply(files, start_level);
|
||||
|
||||
// sort start_level files in their max_l0_created_at and convert it to VecDeque for pop_front
|
||||
let mut start_level_files: VecDeque<ParquetFile> = start_level_files
|
||||
.into_iter()
|
||||
.sorted_by_key(|f| f.max_l0_created_at)
|
||||
.collect();
|
||||
// sort start_level files in their max_l0_created_at
|
||||
start_level_files.sort_by_key(|f| f.max_l0_created_at);
|
||||
// sort target level files in their min_time
|
||||
target_level_files.sort_by_key(|f| f.min_time);
|
||||
|
||||
// Get files in start level that overlap with any file in target level
|
||||
let mut files_to_split = Vec::new();
|
||||
let mut files_not_to_split = Vec::new();
|
||||
while let Some(file) = start_level_files.pop_front() {
|
||||
for file in start_level_files {
|
||||
// Get target_level files that overlaps with this file
|
||||
let overlapped_target_level_files: Vec<&ParquetFile> = target_level_files
|
||||
.iter()
|
||||
|
@ -93,8 +88,7 @@ pub fn identify_files_to_split(
|
|||
}
|
||||
|
||||
// keep the rest of the files for next round
|
||||
start_level_files.extend(target_level_files);
|
||||
files_not_to_split.extend(start_level_files);
|
||||
files_not_to_split.extend(target_level_files);
|
||||
|
||||
assert_eq!(files_to_split.len() + files_not_to_split.len(), len);
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{collections::VecDeque, num::NonZeroUsize, sync::Arc, time::Duration};
|
||||
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
|
||||
|
||||
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
|
||||
use futures::StreamExt;
|
||||
|
@ -187,15 +187,14 @@ async fn try_compact_partition(
|
|||
let (files_now, files_later) = components.round_split.split(files, round_info.as_ref());
|
||||
|
||||
// Each branch must not overlap with each other
|
||||
let mut branches = components
|
||||
let branches = components
|
||||
.divide_initial
|
||||
.divide(files_now, round_info.as_ref())
|
||||
.into_iter()
|
||||
.collect::<VecDeque<_>>();
|
||||
.into_iter();
|
||||
|
||||
let mut files_next = files_later;
|
||||
// loop for each "Branch"
|
||||
while let Some(branch) = branches.pop_front() {
|
||||
for branch in branches {
|
||||
let input_paths: Vec<ParquetFilePath> =
|
||||
branch.iter().map(ParquetFilePath::from).collect();
|
||||
|
||||
|
|
Loading…
Reference in New Issue