fix: Re-enable full cold compaction, in serial for now

pull/24376/head
Carol (Nichols || Goulding) 2022-09-08 16:47:19 -04:00
parent 6e1b06c435
commit 743b67f0e9
1 changed file with 57 additions and 132 deletions

@@ -91,15 +91,30 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
let start_time = compactor.time_provider.now();
// Compact any remaining level 0 files in parallel
compact_candidates_with_memory_budget(
Arc::clone(&compactor),
compaction_type,
compact_in_parallel,
candidates,
candidates.clone(),
table_columns,
)
.await;
// Compact level 1 files to level 2 in serial (for now)
for candidate in candidates {
let partition_id = candidate.id();
let compaction_result = full_compaction(&compactor, candidate, &Default::default()).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "full compaction failed");
}
Ok(_) => {
debug!(?partition_id, "full compaction complete");
}
}
}
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
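
Read on its own, the new serial pass is small; here is a minimal sketch of the pattern, assuming the `full_compaction` signature shown further down in this diff and the `warn!`/`debug!` tracing macros already used in this file:

    // Level 0 files were compacted in parallel above; now run full
    // (level 1 -> level 2) compaction for each candidate, one partition at a
    // time, so only a single partition's worth of work is in flight.
    for candidate in candidates {
        let partition_id = candidate.id();
        // In production no file sizes are mocked, so the size-override map is
        // just the empty default.
        match full_compaction(&compactor, candidate, &Default::default()).await {
            Ok(_) => debug!(?partition_id, "full compaction complete"),
            Err(e) => warn!(?e, ?partition_id, "full compaction failed"),
        }
    }
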
@@ -141,7 +156,6 @@ pub(crate) enum Error {
/// - Compact each group into a new level 2 file, no splitting
///
/// Uses a hashmap of size overrides to allow mocking of file sizes.
#[allow(dead_code)]
async fn full_compaction(
compactor: &Compactor,
partition: Arc<PartitionCompactionCandidateWithInfo>,
@@ -742,26 +756,12 @@ mod tests {
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let table_column_types = vec![
ColumnTypeCount {
col_type: ColumnType::Tag as i16,
count: 1,
},
ColumnTypeCount {
col_type: ColumnType::I64 as i16,
count: 1,
},
ColumnTypeCount {
col_type: ColumnType::Time as i16,
count: 1,
},
];
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
let compactor = Arc::new(Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
@@ -770,7 +770,7 @@ mod tests {
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
));
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
@@ -783,46 +783,16 @@ mod tests {
// ------------------------------------------------
// Compact
let candidates = compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
Arc::clone(&compactor.catalog),
c.id(),
&Default::default(),
)
.await
.unwrap();
let to_compact = parquet_file_filtering::filter_parquet_files(
c,
parquet_files_for_compaction,
compactor.config.memory_budget_bytes,
&table_column_types,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
);
compact_one_partition(&compactor, to_compact, "cold")
.await
.unwrap();
compact(compactor).await;
// Should have 1 non-soft-deleted file:
//
// - the newly created file that was upgraded to level 1 (TEMP: DISABLED then to level 2)
// - the newly created file that was upgraded to level 1 then to level 2
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 1);
let file = files.pop().unwrap();
assert_eq!(file.id.get(), 1); // ID doesn't change because the file doesn't get rewritten
assert_eq!(file.compaction_level, CompactionLevel::FileNonOverlapped);
// TEMP: DISABLED CompactionLevel::Final);
assert_eq!(file.compaction_level, CompactionLevel::Final);
// ------------------------------------------------
// Verify the parquet file content
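
With full compaction re-enabled, the test drives the whole cold cycle through `compact()` instead of calling `cold_partitions_to_compact`, `filter_parquet_files`, and `compact_one_partition` by hand. A condensed sketch of the new flow, with the fixture setup elided and names taken from the diff:

    // The compactor is shared with the compaction tasks, so the (otherwise
    // unchanged) constructor call is wrapped in Arc::new(...).
    let compactor = Arc::new(Compactor::new(/* same arguments as before */));

    // One call runs both phases: parallel level-0 compaction, then the
    // re-enabled serial level-1 -> level-2 pass.
    compact(compactor).await;

    // The single surviving file keeps its original ID (it is upgraded, not
    // rewritten) and now reaches the final compaction level.
    let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
    assert_eq!(files.len(), 1);
    let file = files.pop().unwrap();
    assert_eq!(file.id.get(), 1);
    assert_eq!(file.compaction_level, CompactionLevel::Final);
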
@@ -910,27 +880,13 @@ mod tests {
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let table_column_types = vec![
ColumnTypeCount {
col_type: ColumnType::Tag as i16,
count: 3,
},
ColumnTypeCount {
col_type: ColumnType::I64 as i16,
count: 1,
},
ColumnTypeCount {
col_type: ColumnType::Time as i16,
count: 1,
},
];
let partition = table.with_shard(&shard).create_partition("part").await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
let compactor = Arc::new(Compactor::new(
vec![shard.shard.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
@@ -939,7 +895,7 @@ mod tests {
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
));
// parquet files that are all in the same partition
let mut size_overrides = HashMap::<ParquetFileId, i64>::default();
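
The `size_overrides` map lets the test treat small fixture files as if they had production-sized footprints, so the filtering logic exercises the intended branches without writing megabytes of data. A hypothetical entry (the file handle name and the size are illustrative, not taken from this diff):

    // Pretend this fixture file is ~100 MB so the level-0 filter treats it as
    // a full-sized file. `pf` stands in for one of the test parquet file
    // handles created by the elided setup.
    size_overrides.insert(pf.parquet_file.id, 100 * 1024 * 1024);
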
@@ -1030,77 +986,46 @@ mod tests {
// ------------------------------------------------
// Compact
let candidates = compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop_front().unwrap();
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition_with_size_overrides(
Arc::clone(&compactor.catalog),
c.id(),
&size_overrides,
)
.await
.unwrap();
let to_compact = parquet_file_filtering::filter_parquet_files(
c,
parquet_files_for_compaction,
compactor.config.memory_budget_bytes,
&table_column_types,
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes,
);
compact_one_partition(&compactor, to_compact, "cold")
.await
.unwrap();
compact(compactor).await;
// Should have 1 non-soft-deleted file:
//
// - the level 2 file created by combining the 3 level 1 files produced by the
//   first compaction step (compacting the remaining level 0 files)
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 1);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final),]);
// TEMP: DISABLED
// let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
// assert_eq!(files.len(), 1);
// let files_and_levels: Vec<_> = files
// .iter()
// .map(|f| (f.id.get(), f.compaction_level))
// .collect();
// assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final),]);
//
// // ------------------------------------------------
// // Verify the parquet file content
// let file = files.pop().unwrap();
// let batches = table.read_parquet_file(file).await;
// assert_batches_sorted_eq!(
// &[
// "+-----------+------+------+------+--------------------------------+",
// "| field_int | tag1 | tag2 | tag3 | time |",
// "+-----------+------+------+------+--------------------------------+",
// "| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
// "| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
// "| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
// "| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
// "| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
// "| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
// "| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
// "| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
// "| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
// "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
// "| 421 | | OH | 21 | 1970-01-01T00:00:00.000091Z |",
// "| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
// "| 81601 | | PA | 15 | 1970-01-01T00:00:00.000090Z |",
// "+-----------+------+------+------+--------------------------------+",
// ],
// &batches
// );
// ------------------------------------------------
// Verify the parquet file content
let file = files.pop().unwrap();
let batches = table.read_parquet_file(file).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+--------------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"| 421 | | OH | 21 | 1970-01-01T00:00:00.000091Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"| 81601 | | PA | 15 | 1970-01-01T00:00:00.000090Z |",
"+-----------+------+------+------+--------------------------------+",
],
&batches
);
}
}