fix: Log more info when compacting files
parent
a45767e705
commit
909c4b18d4
|
@ -460,6 +460,8 @@ impl Compactor {
|
|||
pub async fn groups_to_compact_and_files_to_upgrade(
|
||||
&self,
|
||||
partition_id: PartitionId,
|
||||
namespace_name: &str,
|
||||
table_name: &str,
|
||||
) -> Result<CompactAndUpgrade> {
|
||||
let mut compact_and_upgrade = CompactAndUpgrade::new(None);
|
||||
|
||||
|
@ -482,8 +484,11 @@ impl Compactor {
|
|||
let files_with_tombstones = self.add_tombstones_to_files(parquet_files).await?;
|
||||
if let Some(files_with_tombstones) = files_with_tombstones {
|
||||
info!(
|
||||
"compacting {} files",
|
||||
files_with_tombstones.parquet_files.len()
|
||||
partition_id = partition_id.get(),
|
||||
num_files = files_with_tombstones.parquet_files.len(),
|
||||
namespace = namespace_name,
|
||||
table = table_name,
|
||||
"compacting files",
|
||||
);
|
||||
|
||||
// Only one file without tombstones, no need to compact.
|
||||
|
@ -562,6 +567,9 @@ impl Compactor {
|
|||
group.parquet_files.iter().map(|f| f.id).collect();
|
||||
let size: i64 = group.parquet_files.iter().map(|f| f.file_size_bytes).sum();
|
||||
info!(
|
||||
partition_id = partition_id.get(),
|
||||
namespace = %namespace.name,
|
||||
table = %table.name,
|
||||
num_files=%group.parquet_files.len(),
|
||||
?size,
|
||||
?original_parquet_file_ids,
|
||||
|
@ -1172,7 +1180,11 @@ mod tests {
|
|||
);
|
||||
|
||||
let compact_and_upgrade = compactor
|
||||
.groups_to_compact_and_files_to_upgrade(partition.partition.id)
|
||||
.groups_to_compact_and_files_to_upgrade(
|
||||
partition.partition.id,
|
||||
&ns.namespace.name,
|
||||
&table.table.name,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compactor
|
||||
|
@ -1393,7 +1405,11 @@ mod tests {
|
|||
// ------------------------------------------------
|
||||
// Compact
|
||||
let compact_and_upgrade = compactor
|
||||
.groups_to_compact_and_files_to_upgrade(partition.partition.id)
|
||||
.groups_to_compact_and_files_to_upgrade(
|
||||
partition.partition.id,
|
||||
&ns.namespace.name,
|
||||
&table.table.name,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
compactor
|
||||
|
|
|
@ -200,7 +200,11 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
|
|||
for c in candidates {
|
||||
let compactor = Arc::clone(&compactor);
|
||||
let compact_and_upgrade = compactor
|
||||
.groups_to_compact_and_files_to_upgrade(c.candidate.partition_id)
|
||||
.groups_to_compact_and_files_to_upgrade(
|
||||
c.candidate.partition_id,
|
||||
&c.namespace.name,
|
||||
&c.table.name,
|
||||
)
|
||||
.await;
|
||||
|
||||
match compact_and_upgrade {
|
||||
|
|
Loading…
Reference in New Issue