fix: If full compaction group has only 1 file, upgrade level

As opposed to running full compaction. Makes the catalog function general: it takes the level as a parameter rather than only upgrading to level 1.

Branch: pull/24376/head
parent 10ba3fef47
commit 6bba3fafaa
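
At a glance, the `ParquetFileRepo` catalog method goes from a hardcoded level-1 upgrade to taking the target level as an argument. Both signatures are lifted from the hunks below; this is a summary, not a standalone compile unit:

// Before: could only promote files to CompactionLevel::FileNonOverlapped
async fn update_to_level_1(
    &mut self,
    parquet_file_ids: &[ParquetFileId],
) -> Result<Vec<ParquetFileId>>;

// After: the caller chooses the level, e.g. CompactionLevel::Final
async fn update_compaction_level(
    &mut self,
    parquet_file_ids: &[ParquetFileId],
    compaction_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>>;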
@@ -677,7 +677,7 @@ mod tests {
         };
         let pf2 = txn.parquet_files().create(p2).await.unwrap();
         txn.parquet_files()
-            .update_to_level_1(&[pf2.id])
+            .update_compaction_level(&[pf2.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
         txn.commit().await.unwrap();
@@ -961,7 +961,7 @@ mod tests {
         };
         let pf2 = txn.parquet_files().create(p2).await.unwrap();
         txn.parquet_files()
-            .update_to_level_1(&[pf2.id])
+            .update_compaction_level(&[pf2.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
         txn.commit().await.unwrap();

@@ -113,7 +113,7 @@ async fn compact_remaining_level_0_files(
 
         repos
             .parquet_files()
-            .update_to_level_1(&[to_compact[0].id()])
+            .update_compaction_level(&[to_compact[0].id()], CompactionLevel::FileNonOverlapped)
             .await
             .context(UpgradingSnafu)?;
     } else {
@@ -184,17 +184,27 @@ async fn full_compaction(
     }
 
     for group in groups {
-        // TODO: if there's only 1 level 1 file in the group, upgrade to level 2 w/o compaction?
-        parquet_file_combining::compact_final_no_splits(
-            group,
-            Arc::clone(&partition),
-            Arc::clone(&compactor.catalog),
-            compactor.store.clone(),
-            Arc::clone(&compactor.exec),
-            Arc::clone(&compactor.time_provider),
-        )
-        .await
-        .context(CombiningSnafu)?;
+        if group.len() == 1 {
+            // upgrade the one file to l2, don't run compaction
+            let mut repos = compactor.catalog.repositories().await;
+
+            repos
+                .parquet_files()
+                .update_compaction_level(&[group[0].id()], CompactionLevel::Final)
+                .await
+                .context(UpgradingSnafu)?;
+        } else {
+            parquet_file_combining::compact_final_no_splits(
+                group,
+                Arc::clone(&partition),
+                Arc::clone(&compactor.catalog),
+                compactor.store.clone(),
+                Arc::clone(&compactor.exec),
+                Arc::clone(&compactor.time_provider),
+            )
+            .await
+            .context(CombiningSnafu)?;
+        }
     }
 
     Ok(())
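
Distilled to the new code path only, the loop above now short-circuits single-file groups with a metadata-only catalog update (a sketch of the added branch, compaction arm elided):

for group in groups {
    if group.len() == 1 {
        // One level-1 file: promote it straight to level 2 in the
        // catalog; no parquet data is read or rewritten.
        let mut repos = compactor.catalog.repositories().await;
        repos
            .parquet_files()
            .update_compaction_level(&[group[0].id()], CompactionLevel::Final)
            .await
            .context(UpgradingSnafu)?;
    } else {
        // Two or more files: run
        // parquet_file_combining::compact_final_no_splits as before.
    }
}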
@@ -2,11 +2,11 @@
 
 use async_trait::async_trait;
 use data_types::{
-    Column, ColumnSchema, ColumnType, ColumnTypeCount, Namespace, NamespaceId, NamespaceSchema,
-    ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo,
-    PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber,
-    Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition, TableSchema,
-    Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
+    Column, ColumnSchema, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId,
+    NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
+    PartitionInfo, PartitionKey, PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId,
+    SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, TablePartition,
+    TableSchema, Timestamp, Tombstone, TombstoneId, TopicId, TopicMetadata,
 };
 use iox_time::TimeProvider;
 use snafu::{OptionExt, Snafu};
@@ -609,11 +609,12 @@ pub trait ParquetFileRepo: Send + Sync {
     ) -> Result<Vec<ParquetFile>>;
 
     /// Update the compaction level of the specified parquet files to
-    /// `CompactionLevel::FileNonOverlapped`
+    /// the specified [`CompactionLevel`].
     /// Returns the IDs of the files that were successfully updated.
-    async fn update_to_level_1(
+    async fn update_compaction_level(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
+        compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>>;
 
     /// Verify if the parquet file exists by selecting its id
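
As a usage illustration, callers now pick the level at the call site. This sketch mirrors the call sites elsewhere in this diff; `repos`, `file_ids`, and `file_id` are placeholder bindings, and error handling is abbreviated:

// Promote compacted files to level 1, as the compactor does today:
repos
    .parquet_files()
    .update_compaction_level(&file_ids, CompactionLevel::FileNonOverlapped)
    .await?;

// Or move a lone file straight to the final level, skipping compaction:
repos
    .parquet_files()
    .update_compaction_level(&[file_id], CompactionLevel::Final)
    .await?;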
@@ -2339,12 +2340,12 @@ pub(crate) mod test_helpers {
         // Let upgrade all files (only f1 and f3 are not deleted) to level 1
         repos
             .parquet_files()
-            .update_to_level_1(&[f1.id])
+            .update_compaction_level(&[f1.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
         repos
             .parquet_files()
-            .update_to_level_1(&[f3.id])
+            .update_compaction_level(&[f3.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
         //
@@ -2509,7 +2510,7 @@ pub(crate) mod test_helpers {
         let level_1_file = repos.parquet_files().create(level_1_params).await.unwrap();
         repos
             .parquet_files()
-            .update_to_level_1(&[level_1_file.id])
+            .update_compaction_level(&[level_1_file.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
 
@@ -2710,17 +2711,20 @@ pub(crate) mod test_helpers {
         // Make all but _level_0_file compaction level 1
         repos
             .parquet_files()
-            .update_to_level_1(&[
-                parquet_file.id,
-                too_early_file.id,
-                too_late_file.id,
-                overlap_lower_file.id,
-                overlap_upper_file.id,
-                other_shard_file.id,
-                other_table_file.id,
-                other_partition_file.id,
-                to_delete_file.id,
-            ])
+            .update_compaction_level(
+                &[
+                    parquet_file.id,
+                    too_early_file.id,
+                    too_late_file.id,
+                    overlap_lower_file.id,
+                    overlap_upper_file.id,
+                    other_shard_file.id,
+                    other_table_file.id,
+                    other_partition_file.id,
+                    to_delete_file.id,
+                ],
+                CompactionLevel::FileNonOverlapped,
+            )
             .await
             .unwrap();
 
@@ -2960,7 +2964,7 @@ pub(crate) mod test_helpers {
         // another_partition should no longer be selected for compaction
         let partitions = repos
             .parquet_files()
-            .most_level_0_files_partitions(shard.id, time_24_hours_ago, num_partitions)
+            .most_level_0_files_partitions(shard.id, time_8_hours_ago, num_partitions)
             .await
             .unwrap();
         assert_eq!(partitions.len(), 2);
@@ -3369,7 +3373,7 @@ pub(crate) mod test_helpers {
             .unwrap();
         repos
             .parquet_files()
-            .update_to_level_1(&[level1_file.id])
+            .update_compaction_level(&[level1_file.id], CompactionLevel::FileNonOverlapped)
             .await
             .unwrap();
         level1_file.compaction_level = CompactionLevel::FileNonOverlapped;
@@ -3477,7 +3481,10 @@ pub(crate) mod test_helpers {
         // should succeed
         let updated = repos
             .parquet_files()
-            .update_to_level_1(&[parquet_file.id, nonexistent_parquet_file_id])
+            .update_compaction_level(
+                &[parquet_file.id, nonexistent_parquet_file_id],
+                CompactionLevel::FileNonOverlapped,
+            )
             .await
             .unwrap();
         assert_eq!(updated, vec![parquet_file.id]);

@@ -1311,9 +1311,10 @@ impl ParquetFileRepo for MemTxn {
             .collect())
     }
 
-    async fn update_to_level_1(
+    async fn update_compaction_level(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
+        compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>> {
         let stage = self.stage();
 
@@ -1324,7 +1325,7 @@ impl ParquetFileRepo for MemTxn {
             .iter_mut()
             .filter(|p| parquet_file_ids.contains(&p.id))
         {
-            f.compaction_level = CompactionLevel::FileNonOverlapped;
+            f.compaction_level = compaction_level;
             updated.push(f.id);
         }
 
@@ -7,11 +7,11 @@ use crate::interface::{
 };
 use async_trait::async_trait;
 use data_types::{
-    Column, ColumnType, ColumnTypeCount, Namespace, NamespaceId, ParquetFile, ParquetFileId,
-    ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey, PartitionParam,
-    ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
-    SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, TopicId,
-    TopicMetadata,
+    Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
+    ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionInfo, PartitionKey,
+    PartitionParam, ProcessedTombstone, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
+    ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, Tombstone,
+    TombstoneId, TopicId, TopicMetadata,
 };
 use iox_time::{SystemProvider, TimeProvider};
 use metric::{DurationHistogram, Metric};
@@ -277,7 +277,7 @@ decorate!(
         "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
         "parquet_level_0" = level_0(&mut self, shard_id: ShardId) -> Result<Vec<ParquetFile>>;
         "parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
-        "parquet_update_to_level_1" = update_to_level_1(&mut self, parquet_file_ids: &[ParquetFileId]) -> Result<Vec<ParquetFileId>>;
+        "parquet_update_compaction_level" = update_compaction_level(&mut self, parquet_file_ids: &[ParquetFileId], compaction_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;
         "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
         "parquet_count" = count(&mut self) -> Result<i64>;
         "parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;

@@ -1832,9 +1832,10 @@ WHERE parquet_file.partition_id = $1
         .map_err(|e| Error::SqlxError { source: e })
     }
 
-    async fn update_to_level_1(
+    async fn update_compaction_level(
         &mut self,
         parquet_file_ids: &[ParquetFileId],
+        compaction_level: CompactionLevel,
     ) -> Result<Vec<ParquetFileId>> {
         // If I try to do `.bind(parquet_file_ids)` directly, I get a compile error from sqlx.
         // See https://github.com/launchbadge/sqlx/issues/1744
@@ -1847,7 +1848,7 @@ WHERE id = ANY($2)
 RETURNING id;
         "#,
         )
-        .bind(CompactionLevel::FileNonOverlapped) // $1
+        .bind(compaction_level) // $1
         .bind(&ids[..]) // $2
         .fetch_all(&mut self.inner)
         .await
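
Taken together, the two Postgres hunks amount to roughly the following statement. The `UPDATE parquet_file SET compaction_level = $1` prefix is an assumption inferred from the binds; only the `WHERE`/`RETURNING` tail and the binds are visible in this diff, and `ids` is the id vector built per the sqlx-workaround comment above:

// Assumed full statement; the SET clause is inferred, not shown in the diff.
sqlx::query(
    r#"
UPDATE parquet_file
SET compaction_level = $1
WHERE id = ANY($2)
RETURNING id;
    "#,
)
.bind(compaction_level) // $1
.bind(&ids[..]) // $2
.fetch_all(&mut self.inner)
.await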