Merge pull request #5534 from influxdata/cn/even-more-compactor-tests

test: Refactor and add some more compactor tests
pull/24376/head
kodiakhq[bot] 2022-09-01 21:08:55 +00:00 committed by GitHub
commit 176d86a7c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 143 additions and 20 deletions

@@ -307,10 +307,101 @@ mod tests {
use backoff::BackoffConfig;
use data_types::{ColumnType, ColumnTypeCount, CompactionLevel};
use iox_query::exec::Executor;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable};
use iox_tests::util::{
TestCatalog, TestNamespace, TestParquetFileBuilder, TestShard, TestTable,
};
use iox_time::SystemProvider;
use parquet_file::storage::ParquetStorage;
use std::{collections::VecDeque, sync::Arc, time::Duration};
use std::{
collections::VecDeque,
pin::Pin,
sync::{Arc, Mutex},
time::Duration,
};
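
// No candidates: the compaction function is never invoked, so the mock
// records no groups.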
#[tokio::test]
async fn empty_candidates_compacts_nothing() {
test_helpers::maybe_start_logging();
let TestSetup {
compactor,
mock_compactor,
..
} = test_setup().await;
let sorted_candidates = VecDeque::new();
let table_columns = HashMap::new();
compact_hot_partition_candidates(
Arc::clone(&compactor),
mock_compactor.compaction_function(),
sorted_candidates,
table_columns,
)
.await;
let compaction_groups = mock_compactor.results();
assert!(compaction_groups.is_empty());
}
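
// A table without columns still yields a hot partition candidate, but
// compaction skips it, so the mock records no groups.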
#[tokio::test]
async fn compacting_table_without_columns_does_nothing() {
test_helpers::maybe_start_logging();
let TestSetup {
compactor,
mock_compactor,
namespace,
shard,
..
} = test_setup().await;
let table_without_columns = namespace.create_table("test_table").await;
let partition1 = table_without_columns
.with_shard(&shard)
.create_partition("one")
.await;
let hot_time_one_hour_ago =
(compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos();
let pf1_1 = TestParquetFileBuilder::default()
.with_min_time(1)
.with_max_time(5)
.with_row_count(2)
.with_compaction_level(CompactionLevel::Initial)
.with_creation_time(hot_time_one_hour_ago);
partition1.create_parquet_file_catalog_record(pf1_1).await;
let candidates = compactor
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_shard(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
.unwrap();
assert_eq!(candidates.len(), 1);
let candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
let mut sorted_candidates = candidates.into_iter().collect::<Vec<_>>();
sorted_candidates.sort_by_key(|c| c.candidate.partition_id);
let sorted_candidates = sorted_candidates.into_iter().collect::<VecDeque<_>>();
let table_columns = HashMap::new();
compact_hot_partition_candidates(
Arc::clone(&compactor),
mock_compactor.compaction_function(),
sorted_candidates,
table_columns,
)
.await;
let compaction_groups = mock_compactor.results();
assert!(compaction_groups.is_empty());
}
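
// Multiple hot candidates are split into rounds of parallel compaction
// according to the memory budget; see the assertions on the three rounds
// below.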
#[tokio::test]
async fn test_compact_hot_partition_candidates() {
@@ -318,6 +409,7 @@ mod tests {
let TestSetup {
compactor,
mock_compactor,
shard,
table,
..
@@ -496,34 +588,18 @@ mod tests {
// P4 is not compacted due to overbudget
// Debug info shows all 3 rounds
let compaction_groups = Arc::new(std::sync::Mutex::new(vec![]));
let compaction_groups_for_closure = Arc::clone(&compaction_groups);
let mock_compaction =
move |_compactor: Arc<Compactor>,
parallel_compacting_candidates: Vec<FilteredFiles>| {
let compaction_groups_for_async = Arc::clone(&compaction_groups_for_closure);
async move {
compaction_groups_for_async
.lock()
.unwrap()
.push(parallel_compacting_candidates);
}
};
// Todo next: So conveniently, debug log shows this is also a reproducer of
// https://github.com/influxdata/conductor/issues/1130
// "hot compaction failed: 1, "Could not serialize and persist record batches failed to peek record stream schema"
compact_hot_partition_candidates(
Arc::clone(&compactor),
mock_compaction,
mock_compactor.compaction_function(),
sorted_candidates,
table_columns,
)
.await;
let compaction_groups = compaction_groups.lock().unwrap();
let compaction_groups = mock_compactor.results();
// 3 rounds of parallel compaction
assert_eq!(compaction_groups.len(), 3);
@@ -576,6 +652,47 @@ mod tests {
assert_eq!(g3_candidate1_pf_ids, vec![6, 5]);
}
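
/// Stand-in for the real compaction function: instead of compacting anything,
/// it records each batch of parallel compaction candidates so tests can make
/// assertions about the grouping afterwards. Mirroring the tests above, usage
/// looks like:
///
/// ```ignore
/// let TestSetup { compactor, mock_compactor, .. } = test_setup().await;
/// compact_hot_partition_candidates(
///     Arc::clone(&compactor),
///     mock_compactor.compaction_function(),
///     sorted_candidates,
///     table_columns,
/// )
/// .await;
/// let compaction_groups = mock_compactor.results();
/// ```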
#[derive(Default)]
struct MockCompactor {
compaction_groups: Arc<Mutex<Vec<Vec<FilteredFiles>>>>,
}
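
/// Type of the boxed async closure that `compact_hot_partition_candidates`
/// accepts as its compaction function.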
type CompactorFunctionFactory = Box<
dyn Fn(
Arc<Compactor>,
Vec<FilteredFiles>,
) -> Pin<Box<dyn futures::Future<Output = ()> + Send>>
+ Send
+ Sync
+ 'static,
>;
impl MockCompactor {
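    /// Build the boxed closure that the tests hand to
    /// `compact_hot_partition_candidates`; each call of the closure records
    /// its batch of candidates in the shared `compaction_groups` list.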
fn compaction_function(&self) -> CompactorFunctionFactory {
let compaction_groups_for_closure = Arc::clone(&self.compaction_groups);
Box::new(
move |_compactor: Arc<Compactor>,
parallel_compacting_candidates: Vec<FilteredFiles>| {
let compaction_groups_for_async = Arc::clone(&compaction_groups_for_closure);
Box::pin(async move {
compaction_groups_for_async
.lock()
.unwrap()
.push(parallel_compacting_candidates);
})
},
)
}
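
    /// Consume the mock and return the recorded batches. `Arc::try_unwrap`
    /// succeeds only once the closure from `compaction_function` has been
    /// dropped, i.e. after compaction has run.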
fn results(self) -> Vec<Vec<FilteredFiles>> {
let Self { compaction_groups } = self;
Arc::try_unwrap(compaction_groups)
.unwrap()
.into_inner()
.unwrap()
}
}
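
// Compactor configuration shared by these tests.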
fn make_compactor_config() -> CompactorConfig {
let max_desired_file_size_bytes = 100_000_000;
let percentage_max_file_size = 90;
@@ -603,6 +720,8 @@ mod tests {
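/// Fixtures returned by `test_setup` for use in the tests above.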
struct TestSetup {
compactor: Arc<Compactor>,
mock_compactor: MockCompactor,
namespace: Arc<TestNamespace>,
shard: Arc<TestShard>,
table: Arc<TestTable>,
}
@@ -642,8 +761,12 @@ mod tests {
Arc::new(metric::Registry::new()),
));
let mock_compactor = MockCompactor::default();
TestSetup {
compactor,
mock_compactor,
namespace,
shard,
table,
}