Merge pull request #5653 from influxdata/cn/share-code-with-full-compaction
feat: Re-enable full compaction to level 2, limited by the memory budget

commit a5418de433
@@ -2,18 +2,15 @@
 //! fully compacted.

 use crate::{
-    compact::{Compactor, PartitionCompactionCandidateWithInfo},
-    compact_candidates_with_memory_budget, compact_in_parallel,
-    parquet_file::CompactorParquetFile,
-    parquet_file_combining,
-    parquet_file_lookup::{self, ParquetFilesForCompaction},
+    compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel,
+    parquet_file_combining, parquet_file_lookup,
 };
 use backoff::Backoff;
-use data_types::{CompactionLevel, ParquetFileId};
+use data_types::CompactionLevel;
 use metric::Attributes;
 use observability_deps::tracing::*;
-use snafu::{ResultExt, Snafu};
-use std::{collections::HashMap, sync::Arc};
+use snafu::Snafu;
+use std::sync::Arc;

 /// Cold compaction. Returns the number of compacted partitions.
 pub async fn compact(compactor: Arc<Compactor>) -> usize {
@@ -55,7 +52,20 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
     compact_candidates_with_memory_budget(
         Arc::clone(&compactor),
         compaction_type,
+        CompactionLevel::Initial,
         compact_in_parallel,
-        candidates.into(),
+        true, // split
+        candidates.clone().into(),
     )
     .await;
+
+    // Compact level 1 files in parallel ("full compaction")
+    compact_candidates_with_memory_budget(
+        Arc::clone(&compactor),
+        compaction_type,
+        CompactionLevel::FileNonOverlapped,
+        compact_in_parallel,
+        false, // don't split
+        candidates.into(),
+    )
+    .await;
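A minimal sketch (not part of the diff; it assumes the compactor crate's candidate type is in scope) of why the first call above clones the candidate list: `compact_candidates_with_memory_budget` consumes a `VecDeque` of candidates, so the level-0 pass works on a clone and the level-1 "full compaction" pass takes the original.

    use std::collections::VecDeque;

    // Illustrative bindings only; `candidates` stands in for the Vec returned by
    // `cold_partitions_to_compact` earlier in this function.
    let candidates: Vec<Arc<PartitionCompactionCandidateWithInfo>> = vec![];
    let first_pass: VecDeque<_> = candidates.clone().into(); // level 0 -> level 1, split output
    let second_pass: VecDeque<_> = candidates.into(); // level 1 -> level 2, no split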
@ -92,146 +102,21 @@ pub(crate) enum Error {
|
|||
},
|
||||
}
|
||||
|
||||
/// Given a partition that needs to have full compaction run,
|
||||
///
|
||||
/// - Select all files in the partition, which this method assumes will only be level 1
|
||||
/// without overlaps (any level 0 and level 2 files passed into this function will be ignored)
|
||||
/// - Split the files into groups based on size: take files in the list until the current group size
|
||||
/// is greater than max_desired_file_size_bytes
|
||||
/// - Compact each group into a new level 2 file, no splitting
|
||||
///
|
||||
/// Uses a hashmap of size overrides to allow mocking of file sizes.
|
||||
#[allow(dead_code)]
|
||||
async fn full_compaction(
|
||||
compactor: &Compactor,
|
||||
partition: Arc<PartitionCompactionCandidateWithInfo>,
|
||||
size_overrides: &HashMap<ParquetFileId, i64>,
|
||||
) -> Result<(), Error> {
|
||||
// select all files in this partition
|
||||
let parquet_files_for_compaction =
|
||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||
Arc::clone(&compactor.catalog),
|
||||
Arc::clone(&partition),
|
||||
size_overrides,
|
||||
)
|
||||
.await
|
||||
.context(LookupSnafu)?;
|
||||
|
||||
let ParquetFilesForCompaction {
|
||||
level_1,
|
||||
.. // Ignore other levels
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let groups = group_by_size(level_1, compactor.config.max_desired_file_size_bytes);
|
||||
|
||||
for group in groups {
|
||||
if group.len() == 1 {
|
||||
// upgrade the one file to l2, don't run compaction
|
||||
let mut repos = compactor.catalog.repositories().await;
|
||||
|
||||
repos
|
||||
.parquet_files()
|
||||
.update_compaction_level(&[group[0].id()], CompactionLevel::Final)
|
||||
.await
|
||||
.context(UpgradingSnafu)?;
|
||||
} else {
|
||||
parquet_file_combining::compact_final_no_splits(
|
||||
group,
|
||||
Arc::clone(&partition),
|
||||
Arc::clone(&compactor.catalog),
|
||||
compactor.store.clone(),
|
||||
Arc::clone(&compactor.exec),
|
||||
Arc::clone(&compactor.time_provider),
|
||||
&compactor.compaction_input_file_bytes,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Error::Combining {
|
||||
source: Box::new(e),
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Given a list of parquet files and a size limit, iterate through the list in order. Create
|
||||
/// groups based on when the size of files in the group exceeds the size limit.
|
||||
fn group_by_size(
|
||||
files: Vec<CompactorParquetFile>,
|
||||
max_file_size_bytes: u64,
|
||||
) -> Vec<Vec<CompactorParquetFile>> {
|
||||
let num_files = files.len();
|
||||
let mut group_file_size_bytes = 0;
|
||||
|
||||
let mut group = Vec::with_capacity(num_files);
|
||||
let mut groups = Vec::with_capacity(num_files);
|
||||
|
||||
for file in files {
|
||||
group_file_size_bytes += file.file_size_bytes() as u64;
|
||||
group.push(file);
|
||||
|
||||
if group_file_size_bytes >= max_file_size_bytes {
|
||||
groups.push(group);
|
||||
group = Vec::with_capacity(num_files);
|
||||
group_file_size_bytes = 0;
|
||||
}
|
||||
}
|
||||
if !group.is_empty() {
|
||||
groups.push(group);
|
||||
}
|
||||
|
||||
groups
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{compact_one_partition, handler::CompactorConfig, parquet_file_filtering};
|
||||
use crate::{
|
||||
compact_one_partition, handler::CompactorConfig, parquet_file_filtering,
|
||||
ParquetFilesForCompaction,
|
||||
};
|
||||
use ::parquet_file::storage::ParquetStorage;
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use backoff::BackoffConfig;
|
||||
use data_types::{ColumnType, CompactionLevel};
|
||||
use data_types::{ColumnType, CompactionLevel, ParquetFileId};
|
||||
use iox_query::exec::Executor;
|
||||
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_group_by_size() {
|
||||
// Setup - create a bunch of files
|
||||
let catalog = TestCatalog::new();
|
||||
let ns = catalog.create_namespace("ns").await;
|
||||
let shard = ns.create_shard(1).await;
|
||||
let table = ns.create_table("table").await;
|
||||
table.create_column("field_int", ColumnType::I64).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
let partition = table.with_shard(&shard).create_partition("part").await;
|
||||
let arbitrary_lp = "table, field_int=9000i 1010101";
|
||||
|
||||
let builder = TestParquetFileBuilder::default().with_line_protocol(arbitrary_lp);
|
||||
let big = partition.create_parquet_file(builder).await.parquet_file;
|
||||
let big = CompactorParquetFile::with_size_override(big, 1_000);
|
||||
|
||||
let builder = TestParquetFileBuilder::default().with_line_protocol(arbitrary_lp);
|
||||
let little = partition.create_parquet_file(builder).await.parquet_file;
|
||||
let little = CompactorParquetFile::with_size_override(little, 2);
|
||||
|
||||
// Empty in, empty out
|
||||
let groups = group_by_size(vec![], 0);
|
||||
assert!(groups.is_empty(), "Expected empty, got: {:?}", groups);
|
||||
|
||||
// One file in, one group out, even if the file limit is 0
|
||||
let groups = group_by_size(vec![big.clone()], 0);
|
||||
assert_eq!(groups, &[&[big.clone()]]);
|
||||
|
||||
// If the first file is already over the limit, return 2 groups
|
||||
let groups = group_by_size(vec![big.clone(), little.clone()], 100);
|
||||
assert_eq!(groups, &[&[big.clone()], &[little.clone()]]);
|
||||
|
||||
// If the first file does not go over the limit, add another file to the group
|
||||
let groups = group_by_size(vec![little.clone(), big.clone()], 100);
|
||||
assert_eq!(groups, &[&[little, big]]);
|
||||
}
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_remaining_level_0_files_many_files() {
|
||||
|
@ -381,26 +266,33 @@ mod tests {
|
|||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
let mut candidates = compactor
|
||||
let mut partition_candidates = compactor
|
||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(candidates.len(), 1);
|
||||
let c = candidates.pop().unwrap();
|
||||
assert_eq!(partition_candidates.len(), 1);
|
||||
let partition = partition_candidates.pop().unwrap();
|
||||
|
||||
let parquet_files_for_compaction =
|
||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||
Arc::clone(&compactor.catalog),
|
||||
Arc::clone(&c),
|
||||
Arc::clone(&partition),
|
||||
&size_overrides,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ParquetFilesForCompaction {
|
||||
level_0,
|
||||
level_1,
|
||||
.. // Ignore other levels
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
c,
|
||||
parquet_files_for_compaction,
|
||||
partition,
|
||||
level_0,
|
||||
level_1,
|
||||
compactor.config.memory_budget_bytes,
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
|
@ -408,7 +300,7 @@ mod tests {
|
|||
|
||||
let to_compact = to_compact.into();
|
||||
|
||||
compact_one_partition(&compactor, to_compact, "cold")
|
||||
compact_one_partition(&compactor, to_compact, "cold", true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -564,26 +456,33 @@ mod tests {
|
|||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
let mut candidates = compactor
|
||||
let mut partition_candidates = compactor
|
||||
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(candidates.len(), 1);
|
||||
let c = candidates.pop().unwrap();
|
||||
assert_eq!(partition_candidates.len(), 1);
|
||||
let partition = partition_candidates.pop().unwrap();
|
||||
|
||||
let parquet_files_for_compaction =
|
||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||
Arc::clone(&compactor.catalog),
|
||||
Arc::clone(&c),
|
||||
Arc::clone(&partition),
|
||||
&size_overrides,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ParquetFilesForCompaction {
|
||||
level_0,
|
||||
level_1,
|
||||
.. // Ignore other levels
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
Arc::clone(&c),
|
||||
parquet_files_for_compaction,
|
||||
Arc::clone(&partition),
|
||||
level_0,
|
||||
level_1,
|
||||
compactor.config.memory_budget_bytes,
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
|
@ -591,7 +490,7 @@ mod tests {
|
|||
|
||||
let to_compact = to_compact.into();
|
||||
|
||||
compact_one_partition(&compactor, to_compact, "cold")
|
||||
compact_one_partition(&compactor, to_compact, "cold", true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -653,7 +552,33 @@ mod tests {
|
|||
);
|
||||
|
||||
// Full compaction will now combine the two level 1 files into one level 2 file
|
||||
full_compaction(&compactor, c, &size_overrides)
|
||||
let parquet_files_for_compaction =
|
||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||
Arc::clone(&compactor.catalog),
|
||||
Arc::clone(&partition),
|
||||
&size_overrides,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ParquetFilesForCompaction {
|
||||
level_1,
|
||||
level_2,
|
||||
.. // Ignore other levels
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
Arc::clone(&partition),
|
||||
level_1,
|
||||
level_2,
|
||||
compactor.config.memory_budget_bytes,
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
);
|
||||
|
||||
let to_compact = to_compact.into();
|
||||
|
||||
compact_one_partition(&compactor, to_compact, "cold", false)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -731,15 +656,12 @@ mod tests {
|
|||
|
||||
// Should have 1 non-soft-deleted files:
|
||||
//
|
||||
// - the newly created file that was upgraded to level 1. Final compaction to level 2 is
|
||||
// currently disabled.
|
||||
// - the newly created file that was upgraded to level 1 then to level 2
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 1);
|
||||
let file = files.pop().unwrap();
|
||||
assert_eq!(file.id.get(), 1); // ID doesn't change because the file doesn't get rewritten
|
||||
|
||||
// Final compaction is currently disabled.
|
||||
assert_eq!(file.compaction_level, CompactionLevel::FileNonOverlapped);
|
||||
assert_eq!(file.compaction_level, CompactionLevel::Final);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Verify the parquet file content
|
||||
|
@ -936,23 +858,280 @@ mod tests {
|
|||
|
||||
compact(compactor).await;
|
||||
|
||||
// Full cold compaction to level 2 is currently disabled.
|
||||
let files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 3);
|
||||
// Should have 1 non-soft-deleted file:
|
||||
//
|
||||
// - the level 2 file created after combining all 3 level 1 files created by the first step
|
||||
// of compaction to compact remaining level 0 files
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 1, "{files:?}");
|
||||
let files_and_levels: Vec<_> = files
|
||||
.iter()
|
||||
.map(|f| (f.id.get(), f.compaction_level))
|
||||
.collect();
|
||||
|
||||
// The initial files are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5
|
||||
// and compacted them and split them into files 7 and 8. The second step of cold compaction
|
||||
// took 6, 7, and 8 and combined them all into file 9.
|
||||
assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final)]);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Verify the parquet file content
|
||||
let file = files.pop().unwrap();
|
||||
let batches = table.read_parquet_file(file).await;
|
||||
assert_batches_sorted_eq!(
|
||||
&[
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| field_int | tag1 | tag2 | tag3 | time |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
|
||||
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
|
||||
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
|
||||
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
|
||||
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
|
||||
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
|
||||
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
|
||||
"| 421 | | OH | 21 | 1970-01-01T00:00:00.000091Z |",
|
||||
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
|
||||
"| 81601 | | PA | 15 | 1970-01-01T00:00:00.000090Z |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn full_cold_compaction_new_level_1_overlapping_with_level_2() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let catalog = TestCatalog::new();
|
||||
|
||||
// lp1 will be level 1 with min time 10, overlaps with lp5 (L2)
|
||||
let lp1 = vec![
|
||||
"table,tag1=WA field_int=1000i 10",
|
||||
"table,tag1=VT field_int=10i 20",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
// lp2 will be level 2 with min time 8000, overlaps with lp3 (L1)
|
||||
let lp2 = vec![
|
||||
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate with l1
|
||||
"table,tag1=VT field_int=10i 10000",
|
||||
"table,tag1=UT field_int=70i 20000",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
// lp3 will be level 1 with min time 6000, overlaps with lp2 (L2)
|
||||
let lp3 = vec![
|
||||
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
|
||||
"table,tag1=VT field_int=10i 6000",
|
||||
"table,tag1=UT field_int=270i 25000",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
// lp4 will be level 1 with min time 26000, no overlaps
|
||||
let lp4 = vec![
|
||||
"table,tag2=WA,tag3=10 field_int=1600i 28000",
|
||||
"table,tag2=VT,tag3=20 field_int=20i 26000",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
// lp5 will be level 2 with min time 21, overlaps with lp1 (L1)
|
||||
let lp5 = vec![
|
||||
"table,tag2=PA,tag3=15 field_int=1601i 9",
|
||||
"table,tag2=OH,tag3=21 field_int=21i 25",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
// lp6 will be level 2, no overlaps
|
||||
let lp6 = vec![
|
||||
"table,tag2=PA,tag3=15 field_int=81601i 90000",
|
||||
"table,tag2=OH,tag3=21 field_int=421i 91000",
|
||||
]
|
||||
.join("\n");
|
||||
|
||||
let ns = catalog.create_namespace("ns").await;
|
||||
let shard = ns.create_shard(1).await;
|
||||
let table = ns.create_table("table").await;
|
||||
table.create_column("field_int", ColumnType::I64).await;
|
||||
table.create_column("tag1", ColumnType::Tag).await;
|
||||
table.create_column("tag2", ColumnType::Tag).await;
|
||||
table.create_column("tag3", ColumnType::Tag).await;
|
||||
table.create_column("time", ColumnType::Time).await;
|
||||
|
||||
let partition = table.with_shard(&shard).create_partition("part").await;
|
||||
let time = Arc::new(SystemProvider::new());
|
||||
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
|
||||
let mut config = make_compactor_config();
|
||||
|
||||
// Set the memory budget such that only some of the files will be compacted in a group
|
||||
config.memory_budget_bytes = 1050;
|
||||
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let compactor = Arc::new(Compactor::new(
|
||||
vec![shard.shard.id],
|
||||
Arc::clone(&catalog.catalog),
|
||||
ParquetStorage::new(Arc::clone(&catalog.object_store)),
|
||||
Arc::new(Executor::new(1)),
|
||||
Arc::new(SystemProvider::new()),
|
||||
BackoffConfig::default(),
|
||||
config,
|
||||
Arc::clone(&metrics),
|
||||
));
|
||||
|
||||
// parquet files that are all in the same partition
|
||||
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
|
||||
|
||||
// pf1, L1, overlaps with lp5 (L2)
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp1)
|
||||
.with_max_seq(2)
|
||||
.with_min_time(10)
|
||||
.with_max_time(20)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped);
|
||||
let pf1 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf1.parquet_file.id,
|
||||
compactor.config.max_desired_file_size_bytes as i64 + 10,
|
||||
);
|
||||
|
||||
// pf2, L2, overlaps with lp3 (L1)
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp2)
|
||||
.with_max_seq(5)
|
||||
.with_min_time(8_000)
|
||||
.with_max_time(20_000)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::Final);
|
||||
let pf2 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf2.parquet_file.id,
|
||||
100, // small file
|
||||
);
|
||||
|
||||
// pf3, L1, overlaps with lp2 (L2)
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp3)
|
||||
.with_max_seq(3)
|
||||
.with_min_time(6_000)
|
||||
.with_max_time(25_000)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped);
|
||||
let pf3 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf3.parquet_file.id,
|
||||
100, // small file
|
||||
);
|
||||
|
||||
// pf4, L1, does not overlap with any, won't fit in budget with 1, 2, 3, 5
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp4)
|
||||
.with_max_seq(1)
|
||||
.with_min_time(26_000)
|
||||
.with_max_time(28_000)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped);
|
||||
let pf4 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf4.parquet_file.id,
|
||||
100, // small file
|
||||
);
|
||||
|
||||
// pf5, L2, overlaps with lp1 (L1)
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp5)
|
||||
.with_max_seq(1)
|
||||
.with_min_time(9)
|
||||
.with_max_time(25)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::Final);
|
||||
let pf5 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf5.parquet_file.id,
|
||||
100, // small file
|
||||
);
|
||||
|
||||
// pf6, L2, does not overlap with any
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(&lp6)
|
||||
.with_max_seq(20)
|
||||
.with_min_time(90000)
|
||||
.with_max_time(91000)
|
||||
.with_creation_time(time_38_hour_ago)
|
||||
.with_compaction_level(CompactionLevel::Final);
|
||||
let pf6 = partition.create_parquet_file(builder).await;
|
||||
size_overrides.insert(
|
||||
pf6.parquet_file.id,
|
||||
100, // small file
|
||||
);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
|
||||
compact(compactor).await;
|
||||
|
||||
// Should have 3 non-soft-deleted files:
|
||||
//
|
||||
// - pf4, the level 1 file untouched because it didn't fit in the memory budget
|
||||
// - pf6, the level 2 file untouched because it doesn't overlap anything
|
||||
// - the level 2 file created after combining all 3 level 1 files created by the first step
|
||||
// of compaction to compact remaining level 0 files
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 3, "{files:?}");
|
||||
let files_and_levels: Vec<_> = files
|
||||
.iter()
|
||||
.map(|f| (f.id.get(), f.compaction_level))
|
||||
.collect();
|
||||
|
||||
// File 4 was L1 but didn't fit in the memory budget, so was untouched.
|
||||
// File 6 was already L2 and did not overlap with anything, so was untouched.
|
||||
// Cold compaction took files 1, 2, 3, 5 and compacted them into file 7.
|
||||
assert_eq!(
|
||||
files_and_levels,
|
||||
vec![
|
||||
(
|
||||
pf6.parquet_file.id.get(),
|
||||
CompactionLevel::FileNonOverlapped
|
||||
),
|
||||
(7, CompactionLevel::FileNonOverlapped),
|
||||
(8, CompactionLevel::FileNonOverlapped),
|
||||
(4, CompactionLevel::FileNonOverlapped),
|
||||
(6, CompactionLevel::Final),
|
||||
(7, CompactionLevel::Final),
|
||||
]
|
||||
);
|
||||
|
||||
// ------------------------------------------------
|
||||
// Verify the parquet file content
|
||||
let file1 = files.pop().unwrap();
|
||||
let batches = table.read_parquet_file(file1).await;
|
||||
assert_batches_sorted_eq!(
|
||||
&[
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| field_int | tag1 | tag2 | tag3 | time |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
|
||||
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
|
||||
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
|
||||
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
|
||||
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
|
||||
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
|
||||
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
|
||||
"+-----------+------+------+------+--------------------------------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
|
||||
let file0 = files.pop().unwrap();
|
||||
let batches = table.read_parquet_file(file0).await;
|
||||
assert_batches_sorted_eq!(
|
||||
&[
|
||||
"+-----------+------+------+-----------------------------+",
|
||||
"| field_int | tag2 | tag3 | time |",
|
||||
"+-----------+------+------+-----------------------------+",
|
||||
"| 421 | OH | 21 | 1970-01-01T00:00:00.000091Z |",
|
||||
"| 81601 | PA | 15 | 1970-01-01T00:00:00.000090Z |",
|
||||
"+-----------+------+------+-----------------------------+",
|
||||
],
|
||||
&batches
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
use crate::{compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel};
|
||||
use backoff::Backoff;
|
||||
use data_types::CompactionLevel;
|
||||
use metric::Attributes;
|
||||
use observability_deps::tracing::*;
|
||||
use std::sync::Arc;
|
||||
|
@ -50,7 +51,9 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
|
|||
compact_candidates_with_memory_budget(
|
||||
Arc::clone(&compactor),
|
||||
compaction_type,
|
||||
CompactionLevel::Initial,
|
||||
compact_in_parallel,
|
||||
true, // split
|
||||
candidates.into(),
|
||||
)
|
||||
.await;
|
||||
|
@ -77,6 +80,7 @@ mod tests {
|
|||
handler::CompactorConfig,
|
||||
parquet_file_filtering, parquet_file_lookup,
|
||||
tests::{test_setup, TestSetup},
|
||||
ParquetFilesForCompaction,
|
||||
};
|
||||
use arrow_util::assert_batches_sorted_eq;
|
||||
use backoff::BackoffConfig;
|
||||
|
@ -262,7 +266,9 @@ mod tests {
|
|||
compact_candidates_with_memory_budget(
|
||||
Arc::clone(&compactor),
|
||||
"hot",
|
||||
CompactionLevel::Initial,
|
||||
mock_compactor.compaction_function(),
|
||||
true,
|
||||
candidates.into(),
|
||||
)
|
||||
.await;
|
||||
|
@ -485,7 +491,7 @@ mod tests {
|
|||
|
||||
// ------------------------------------------------
|
||||
// Compact
|
||||
let mut candidates = compactor
|
||||
let mut partition_candidates = compactor
|
||||
.hot_partitions_to_compact(
|
||||
compactor.config.max_number_partitions_per_shard,
|
||||
compactor
|
||||
|
@ -495,21 +501,28 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(candidates.len(), 1);
|
||||
let c = candidates.pop().unwrap();
|
||||
assert_eq!(partition_candidates.len(), 1);
|
||||
let partition = partition_candidates.pop().unwrap();
|
||||
|
||||
let parquet_files_for_compaction =
|
||||
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
|
||||
Arc::clone(&compactor.catalog),
|
||||
Arc::clone(&c),
|
||||
Arc::clone(&partition),
|
||||
&size_overrides,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let ParquetFilesForCompaction {
|
||||
level_0,
|
||||
level_1,
|
||||
.. // Ignore other levels
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
c,
|
||||
parquet_files_for_compaction,
|
||||
partition,
|
||||
level_0,
|
||||
level_1,
|
||||
compactor.config.memory_budget_bytes,
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
|
@ -517,7 +530,7 @@ mod tests {
|
|||
|
||||
let to_compact = to_compact.into();
|
||||
|
||||
compact_one_partition(&compactor, to_compact, "hot")
|
||||
compact_one_partition(&compactor, to_compact, "hot", true)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ use crate::{
|
|||
compact::{Compactor, PartitionCompactionCandidateWithInfo},
|
||||
parquet_file::CompactorParquetFile,
|
||||
parquet_file_filtering::{FilterResult, FilteredFiles},
|
||||
parquet_file_lookup::ParquetFilesForCompaction,
|
||||
};
|
||||
use data_types::CompactionLevel;
|
||||
use metric::Attributes;
|
||||
|
@ -49,10 +50,12 @@ use std::{collections::VecDeque, sync::Arc};
|
|||
async fn compact_candidates_with_memory_budget<C, Fut>(
|
||||
compactor: Arc<Compactor>,
|
||||
compaction_type: &'static str,
|
||||
initial_level: CompactionLevel,
|
||||
compact_function: C,
|
||||
split: bool,
|
||||
mut candidates: VecDeque<Arc<PartitionCompactionCandidateWithInfo>>,
|
||||
) where
|
||||
C: Fn(Arc<Compactor>, Vec<ReadyToCompact>, &'static str) -> Fut + Send + Sync + 'static,
|
||||
C: Fn(Arc<Compactor>, Vec<ReadyToCompact>, &'static str, bool) -> Fut + Send + Sync + 'static,
|
||||
Fut: futures::Future<Output = ()> + Send,
|
||||
{
|
||||
let mut remaining_budget_bytes = compactor.config.memory_budget_bytes;
|
||||
|
@ -107,9 +110,25 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
Ok(parquet_files_for_compaction) => {
|
||||
// Return only files under the `remaining_budget_bytes` that should be
|
||||
// compacted
|
||||
let ParquetFilesForCompaction {
|
||||
level_0,
|
||||
level_1,
|
||||
level_2,
|
||||
} = parquet_files_for_compaction;
|
||||
|
||||
let (level_n, level_n_plus_1) = match initial_level {
|
||||
CompactionLevel::Initial => (level_0, level_1),
|
||||
CompactionLevel::FileNonOverlapped => (level_1, level_2),
|
||||
_ => {
|
||||
// Focusing on compacting any other level is a bug
|
||||
panic!("Unsupported initial compaction level: {initial_level:?}");
|
||||
}
|
||||
};
|
||||
|
||||
let to_compact = parquet_file_filtering::filter_parquet_files(
|
||||
Arc::clone(&partition),
|
||||
parquet_files_for_compaction,
|
||||
level_n,
|
||||
level_n_plus_1,
|
||||
remaining_budget_bytes,
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
|
@ -167,7 +186,11 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
budget_bytes,
|
||||
} => {
|
||||
remaining_budget_bytes -= budget_bytes;
|
||||
parallel_compacting_candidates.push(ReadyToCompact { files, partition });
|
||||
parallel_compacting_candidates.push(ReadyToCompact {
|
||||
files,
|
||||
partition,
|
||||
target_level: initial_level.next(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -192,6 +215,7 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
|
|||
Arc::clone(&compactor),
|
||||
parallel_compacting_candidates,
|
||||
compaction_type,
|
||||
split,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@@ -204,11 +228,13 @@ async fn compact_candidates_with_memory_budget<C, Fut>(
     }
 }

-/// After filtering based on the memory budget, this is a group of files that should be compacted.
+/// After filtering based on the memory budget, this is a group of files that should be compacted
+/// into the target level specified.
 #[derive(Debug)]
 pub(crate) struct ReadyToCompact {
     pub(crate) files: Vec<CompactorParquetFile>,
     pub(crate) partition: Arc<PartitionCompactionCandidateWithInfo>,
+    pub(crate) target_level: CompactionLevel,
 }
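A minimal sketch (not part of the diff; the helper name is hypothetical) of how such a group is assembled: the target level is one step above the level of the files being grouped, mirroring the `compaction_level().next()` expression used by the test mock later in this commit.

    // Hypothetical helper, for illustration only.
    fn ready_to_compact(
        files: Vec<CompactorParquetFile>,
        partition: Arc<PartitionCompactionCandidateWithInfo>,
    ) -> ReadyToCompact {
        // A group of level 0 files targets level 1; a group of level 1 files targets level 2.
        let target_level = files.last().unwrap().compaction_level().next();
        ReadyToCompact {
            files,
            partition,
            target_level,
        }
    }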
|
||||
|
||||
// Compact given groups of files in parallel.
|
||||
|
@ -219,6 +245,7 @@ async fn compact_in_parallel(
|
|||
compactor: Arc<Compactor>,
|
||||
groups: Vec<ReadyToCompact>,
|
||||
compaction_type: &'static str,
|
||||
split: bool,
|
||||
) {
|
||||
let mut handles = Vec::with_capacity(groups.len());
|
||||
for group in groups {
|
||||
|
@ -226,7 +253,8 @@ async fn compact_in_parallel(
|
|||
let handle = tokio::task::spawn(async move {
|
||||
let partition_id = group.partition.id();
|
||||
debug!(?partition_id, compaction_type, "compaction starting");
|
||||
let compaction_result = compact_one_partition(&comp, group, compaction_type).await;
|
||||
let compaction_result =
|
||||
compact_one_partition(&comp, group, compaction_type, split).await;
|
||||
match compaction_result {
|
||||
Err(e) => {
|
||||
warn!(?e, ?partition_id, compaction_type, "compaction failed");
|
||||
|
@ -267,23 +295,28 @@ pub(crate) async fn compact_one_partition(
|
|||
compactor: &Compactor,
|
||||
to_compact: ReadyToCompact,
|
||||
compaction_type: &'static str,
|
||||
split: bool,
|
||||
) -> Result<(), CompactOnePartitionError> {
|
||||
let start_time = compactor.time_provider.now();
|
||||
|
||||
let ReadyToCompact { files, partition } = to_compact;
|
||||
let ReadyToCompact {
|
||||
files,
|
||||
partition,
|
||||
target_level,
|
||||
} = to_compact;
|
||||
|
||||
let shard_id = partition.shard_id();
|
||||
|
||||
if files.len() == 1 && files[0].compaction_level() == CompactionLevel::Initial {
|
||||
// upgrade the one l0 file to l1, don't run compaction
|
||||
if files.len() == 1 {
|
||||
// upgrade the one file, don't run compaction
|
||||
let mut repos = compactor.catalog.repositories().await;
|
||||
|
||||
repos
|
||||
.parquet_files()
|
||||
.update_compaction_level(&[files[0].id()], CompactionLevel::FileNonOverlapped)
|
||||
.update_compaction_level(&[files[0].id()], target_level)
|
||||
.await
|
||||
.context(UpgradingSnafu)?;
|
||||
} else {
|
||||
} else if split {
|
||||
parquet_file_combining::compact_parquet_files(
|
||||
files,
|
||||
partition,
|
||||
|
@ -295,6 +328,22 @@ pub(crate) async fn compact_one_partition(
|
|||
compactor.config.max_desired_file_size_bytes,
|
||||
compactor.config.percentage_max_file_size,
|
||||
compactor.config.split_percentage,
|
||||
target_level,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CompactOnePartitionError::Combining {
|
||||
source: Box::new(e),
|
||||
})?;
|
||||
} else {
|
||||
parquet_file_combining::compact_final_no_splits(
|
||||
files,
|
||||
partition,
|
||||
Arc::clone(&compactor.catalog),
|
||||
compactor.store.clone(),
|
||||
Arc::clone(&compactor.exec),
|
||||
Arc::clone(&compactor.time_provider),
|
||||
&compactor.compaction_input_file_bytes,
|
||||
target_level,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CompactOnePartitionError::Combining {
|
||||
|
@ -303,8 +352,9 @@ pub(crate) async fn compact_one_partition(
|
|||
}
|
||||
|
||||
let attributes = Attributes::from([
|
||||
("shard_id", format!("{}", shard_id).into()),
|
||||
("shard_id", format!("{shard_id}").into()),
|
||||
("partition_type", compaction_type.into()),
|
||||
("target_level", format!("{}", target_level as i16).into()),
|
||||
]);
|
||||
if let Some(delta) = compactor
|
||||
.time_provider
|
||||
|
@ -353,7 +403,13 @@ pub mod tests {
|
|||
panic!("Expected to get FilterResult::Proceed, got {filter_result:?}");
|
||||
};
|
||||
|
||||
Self { files, partition }
|
||||
let target_level = files.last().unwrap().compaction_level().next();
|
||||
|
||||
Self {
|
||||
files,
|
||||
partition,
|
||||
target_level,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -372,7 +428,9 @@ pub mod tests {
|
|||
compact_candidates_with_memory_budget(
|
||||
Arc::clone(&compactor),
|
||||
"hot",
|
||||
CompactionLevel::Initial,
|
||||
mock_compactor.compaction_function(),
|
||||
true,
|
||||
sorted_candidates,
|
||||
)
|
||||
.await;
|
||||
|
@ -391,6 +449,7 @@ pub mod tests {
|
|||
Arc<Compactor>,
|
||||
Vec<ReadyToCompact>,
|
||||
&'static str,
|
||||
bool,
|
||||
) -> Pin<Box<dyn futures::Future<Output = ()> + Send>>
|
||||
+ Send
|
||||
+ Sync
|
||||
|
@ -403,7 +462,8 @@ pub mod tests {
|
|||
Box::new(
|
||||
move |_compactor: Arc<Compactor>,
|
||||
parallel_compacting_candidates: Vec<ReadyToCompact>,
|
||||
_compaction_type: &'static str| {
|
||||
_compaction_type: &'static str,
|
||||
_split: bool| {
|
||||
let compaction_groups_for_async = Arc::clone(&compaction_groups_for_closure);
|
||||
Box::pin(async move {
|
||||
compaction_groups_for_async
|
||||
|
|
|
@ -97,6 +97,8 @@ pub(crate) async fn compact_parquet_files(
|
|||
// When data is between a "small" and "large" amount, split the compacted files at roughly this
|
||||
// percentage in the earlier compacted file, and the remainder in the later compacted file.
|
||||
split_percentage: u16,
|
||||
// Compaction level the newly created file will have.
|
||||
target_level: CompactionLevel,
|
||||
) -> Result<(), Error> {
|
||||
let partition_id = partition.id();
|
||||
|
||||
|
@ -127,9 +129,12 @@ pub(crate) async fn compact_parquet_files(
|
|||
let num_level_1 = num_files_by_level
|
||||
.get(&CompactionLevel::FileNonOverlapped)
|
||||
.unwrap_or(&0);
|
||||
let num_level_2 = num_files_by_level
|
||||
.get(&CompactionLevel::Final)
|
||||
.unwrap_or(&0);
|
||||
debug!(
|
||||
?partition_id,
|
||||
num_files, num_level_0, num_level_1, "compact files to stream"
|
||||
num_files, num_level_0, num_level_1, num_level_2, "compact files to stream"
|
||||
);
|
||||
|
||||
// Collect all the parquet file IDs, to be able to set their catalog records to be
|
||||
|
@ -146,6 +151,7 @@ pub(crate) async fn compact_parquet_files(
|
|||
partition.table.name.clone(),
|
||||
&partition.table_schema,
|
||||
partition.sort_key.clone(),
|
||||
target_level,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
@ -244,7 +250,7 @@ pub(crate) async fn compact_parquet_files(
|
|||
Arc::clone(&partition),
|
||||
partition_id,
|
||||
max_sequence_number,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
target_level,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -268,9 +274,7 @@ pub(crate) async fn compact_parquet_files(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Compact all files given, no matter their size, into one level 2 file. When this is called by
|
||||
/// `full_compaction`, it should only receive a group of level 1 files that has already been
|
||||
/// selected to be an appropriate size.
|
||||
/// Compact all files given, no matter their size, into one file.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn compact_final_no_splits(
|
||||
files: Vec<CompactorParquetFile>,
|
||||
|
@ -284,6 +288,8 @@ pub(crate) async fn compact_final_no_splits(
|
|||
time_provider: Arc<dyn TimeProvider>,
|
||||
// Histogram for the sizes of the files compacted
|
||||
compaction_input_file_bytes: &Metric<U64Histogram>,
|
||||
// Compaction level the newly created file will have.
|
||||
target_level: CompactionLevel,
|
||||
) -> Result<(), Error> {
|
||||
let partition_id = partition.id();
|
||||
|
||||
|
@ -299,10 +305,7 @@ pub(crate) async fn compact_final_no_splits(
|
|||
// Save all file sizes for recording metrics if this compaction succeeds.
|
||||
let file_sizes: Vec<_> = files.iter().map(|f| f.file_size_bytes()).collect();
|
||||
|
||||
debug!(
|
||||
?partition_id,
|
||||
num_files, "final compaction of files to level 2"
|
||||
);
|
||||
debug!(?partition_id, num_files, "compact_final_no_splits");
|
||||
|
||||
// Collect all the parquet file IDs, to be able to set their catalog records to be
|
||||
// deleted. These should already be unique, no need to dedupe.
|
||||
|
@ -318,6 +321,7 @@ pub(crate) async fn compact_final_no_splits(
|
|||
partition.table.name.clone(),
|
||||
&partition.table_schema,
|
||||
partition.sort_key.clone(),
|
||||
target_level,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
@ -372,7 +376,7 @@ pub(crate) async fn compact_final_no_splits(
|
|||
Arc::clone(&partition),
|
||||
partition_id,
|
||||
max_sequence_number,
|
||||
CompactionLevel::Final,
|
||||
target_level,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -406,7 +410,7 @@ async fn compact_with_plan(
|
|||
partition: Arc<PartitionCompactionCandidateWithInfo>,
|
||||
partition_id: PartitionId,
|
||||
max_sequence_number: SequenceNumber,
|
||||
compaction_level: CompactionLevel,
|
||||
target_level: CompactionLevel,
|
||||
) -> Result<Vec<ParquetFileParams>, Error> {
|
||||
let ctx = exec.new_context(ExecutorType::Reorg);
|
||||
let physical_plan = ctx
|
||||
|
@ -456,7 +460,7 @@ async fn compact_with_plan(
|
|||
partition_id,
|
||||
partition_key: partition.partition_key.clone(),
|
||||
max_sequence_number,
|
||||
compaction_level,
|
||||
compaction_level: target_level,
|
||||
sort_key: Some(sort_key.clone()),
|
||||
};
|
||||
|
||||
|
@ -523,6 +527,7 @@ fn to_queryable_parquet_chunk(
|
|||
table_name: String,
|
||||
table_schema: &TableSchema,
|
||||
partition_sort_key: Option<SortKey>,
|
||||
target_level: CompactionLevel,
|
||||
) -> QueryableParquetChunk {
|
||||
let column_id_lookup = table_schema.column_id_map();
|
||||
let selection: Vec<_> = file
|
||||
|
@ -578,6 +583,7 @@ fn to_queryable_parquet_chunk(
|
|||
sort_key,
|
||||
partition_sort_key,
|
||||
file.compaction_level,
|
||||
target_level,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -853,6 +859,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
DEFAULT_SPLIT_PERCENTAGE,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await;
|
||||
assert_error!(result, Error::NotEnoughParquetFiles { num_files: 0, .. });
|
||||
|
@ -893,6 +900,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
DEFAULT_SPLIT_PERCENTAGE,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -954,6 +962,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
DEFAULT_SPLIT_PERCENTAGE,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1038,6 +1047,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
DEFAULT_SPLIT_PERCENTAGE,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1140,6 +1150,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
split_percentage,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1223,6 +1234,7 @@ mod tests {
|
|||
DEFAULT_MAX_DESIRED_FILE_SIZE_BYTES,
|
||||
DEFAULT_PERCENTAGE_MAX_FILE_SIZE,
|
||||
DEFAULT_SPLIT_PERCENTAGE,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -1321,34 +1333,51 @@ mod tests {
|
|||
let compaction_input_file_bytes = metrics();
|
||||
let shard_id = candidate_partition.shard_id();
|
||||
|
||||
// Even though in the real cold compaction code, we'll usually pass a list of level 1 files
|
||||
// selected by size, nothing in this function checks any of that -- it will compact all
|
||||
// files it's given together into one level 2 file.
|
||||
let level_1_files = parquet_files
|
||||
.into_iter()
|
||||
.filter(|f| f.compaction_level() == CompactionLevel::FileNonOverlapped)
|
||||
.collect();
|
||||
|
||||
// Compact all files given together into one level 2 file.
|
||||
compact_final_no_splits(
|
||||
parquet_files,
|
||||
level_1_files,
|
||||
candidate_partition,
|
||||
Arc::clone(&catalog.catalog),
|
||||
ParquetStorage::new(Arc::clone(&catalog.object_store)),
|
||||
Arc::clone(&catalog.exec),
|
||||
Arc::clone(&catalog.time_provider) as Arc<dyn TimeProvider>,
|
||||
&compaction_input_file_bytes,
|
||||
CompactionLevel::Final,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Should have 1 level 2 file, not split at all
|
||||
// Should have 1 level 2 file, not split at all, and 4 level 0 files.
|
||||
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
|
||||
assert_eq!(files.len(), 1);
|
||||
assert_eq!(files.len(), 5);
|
||||
let files_and_levels: Vec<_> = files
|
||||
.iter()
|
||||
.map(|f| (f.id.get(), f.compaction_level))
|
||||
.collect();
|
||||
assert_eq!(
|
||||
files_and_levels,
|
||||
vec![
|
||||
(2, CompactionLevel::Initial),
|
||||
(3, CompactionLevel::Initial),
|
||||
(5, CompactionLevel::Initial),
|
||||
(6, CompactionLevel::Initial),
|
||||
(7, CompactionLevel::Final),
|
||||
]
|
||||
);
|
||||
|
||||
let file = files.pop().unwrap();
|
||||
assert_eq!(file.id.get(), 7);
|
||||
assert_eq!(file.compaction_level, CompactionLevel::Final);
|
||||
|
||||
// Verify the metrics
|
||||
assert_eq!(
|
||||
extract_byte_metrics(&compaction_input_file_bytes, shard_id),
|
||||
ExtractedByteMetrics {
|
||||
sample_count: 6,
|
||||
buckets_with_counts: vec![(BUCKET_500_KB, 4), (u64::MAX, 2)],
|
||||
sample_count: 2,
|
||||
buckets_with_counts: vec![(BUCKET_500_KB, 2)],
|
||||
}
|
||||
);
|
||||
|
||||
|
@ -1361,15 +1390,9 @@ mod tests {
|
|||
"+-----------+------+------+------+-----------------------------+",
|
||||
"| field_int | tag1 | tag2 | tag3 | time |",
|
||||
"+-----------+------+------+------+-----------------------------+",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
|
||||
"| 10 | VT | | | 1970-01-01T00:00:00.000068Z |",
|
||||
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
|
||||
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000030Z |",
|
||||
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000036Z |",
|
||||
"| 210 | | OH | 21 | 1970-01-01T00:00:00.000136Z |",
|
||||
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
|
||||
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
|
||||
"| 88 | VT | | | 1970-01-01T00:00:00.000010Z |",
|
||||
"| 99 | OR | | | 1970-01-01T00:00:00.000012Z |",
|
||||
"+-----------+------+------+------+-----------------------------+",
|
||||
],
|
||||
|
|
File diff suppressed because it is too large
|
@ -30,7 +30,8 @@ pub(crate) struct ParquetFilesForCompaction {
|
|||
/// sequence number.
|
||||
pub(crate) level_0: Vec<CompactorParquetFile>,
|
||||
|
||||
/// Parquet files for a partition with `CompactionLevel::FileNonOverlapped`. Arbitrary order.
|
||||
/// Parquet files for a partition with `CompactionLevel::FileNonOverlapped`. Ordered by
|
||||
/// ascending max sequence number.
|
||||
pub(crate) level_1: Vec<CompactorParquetFile>,
|
||||
|
||||
/// Parquet files for a partition with `CompactionLevel::Final`. Arbitrary order.
|
||||
|
@ -96,6 +97,7 @@ impl ParquetFilesForCompaction {
|
|||
}
|
||||
|
||||
level_0.sort_by_key(|pf| pf.max_sequence_number());
|
||||
level_1.sort_by_key(|pf| pf.min_time());
|
||||
|
||||
Ok(Self {
|
||||
level_0,
|
||||
|
@ -427,4 +429,77 @@ mod tests {
|
|||
vec![CompactorParquetFile::new(l1.parquet_file, 0)]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn level_1_files_are_sorted_on_min_time() {
|
||||
test_helpers::maybe_start_logging();
|
||||
let TestSetup {
|
||||
catalog, partition, ..
|
||||
} = test_setup().await;
|
||||
|
||||
// Create a level 1 file, max seq = 100, min time = 8888
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_max_seq(100)
|
||||
.with_min_time(8888);
|
||||
let l1_min_time_8888 = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 1 file, max seq = 50, min time = 7777
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_max_seq(50)
|
||||
.with_min_time(7777);
|
||||
let l1_min_time_7777 = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 1 file, max seq = 150, min time = 6666
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_max_seq(150)
|
||||
.with_min_time(6666);
|
||||
let l1_min_time_6666 = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 0 file
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::Initial);
|
||||
let l0 = partition.create_parquet_file(builder).await;
|
||||
|
||||
// Create a level 2 file
|
||||
let builder = TestParquetFileBuilder::default()
|
||||
.with_line_protocol(ARBITRARY_LINE_PROTOCOL)
|
||||
.with_compaction_level(CompactionLevel::Final);
|
||||
let l2 = partition.create_parquet_file(builder).await;
|
||||
|
||||
let partition_with_info =
|
||||
Arc::new(PartitionCompactionCandidateWithInfo::from_test_partition(&partition).await);
|
||||
|
||||
let parquet_files_for_compaction = ParquetFilesForCompaction::for_partition(
|
||||
Arc::clone(&catalog.catalog),
|
||||
partition_with_info,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
parquet_files_for_compaction.level_0,
|
||||
vec![CompactorParquetFile::new(l0.parquet_file, 0)]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
parquet_files_for_compaction.level_1,
|
||||
vec![
|
||||
CompactorParquetFile::new(l1_min_time_6666.parquet_file, 0),
|
||||
CompactorParquetFile::new(l1_min_time_7777.parquet_file, 0),
|
||||
CompactorParquetFile::new(l1_min_time_8888.parquet_file, 0),
|
||||
]
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
parquet_files_for_compaction.level_2,
|
||||
vec![CompactorParquetFile::new(l2.parquet_file, 0)]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,17 @@ pub struct QueryableParquetChunk {
|
|||
sort_key: Option<SortKey>,
|
||||
partition_sort_key: Option<SortKey>,
|
||||
compaction_level: CompactionLevel,
|
||||
/// The compaction level that this operation will be when finished. Chunks from files that have
|
||||
/// the same level as this should get chunk order 0 so that files at a lower compaction level
|
||||
/// (and thus created later) should have priority in deduplication.
|
||||
///
|
||||
/// That is:
|
||||
///
|
||||
/// * When compacting L0 + L1, the target level is L1. L0 files should have priority, so all L1
|
||||
/// files should have chunk order 0 to be sorted first.
|
||||
/// * When compacting L1 + L2, the target level is L2. L1 files should have priority, so all L2
|
||||
/// files should have chunk order 0 to be sorted first.
|
||||
target_level: CompactionLevel,
|
||||
}
|
||||
|
||||
impl QueryableParquetChunk {
|
||||
|
@ -66,6 +77,7 @@ impl QueryableParquetChunk {
|
|||
sort_key: Option<SortKey>,
|
||||
partition_sort_key: Option<SortKey>,
|
||||
compaction_level: CompactionLevel,
|
||||
target_level: CompactionLevel,
|
||||
) -> Self {
|
||||
let delete_predicates = tombstones_to_delete_predicates(deletes);
|
||||
Self {
|
||||
|
@ -79,6 +91,7 @@ impl QueryableParquetChunk {
|
|||
sort_key,
|
||||
partition_sort_key,
|
||||
compaction_level,
|
||||
target_level,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -233,11 +246,30 @@ impl QueryChunk for QueryableParquetChunk {
         "QueryableParquetChunk"
     }

-    // Order of the chunk so they can be deduplicate correctly
+    // Order of the chunk so they can be deduplicated correctly
     fn order(&self) -> ChunkOrder {
-        match self.compaction_level {
-            CompactionLevel::Initial => ChunkOrder::new(self.max_sequence_number.get()),
-            CompactionLevel::FileNonOverlapped | CompactionLevel::Final => ChunkOrder::new(0),
+        use CompactionLevel::*;
+        match (self.target_level, self.compaction_level) {
+            // Files of the same level as what they're being compacting into were created earlier,
+            // so they should be sorted first so that files created later that haven't yet been
+            // compacted to this level will have priority when resolving duplicates.
+            (FileNonOverlapped, FileNonOverlapped) => ChunkOrder::new(0),
+            (Final, Final) => ChunkOrder::new(0),
+
+            // Files that haven't yet been compacted to the target level were created later and
+            // should be sorted based on their max sequence number.
+            (FileNonOverlapped, Initial) => ChunkOrder::new(self.max_sequence_number.get()),
+            (Final, FileNonOverlapped) => ChunkOrder::new(self.max_sequence_number.get()),
+
+            // These combinations of target compaction level and file compaction level are
+            // invalid in this context given the current compaction algorithm.
+            (Initial, _) => panic!("Can't compact into CompactionLevel::Initial"),
+            (FileNonOverlapped, Final) => panic!(
+                "Can't compact CompactionLevel::Final into CompactionLevel::FileNonOverlapped"
+            ),
+            (Final, Initial) => {
+                panic!("Can't compact CompactionLevel::Initial into CompactionLevel::Final")
+            }
         }
     }

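A short usage sketch (not part of the diff) of the new ordering rule, written against the `test_setup` helper shown in the tests below; the sequence number 7 is arbitrary.

    // Files one level below the target keep their max sequence number, so newer data
    // wins deduplication; files already at the target level sort first with order 0.
    let chunk = test_setup(CompactionLevel::Initial, CompactionLevel::FileNonOverlapped, 7).await;
    assert_eq!(chunk.order(), ChunkOrder::new(7));

    let chunk = test_setup(CompactionLevel::FileNonOverlapped, CompactionLevel::Final, 7).await;
    assert_eq!(chunk.order(), ChunkOrder::new(7));

    let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, 7).await;
    assert_eq!(chunk.order(), ChunkOrder::new(0));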
@ -255,6 +287,7 @@ mod tests {
|
|||
|
||||
async fn test_setup(
|
||||
compaction_level: CompactionLevel,
|
||||
target_level: CompactionLevel,
|
||||
max_sequence_number: i64,
|
||||
) -> QueryableParquetChunk {
|
||||
let catalog = TestCatalog::new();
|
||||
|
@ -295,19 +328,49 @@ mod tests {
|
|||
None,
|
||||
None,
|
||||
parquet_file.compaction_level,
|
||||
target_level,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_order_is_max_seq_when_compaction_level_0() {
|
||||
let chunk = test_setup(CompactionLevel::Initial, 2).await;
|
||||
async fn chunk_order_is_max_seq_when_compaction_level_0_and_target_level_1() {
|
||||
let chunk = test_setup(
|
||||
CompactionLevel::Initial,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
2,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(chunk.order(), ChunkOrder::new(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_order_is_0_when_compaction_level_1() {
|
||||
let chunk = test_setup(CompactionLevel::FileNonOverlapped, 2).await;
|
||||
async fn chunk_order_is_0_when_compaction_level_1_and_target_level_1() {
|
||||
let chunk = test_setup(
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
2,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(chunk.order(), ChunkOrder::new(0));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_order_is_max_seq_when_compaction_level_1_and_target_level_2() {
|
||||
let chunk = test_setup(
|
||||
CompactionLevel::FileNonOverlapped,
|
||||
CompactionLevel::Final,
|
||||
2,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(chunk.order(), ChunkOrder::new(2));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn chunk_order_is_0_when_compaction_level_2_and_target_level_2() {
|
||||
let chunk = test_setup(CompactionLevel::Final, CompactionLevel::Final, 2).await;
|
||||
|
||||
assert_eq!(chunk.order(), ChunkOrder::new(0));
|
||||
}
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
use crate::query::QueryableParquetChunk;
|
||||
use data_types::{
|
||||
ParquetFile, ParquetFileId, TableSchema, Timestamp, TimestampMinMax, Tombstone, TombstoneId,
|
||||
CompactionLevel, ParquetFile, ParquetFileId, TableSchema, Timestamp, TimestampMinMax,
|
||||
Tombstone, TombstoneId,
|
||||
};
|
||||
use observability_deps::tracing::*;
|
||||
use parquet_file::{chunk::ParquetChunk, storage::ParquetStorage};
|
||||
|
@ -114,6 +115,7 @@ impl ParquetFileWithTombstone {
|
|||
table_name: String,
|
||||
table_schema: &TableSchema,
|
||||
partition_sort_key: Option<SortKey>,
|
||||
target_level: CompactionLevel,
|
||||
) -> QueryableParquetChunk {
|
||||
let column_id_lookup = table_schema.column_id_map();
|
||||
let selection: Vec<_> = self
|
||||
|
@ -156,6 +158,7 @@ impl ParquetFileWithTombstone {
|
|||
sort_key,
|
||||
partition_sort_key,
|
||||
self.data.compaction_level,
|
||||
target_level,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -53,11 +53,24 @@ impl TryFrom<i32> for CompactionLevel {
         match value {
             x if x == Self::Initial as i32 => Ok(Self::Initial),
             x if x == Self::FileNonOverlapped as i32 => Ok(Self::FileNonOverlapped),
+            x if x == Self::Final as i32 => Ok(Self::Final),
             _ => Err("invalid compaction level value".into()),
         }
     }
 }

+impl CompactionLevel {
+    /// When compacting files of this level, provide the level that the resulting file should be.
+    /// Does not exceed the maximum available level.
+    pub fn next(&self) -> Self {
+        match self {
+            Self::Initial => Self::FileNonOverlapped,
+            Self::FileNonOverlapped => Self::Final,
+            _ => Self::Final,
+        }
+    }
+}
+
 /// Unique ID for a `Namespace`
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
 #[sqlx(transparent)]
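A brief usage sketch (not part of the diff) of the `next()` helper added above: each call moves one compaction level up and saturates at `Final`.

    assert_eq!(CompactionLevel::Initial.next(), CompactionLevel::FileNonOverlapped);
    assert_eq!(CompactionLevel::FileNonOverlapped.next(), CompactionLevel::Final);
    assert_eq!(CompactionLevel::Final.next(), CompactionLevel::Final); // saturates at Final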