refactor: Move hot compaction function into hot compaction module

pull/24376/head
Carol (Nichols || Goulding) 2022-09-01 13:33:22 -04:00
parent 85fb0acea6
commit 1f69d11d46
2 changed files with 322 additions and 310 deletions
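In short: the crate-root helper `compact_hot_partition` moves into the hot-compaction module and is renamed `compact_one_hot_partition`, picking up its own Snafu-based `CompactOneHotPartitionError` in place of the shared crate-level `Error`; the `test_compact_hot_partition_many_files` integration test moves with it. The only change at the call site in `compact_hot_partitions_in_parallel` is the new path and name.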

View File

@@ -1,21 +1,22 @@
//! Collect highest hot candidates and compact them
use crate::{
compact::{Compactor, PartitionCompactionCandidateWithInfo},
parquet_file_combining,
parquet_file_filtering::{filter_hot_parquet_files, FilterResult, FilteredFiles},
parquet_file_lookup,
};
use backoff::Backoff;
use data_types::{ColumnTypeCount, TableId};
use metric::Attributes;
use observability_deps::tracing::*;
use snafu::{ResultExt, Snafu};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use thiserror::Error;
use crate::{
compact::{Compactor, PartitionCompactionCandidateWithInfo},
parquet_file_filtering::{filter_hot_parquet_files, FilterResult, FilteredFiles},
parquet_file_lookup,
};
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {}
@@ -279,7 +280,7 @@ async fn compact_hot_partitions_in_parallel(
let handle = tokio::task::spawn(async move {
let partition_id = p.partition.candidate.partition_id;
debug!(?partition_id, "hot compaction starting");
let compaction_result = crate::compact_hot_partition(&comp, p).await;
let compaction_result = compact_one_hot_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "hot compaction failed");
@@ -301,20 +302,71 @@ async fn compact_hot_partitions_in_parallel(
let _ = futures::future::join_all(handles).await;
}
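The parallel driver above spawns one Tokio task per filtered candidate and then waits for all of them with `futures::future::join_all`, logging and swallowing per-partition failures so that one bad partition does not abort the rest. A minimal, self-contained sketch of that shape (placeholder `u64` partition IDs and `println!` stand in for `FilteredFiles`, the `Compactor`, and the `tracing` macros; assumes only the `tokio` and `futures` crates):

```rust
// Sketch of the spawn-and-join_all pattern used by compact_hot_partitions_in_parallel.
// Assumed dependencies: tokio = { version = "1", features = ["full"] }, futures = "0.3".
#[tokio::main]
async fn main() {
    let partition_ids: Vec<u64> = vec![1, 2, 3];

    let handles: Vec<_> = partition_ids
        .into_iter()
        .map(|partition_id| {
            tokio::task::spawn(async move {
                println!("hot compaction starting for partition {partition_id}");
                // In the real code this is `compact_one_hot_partition(&comp, p).await`;
                // a stubbed result keeps the sketch self-contained.
                let compaction_result: Result<(), String> = Ok(());
                match compaction_result {
                    Err(e) => println!("hot compaction failed for partition {partition_id}: {e}"),
                    Ok(_) => println!("hot compaction complete for partition {partition_id}"),
                }
            })
        })
        .collect();

    // Wait for every spawned task; as in the original, join errors are ignored
    // (`let _ = ...`), so a panicking task does not take the loop down with it.
    let _ = futures::future::join_all(handles).await;
}
```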
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub(crate) enum CompactOneHotPartitionError {
#[snafu(display("{}", source))]
Combining {
source: parquet_file_combining::Error,
},
}
/// One compaction operation of one hot partition
pub(crate) async fn compact_one_hot_partition(
compactor: &Compactor,
to_compact: FilteredFiles,
) -> Result<(), CompactOneHotPartitionError> {
let start_time = compactor.time_provider.now();
let partition = to_compact.partition;
let shard_id = partition.shard_id();
let compact_result = parquet_file_combining::compact_parquet_files(
to_compact.files,
partition,
Arc::clone(&compactor.catalog),
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
&compactor.compaction_input_file_bytes,
compactor.config.max_desired_file_size_bytes(),
compactor.config.percentage_max_file_size(),
compactor.config.split_percentage(),
)
.await
.context(CombiningSnafu);
let attributes = Attributes::from([
("shard_id", format!("{}", shard_id).into()),
("partition_type", "hot".into()),
]);
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor.compaction_duration.recorder(attributes);
duration.record(delta);
}
compact_result
}
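`compact_one_hot_partition` wraps the result of `compact_parquet_files` into the new error type with `.context(CombiningSnafu)`: deriving `Snafu` on `CompactOneHotPartitionError` generates a `CombiningSnafu` context selector for the `Combining` variant. A minimal, self-contained sketch of that snafu pattern (the `ExampleError`/`read_config` names and the `std::io::Error` source are stand-ins for illustration, not part of this crate; assumes `snafu = "0.7"`):

```rust
// Minimal sketch of the snafu context-selector pattern used by CompactOneHotPartitionError.
// Assumed dependency: snafu = "0.7".
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum ExampleError {
    // The derive generates a `CombiningSnafu` selector for this variant; calling
    // `.context(CombiningSnafu)` on a Result wraps its error into `Combining { source }`.
    #[snafu(display("{}", source))]
    Combining { source: std::io::Error },
}

fn read_config(path: &str) -> Result<String, ExampleError> {
    std::fs::read_to_string(path).context(CombiningSnafu)
}

fn main() {
    // With a path that does not exist, this prints the wrapped io::Error
    // via the #[snafu(display(...))] attribute.
    if let Err(e) = read_config("/nonexistent/path") {
        println!("{e}");
    }
}
```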
#[cfg(test)]
mod tests {
use super::*;
use crate::{
compact::Compactor, compact_hot_partitions::compact_hot_partition_candidates,
handler::CompactorConfig,
handler::CompactorConfig, parquet_file_filtering,
};
use arrow_util::assert_batches_sorted_eq;
use backoff::BackoffConfig;
use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
use data_types::{ColumnType, ColumnTypeCount, CompactionLevel, ParquetFileId};
use iox_query::exec::Executor;
use iox_tests::util::{
TestCatalog, TestNamespace, TestParquetFileBuilder, TestShard, TestTable,
};
use iox_time::SystemProvider;
use iox_time::{SystemProvider, TimeProvider};
use parquet_file::storage::ParquetStorage;
use std::{
collections::VecDeque,
@@ -656,6 +708,265 @@ mod tests {
assert_eq!(g3_candidate1_pf_ids, vec![6, 5]);
}
// A fairly sophisticated integration test.
// Besides the line protocol data, each file's min/max sequence numbers and min/max times
// matter here: together they produce the combination of cases this test needs to cover.
// (See the timestamp-conversion sketch after this test for how the raw nanosecond
// timestamps map onto the asserted output times.)
#[tokio::test]
async fn test_compact_hot_partition_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let table_column_types = vec![
ColumnTypeCount {
col_type: ColumnType::Tag as i16,
count: 3,
},
ColumnTypeCount {
col_type: ColumnType::I64 as i16,
count: 1,
},
ColumnTypeCount {
col_type: ColumnType::Time as i16,
count: 1,
},
];
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_creation_time(20);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
compactor.config.max_desired_file_size_bytes() as i64 + 10,
);
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_creation_time(time.now().timestamp_nanos())
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_creation_time(time.now().timestamp_nanos())
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
// Compact
let candidates = compactor
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_shard(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition(
Arc::clone(&compactor.catalog),
c.id(),
&size_overrides,
)
.await
.unwrap();
let to_compact = parquet_file_filtering::filter_hot_parquet_files(
c,
parquet_files_for_compaction,
compactor.config.memory_budget_bytes(),
&table_column_types,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
);
compact_one_hot_partition(&compactor, to_compact)
.await
.unwrap();
// Should have 3 non-soft-deleted files:
//
// - the level 1 file that didn't overlap with anything
// - the two files newly created by compacting and splitting pf1, pf2, pf3, pf4, pf5
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 3);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(6, CompactionLevel::FileNonOverlapped),
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped),
]
);
// ------------------------------------------------
// Verify the parquet file content
// Later compacted file
let file1 = files.pop().unwrap();
let batches = table.read_parquet_file(file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+-----------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+-----------------------------+",
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"+-----------+------+------+------+-----------------------------+",
],
&batches
);
// Earlier compacted file
let file0 = files.pop().unwrap();
let batches = table.read_parquet_file(file0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+--------------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"+-----------+------+------+------+--------------------------------+",
],
&batches
);
}
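For reading the assertions above: the trailing integers in the line protocol are nanoseconds since the Unix epoch, so `25000` from lp3 shows up as the `1970-01-01T00:00:00.000025Z` row in the later compacted file, while `10` and `20` from lp1 appear as the nanosecond-precision timestamps in the earlier one. A small self-contained sketch of that conversion (it uses the `chrono` crate purely for illustration; the real test never performs this conversion itself):

```rust
// Convert the raw line-protocol timestamps (nanoseconds since the epoch) used in the
// test above into the RFC 3339 strings that appear in the asserted batches.
// Assumed dependency: chrono = "0.4" (illustration only, not used by the real test).
use chrono::{SecondsFormat, TimeZone, Utc};

fn main() {
    // 10 and 20 come from lp1, 25_000 from lp3, 91_000 from lp6.
    for ns in [10_i64, 20, 25_000, 91_000] {
        let ts = Utc.timestamp_nanos(ns);
        // AutoSi keeps whole groups of three fractional digits, matching the test output,
        // e.g. 25_000 ns -> 1970-01-01T00:00:00.000025Z
        println!("{ns:>6} ns -> {}", ts.to_rfc3339_opts(SecondsFormat::AutoSi, true));
    }
}
```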
#[derive(Default)]
struct MockCompactor {
compaction_groups: Arc<Mutex<Vec<Vec<FilteredFiles>>>>,

View File

@@ -25,7 +25,6 @@ pub mod utils;
use crate::compact::{Compactor, PartitionCompactionCandidateWithInfo};
use data_types::{CompactionLevel, ParquetFileId};
use metric::Attributes;
use parquet_file_filtering::FilteredFiles;
use snafu::{ResultExt, Snafu};
use std::{collections::HashMap, sync::Arc};
@@ -48,47 +47,6 @@ pub(crate) enum Error {
},
}
/// One compaction operation of one hot partition
pub(crate) async fn compact_hot_partition(
compactor: &Compactor,
to_compact: FilteredFiles,
) -> Result<(), Error> {
let start_time = compactor.time_provider.now();
let partition = to_compact.partition;
let shard_id = partition.shard_id();
let compact_result = parquet_file_combining::compact_parquet_files(
to_compact.files,
partition,
Arc::clone(&compactor.catalog),
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
&compactor.compaction_input_file_bytes,
compactor.config.max_desired_file_size_bytes(),
compactor.config.percentage_max_file_size(),
compactor.config.split_percentage(),
)
.await
.context(CombiningSnafu);
let attributes = Attributes::from([
("shard_id", format!("{}", shard_id).into()),
("partition_type", "hot".into()),
]);
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor.compaction_duration.recorder(attributes);
duration.record(delta);
}
compact_result
}
/// One compaction operation of one cold partition
///
/// Takes a hash-map `size_overrides` that mocks the size of the detected [`CompactorParquetFile`]s. This will
@@ -172,269 +130,12 @@ mod tests {
use ::parquet_file::storage::ParquetStorage;
use arrow_util::assert_batches_sorted_eq;
use backoff::BackoffConfig;
use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
use data_types::{ColumnType, CompactionLevel};
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder};
use iox_time::{SystemProvider, TimeProvider};
use std::time::Duration;
// A quite sophisticated integration test
// Beside lp data, every value min/max sequence numbers and min/max time are important
// to have a combination of needed tests in this test function
#[tokio::test]
async fn test_compact_hot_partition_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let shard = ns.create_shard(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let table_column_types = vec![
ColumnTypeCount {
col_type: ColumnType::Tag as i16,
count: 3,
},
ColumnTypeCount {
col_type: ColumnType::I64 as i16,
count: 1,
},
ColumnTypeCount {
col_type: ColumnType::Time as i16,
count: 1,
},
];
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_creation_time(20);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(
f.parquet_file.id,
compactor.config.max_desired_file_size_bytes() as i64 + 10,
);
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_creation_time(time.now().timestamp_nanos());
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_creation_time(time.now().timestamp_nanos())
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_creation_time(time.now().timestamp_nanos())
.with_compaction_level(CompactionLevel::FileNonOverlapped);
let f = partition.create_parquet_file(builder).await;
size_overrides.insert(f.parquet_file.id, 100); // small file
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(shard.shard.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
// Compact
let candidates = compactor
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_shard(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition(
Arc::clone(&compactor.catalog),
c.id(),
&size_overrides,
)
.await
.unwrap();
let to_compact = parquet_file_filtering::filter_hot_parquet_files(
c,
parquet_files_for_compaction,
compactor.config.memory_budget_bytes(),
&table_column_types,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
);
compact_hot_partition(&compactor, to_compact).await.unwrap();
// Should have 3 non-soft-deleted files:
//
// - the level 1 file that didn't overlap with anything
// - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 3);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(6, CompactionLevel::FileNonOverlapped),
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped),
]
);
// ------------------------------------------------
// Verify the parquet file content
// Later compacted file
let file1 = files.pop().unwrap();
let batches = table.read_parquet_file(file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+-----------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+-----------------------------+",
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"+-----------+------+------+------+-----------------------------+",
],
&batches
);
// Earlier compacted file
let file0 = files.pop().unwrap();
let batches = table.read_parquet_file(file0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+--------------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"+-----------+------+------+------+--------------------------------+",
],
&batches
);
}
#[tokio::test]
async fn test_compact_cold_partition_many_files() {
test_helpers::maybe_start_logging();