feat: start adding piece by piece for hotcold compaction (#6804)

* feat: start adding piece by piece for hotcold compaction

* chore: remove upgrade from commit results

* chore: address review comments

* chore: comments for stop conditions
Nga Tran 2023-02-01 14:52:48 -05:00 committed by GitHub
parent 2574082776
commit 0ef4f9fa9b
7 changed files with 271 additions and 20 deletions


@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{ParquetFileId, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFileId, ParquetFileParams, PartitionId};
use iox_catalog::interface::Catalog;
use super::Commit;
@ -34,8 +34,13 @@ impl Commit for CatalogCommit {
&self,
_partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
// Either upgrade or (delete & create) must not be empty
assert!(!upgrade.is_empty() || (!delete.is_empty() && !create.is_empty()));
Backoff::new(&self.backoff_config)
.retry_all_errors("commit parquet file changes", || async {
let mut txn = self.catalog.start_transaction().await?;
@ -46,6 +51,10 @@ impl Commit for CatalogCommit {
parquet_files.flag_for_delete(*file).await?;
}
parquet_files
.update_compaction_level(upgrade, target_level)
.await?;
let mut ids = Vec::with_capacity(create.len());
for file in create {
let res = parquet_files.create(file.clone()).await?;
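Note for reviewers: the precondition asserted in the first hunk above means a commit may be upgrade-only, or a delete-and-create pair (optionally combined with upgrades), but never an empty call and never a delete without a matching create or upgrade. A minimal standalone sketch of the same check, with plain integers standing in for the ID and params types (illustrative only, not part of this diff):

// Restates the precondition enforced at the top of CatalogCommit::commit.
fn commit_args_are_valid(delete: &[i64], upgrade: &[i64], create: &[i64]) -> bool {
    // Either upgrade, or both delete & create, must be non-empty.
    !upgrade.is_empty() || (!delete.is_empty() && !create.is_empty())
}

fn main() {
    assert!(commit_args_are_valid(&[1, 2], &[], &[10])); // delete + create: allowed
    assert!(commit_args_are_valid(&[], &[3], &[]));      // upgrade only: allowed
    assert!(!commit_args_are_valid(&[], &[], &[]));      // nothing to do: rejected
    assert!(!commit_args_are_valid(&[1], &[], &[]));     // delete without create or upgrade: rejected
}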


@ -1,7 +1,7 @@
use std::fmt::Display;
use async_trait::async_trait;
use data_types::{ParquetFileId, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFileId, ParquetFileParams, PartitionId};
use observability_deps::tracing::info;
use super::Commit;
@ -41,19 +41,27 @@ where
&self,
partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
// Perform commit first and report status AFTERWARDS.
let created = self.inner.commit(partition_id, delete, create).await;
let created = self
.inner
.commit(partition_id, delete, upgrade, create, target_level)
.await;
// Log numbers BEFORE IDs because the list may be so long that we hit the line-length limit. In this case we at
least have the important information. Note that the message always is printed first, so we'll never lose
// that one.
info!(
target_level=?target_level,
partition_id=partition_id.get(),
n_delete=delete.len(),
n_create=created.len(),
n_upgrade=upgrade.len(),
n_create=created.len(),
delete=?delete.iter().map(|id| id.get()).collect::<Vec<_>>(),
upgrade=?upgrade.iter().map(|id| id.get()).collect::<Vec<_>>(),
create=?created.iter().map(|id| id.get()).collect::<Vec<_>>(),
"committed parquet file change",
);
@ -95,7 +103,9 @@ mod tests {
.commit(
PartitionId::new(1),
&[ParquetFileId::new(1)],
&[],
&[created_1.clone().into(), created_2.clone().into()],
CompactionLevel::Final,
)
.await;
assert_eq!(
@ -107,15 +117,17 @@ mod tests {
.commit(
PartitionId::new(2),
&[ParquetFileId::new(2), ParquetFileId::new(3)],
&[ParquetFileId::new(1)],
&[],
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![]);
assert_eq!(
capture.to_string(),
"level = INFO; message = committed parquet file change; partition_id = 1; n_delete = 1; n_create = 2; delete = [1]; create = [1000, 1001]; \n\
level = INFO; message = committed parquet file change; partition_id = 2; n_delete = 2; n_create = 0; delete = [2, 3]; create = []; ",
"level = INFO; message = committed parquet file change; target_level = Final; partition_id = 1; n_delete = 1; n_upgrade = 0; n_create = 2; delete = [1]; upgrade = []; create = [1000, 1001];
level = INFO; message = committed parquet file change; target_level = Final; partition_id = 2; n_delete = 2; n_upgrade = 1; n_create = 0; delete = [2, 3]; upgrade = [1]; create = []; "
);
assert_eq!(
@ -124,12 +136,16 @@ level = INFO; message = committed parquet file change; partition_id = 2; n_delet
CommitHistoryEntry {
partition_id: PartitionId::new(1),
delete: vec![ParquetFileId::new(1)],
upgrade: vec![],
created: vec![created_1, created_2],
target_level: CompactionLevel::Final,
},
CommitHistoryEntry {
partition_id: PartitionId::new(2),
delete: vec![ParquetFileId::new(2), ParquetFileId::new(3)],
upgrade: vec![ParquetFileId::new(1)],
created: vec![],
target_level: CompactionLevel::Final,
},
]
);


@ -1,7 +1,7 @@
use std::fmt::Display;
use async_trait::async_trait;
use data_types::{ParquetFileId, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFileId, ParquetFileParams, PartitionId};
use metric::{Registry, U64Counter};
use super::Commit;
@ -12,6 +12,7 @@ where
T: Commit,
{
create_counter: U64Counter,
upgrade_counter: U64Counter,
delete_counter: U64Counter,
commit_counter: U64Counter,
inner: T,
@ -28,6 +29,12 @@ where
"Number of files created by the compactor",
)
.recorder(&[]);
let upgrade_counter = registry
.register_metric::<U64Counter>(
"iox_compactor_file_upgrade_count",
"Number of files upgraded by the compactor",
)
.recorder(&[]);
let delete_counter = registry
.register_metric::<U64Counter>(
"iox_compactor_file_delete_count",
@ -42,6 +49,7 @@ where
.recorder(&[]);
Self {
create_counter,
upgrade_counter,
delete_counter,
commit_counter,
inner,
@ -67,16 +75,22 @@ where
&self,
partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
// Perform commit first and report status AFTERWARDS.
let created = self.inner.commit(partition_id, delete, create).await;
let ids = self
.inner
.commit(partition_id, delete, upgrade, create, target_level)
.await;
self.create_counter.inc(created.len() as u64);
self.create_counter.inc(ids.len() as u64);
self.upgrade_counter.inc(upgrade.len() as u64);
self.delete_counter.inc(delete.len() as u64);
self.commit_counter.inc(1);
created
ids
}
}
@ -109,6 +123,7 @@ mod tests {
let created = ParquetFileBuilder::new(1000).with_partition(1).build();
assert_eq!(create_counter(&registry), 0);
assert_eq!(upgrade_counter(&registry), 0);
assert_eq!(delete_counter(&registry), 0);
assert_eq!(commit_counter(&registry), 0);
@ -116,7 +131,9 @@ mod tests {
.commit(
PartitionId::new(1),
&[ParquetFileId::new(1)],
&[ParquetFileId::new(2)],
&[created.clone().into()],
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1000)]);
@ -125,12 +142,15 @@ mod tests {
.commit(
PartitionId::new(2),
&[ParquetFileId::new(2), ParquetFileId::new(3)],
&[ParquetFileId::new(4)],
&[],
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![]);
assert_eq!(create_counter(&registry), 1);
assert_eq!(upgrade_counter(&registry), 2);
assert_eq!(delete_counter(&registry), 3);
assert_eq!(commit_counter(&registry), 2);
@ -140,12 +160,16 @@ mod tests {
CommitHistoryEntry {
partition_id: PartitionId::new(1),
delete: vec![ParquetFileId::new(1)],
upgrade: vec![ParquetFileId::new(2)],
created: vec![created],
target_level: CompactionLevel::FileNonOverlapped,
},
CommitHistoryEntry {
partition_id: PartitionId::new(2),
delete: vec![ParquetFileId::new(2), ParquetFileId::new(3)],
upgrade: vec![ParquetFileId::new(4)],
created: vec![],
target_level: CompactionLevel::Final,
},
]
);
@ -160,6 +184,15 @@ mod tests {
.fetch()
}
fn upgrade_counter(registry: &Registry) -> u64 {
registry
.get_instrument::<Metric<U64Counter>>("iox_compactor_file_upgrade_count")
.expect("instrument not found")
.get_observer(&Attributes::from([]))
.expect("observer not found")
.fetch()
}
fn delete_counter(registry: &Registry) -> u64 {
registry
.get_instrument::<Metric<U64Counter>>("iox_compactor_file_delete_count")


@ -7,7 +7,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::{ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use super::Commit;
@ -15,7 +15,9 @@ use super::Commit;
pub struct CommitHistoryEntry {
pub partition_id: PartitionId,
pub delete: Vec<ParquetFileId>,
pub upgrade: Vec<ParquetFileId>,
pub created: Vec<ParquetFile>,
pub target_level: CompactionLevel,
}
#[derive(Debug)]
@ -51,7 +53,9 @@ impl Commit for MockCommit {
&self,
partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
let (created, ids): (Vec<_>, Vec<_>) = create
.iter()
@ -68,7 +72,9 @@ impl Commit for MockCommit {
.push(CommitHistoryEntry {
partition_id,
delete: delete.to_vec(),
upgrade: upgrade.to_vec(),
created,
target_level,
});
ids
@ -99,7 +105,9 @@ mod tests {
.commit(
PartitionId::new(1),
&[ParquetFileId::new(1), ParquetFileId::new(2)],
&[ParquetFileId::new(3), ParquetFileId::new(4)],
&[created_1_1.clone().into(), created_1_2.clone().into()],
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(
@ -111,7 +119,9 @@ mod tests {
.commit(
PartitionId::new(2),
&[ParquetFileId::new(3)],
&[],
&[created_2_1.clone().into()],
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1002)]);
@ -124,14 +134,22 @@ mod tests {
ParquetFileId::new(6),
ParquetFileId::new(7),
],
&[],
&[created_1_3.clone().into()],
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1003)]);
// simulate full implosion of the file (this may happen w/ delete predicates)
let ids = commit
.commit(PartitionId::new(1), &[ParquetFileId::new(8)], &[])
.commit(
PartitionId::new(1),
&[ParquetFileId::new(8)],
&[],
&[],
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![]);
@ -141,12 +159,16 @@ mod tests {
CommitHistoryEntry {
partition_id: PartitionId::new(1),
delete: vec![ParquetFileId::new(1), ParquetFileId::new(2)],
upgrade: vec![ParquetFileId::new(3), ParquetFileId::new(4)],
created: vec![created_1_1, created_1_2],
target_level: CompactionLevel::FileNonOverlapped,
},
CommitHistoryEntry {
partition_id: PartitionId::new(2),
delete: vec![ParquetFileId::new(3)],
upgrade: vec![],
created: vec![created_2_1],
target_level: CompactionLevel::Final,
},
CommitHistoryEntry {
partition_id: PartitionId::new(1),
@ -155,12 +177,16 @@ mod tests {
ParquetFileId::new(6),
ParquetFileId::new(7)
],
upgrade: vec![],
created: vec![created_1_3],
target_level: CompactionLevel::FileNonOverlapped,
},
CommitHistoryEntry {
partition_id: PartitionId::new(1),
delete: vec![ParquetFileId::new(8)],
upgrade: vec![],
created: vec![],
target_level: CompactionLevel::FileNonOverlapped,
},
]
)


@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::{ParquetFileId, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFileId, ParquetFileParams, PartitionId};
pub mod catalog;
pub mod logging;
@ -14,7 +14,7 @@ pub mod mock;
/// Ensures that file changes (i.e. deletions and creations) are committed to the catalog.
#[async_trait]
pub trait Commit: Debug + Display + Send + Sync {
/// Commit deletions and creations in a single transaction.
/// Commit deletions, upgrades and creations in a single transaction.
///
/// Returns the IDs for the created files.
///
@ -24,7 +24,9 @@ pub trait Commit: Debug + Display + Send + Sync {
&self,
partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId>;
}
@ -37,8 +39,12 @@ where
&self,
partition_id: PartitionId,
delete: &[ParquetFileId],
upgrade: &[ParquetFileId],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
self.as_ref().commit(partition_id, delete, create).await
self.as_ref()
.commit(partition_id, delete, upgrade, create, target_level)
.await
}
}
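To illustrate what implementing the updated trait looks like, here is a hedged sketch of a hypothetical "dry run" committer that satisfies the new signature without touching the catalog. It is not part of this PR; it assumes it lives next to the trait (so Commit is in scope) and only uses the types already imported by this module:

use std::fmt::Display;

use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileId, ParquetFileParams, PartitionId};

// Hypothetical committer that performs no catalog work at all.
#[derive(Debug)]
struct DryRunCommit;

impl Display for DryRunCommit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "dry_run")
    }
}

#[async_trait]
impl Commit for DryRunCommit {
    async fn commit(
        &self,
        _partition_id: PartitionId,
        _delete: &[ParquetFileId],
        _upgrade: &[ParquetFileId],
        create: &[ParquetFileParams],
        _target_level: CompactionLevel,
    ) -> Vec<ParquetFileId> {
        // Hand out deterministic placeholder IDs; a real implementation would get
        // them from the catalog when the files are created.
        (0..create.len() as i64).map(ParquetFileId::new).collect()
    }
}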


@ -238,8 +238,13 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
})
}
// These are the stop-compaction conditions.
// The same conditions apply to all versions, to protect the system from OOMs:
// . Number of files > max_input_files_per_partition
// . Total size of files > max_input_parquet_bytes_per_partition
// (a standalone sketch of this check appears below)
fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionFilter>> {
match config.compact_version {
// No L0 files to compact
AlgoVersion::Naive => {
vec![
Arc::new(HasMatchingFilePartitionFilter::new(
@ -253,6 +258,12 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
)),
]
}
// TODO: Change this to reflect the stop compaction conditions:
// . One file
// . All files are L2
// . A combination of L2s and a few small L1s (specific numbers TBD)
// This is the middle stage: we suspect more L0s will arrive and should be compacted
// with L1s before those L1s are large enough to be compacted with other L2s into L2s
AlgoVersion::HotCold => {
// identical for now
vec![
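A standalone sketch of the OOM-protection check described in the comment above. The two threshold names are taken from that comment; the structs here are illustrative stand-ins, not the real config or catalog types:

// Stop (i.e. skip) compacting the partition when the inputs are too large.
#[derive(Clone)]
struct FileMeta {
    file_size_bytes: usize,
}

struct Limits {
    max_input_files_per_partition: usize,
    max_input_parquet_bytes_per_partition: usize,
}

fn within_limits(files: &[FileMeta], limits: &Limits) -> bool {
    let total_bytes: usize = files.iter().map(|f| f.file_size_bytes).sum();
    files.len() <= limits.max_input_files_per_partition
        && total_bytes <= limits.max_input_parquet_bytes_per_partition
}

fn main() {
    let limits = Limits {
        max_input_files_per_partition: 100,
        max_input_parquet_bytes_per_partition: 1024 * 1024 * 1024,
    };
    let files = vec![FileMeta { file_size_bytes: 50 * 1024 * 1024 }; 3];
    assert!(within_limits(&files, &limits));
}

The HotCold-specific conditions listed in the TODO above (one file, all L2, or L2s plus a few small L1s) would become additional filters in the same vector once implemented.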


@ -1,6 +1,6 @@
use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use iox_time::Time;
@ -79,6 +79,109 @@ async fn compact_partition(
scratchpad.clean().await;
}
/// Main function to compact the files of a single partition.
///
/// Output: the loop stops when the remaining files satisfy one of these conditions
/// . Version 1:
/// - One L0 file (no need to compact it)
/// - One or many L1 files, each estimated to be under the configured
/// max_desired_file_size
/// . Version 2:
/// - One file
/// - All files are L2
/// - A combination of L2s and a few small L1s
/// This is the middle stage: we suspect more L0s will arrive and should be compacted
/// with L1s before those L1s are large enough to be compacted with other L2s into L2s
///
/// Algorithms
///
/// GENERAL IDEA OF THE CODE: DIVIDE & CONQUER (we have not used all of its power yet)
///
/// The files are split into non-time-overlapping branches, each of which is compacted in parallel.
/// The output of each branch is then combined and re-branched in the next round, until
/// the defined stop conditions say the files should not be compacted further.
/// (A simplified, standalone sketch of this loop follows the function below.)
///
/// Example: Partition has 7 files: f1, f2, f3, f4, f5, f6, f7
/// Input: shown by their time range
/// |--f1--| |----f3----| |-f4-||-f5-||-f7-|
/// |------f2----------| |--f6--|
///
/// - Round 1: Assuming 7 files are split into 2 branches:
/// . Branch 1: has 3 files: f1, f2, f3
/// . Branch 2: has 4 files: f4, f5, f6, f7
/// |<------- Branch 1 -------------->| |<-- Branch 2 -->|
/// |--f1--| |----f3----| |-f4-||-f5-||-f7-|
/// |------f2----------| |--f6--|
///
/// Output: 3 files f8, f9, f10
/// |<------- Branch 1 -------------->| |<-- Branch 2--->|
/// |---------f8---------------|--f9--| |-----f10--------|
///
/// - Round 2: 3 files f8, f9, f10 are in one branch and compacted into 2 files
/// Output: two files f11, f12
/// |-------------------f11---------------------|---f12---|
///
/// - The stop condition is met and the final output is f11 & f12
///
/// -----------------------------------------------------------------------------------------------------
/// VERSION 1 - Naive (implemented here): One round, one branch
///
/// . All L0 and L1 files are compacted into one file, or two if the size exceeds the estimated desired max size
/// . We do not generate L2 files in this version.
///
///
/// Example: same Partition has 7 files: f1, f2, f3, f4, f5, f6, f7
/// Input:
/// |--f1--| |----f3----| |-f4-||-f5-||-f7-|
/// |------f2----------| |--f6--|
/// Output:
/// |---------f8-------------------------------|----f9----|
///
/// -----------------------------------------------------------------------------------------------------
/// VERSION 2 - HotCold (in progress; will be added here under a feature flag with new components & filters)
/// . Multiple rounds, each round has 1 branch
/// . Each branch compacts files of the lowest level (aka initial level) into the next level (aka target level):
/// - hot: (L0s & L1s) to L1s if there are L0s
/// - cold: (L1s & L2s) to L2s if there are no L0s
/// . Each branch also identifies non-overlapping files and files to upgrade, to avoid unnecessary recompaction.
/// The files are actually split into (a standalone sketch of this split follows the CompactResult struct below):
/// 1. files_higher_level: do not compact these files because they are already higher than target level
/// . Value: nothing for hot and L2s for cold
/// 2. files_non_overlap: do not compact these target-level files because they do not overlap
/// with the initial-level files
/// . Value: non-overlapped L1s for hot and non-overlapped L2s for cold
/// . Overlap is defined in the split-non-overlapped-files function
/// 3. files_upgrade: upgrade these initial-level files to the target level because they do not overlap with
/// any target-level or initial-level files and are large enough (> desired max size)
/// . value: non-overlapped L0s for hot and non-overlapped L1s for cold
/// 4. files_compact: the rest of the files that must be compacted
/// . Value: (L0s & L1s) for hot and (L1s & L2s) for cold
///
/// Example: 5 files: two L0s, two L1s and one L2
/// Input:
/// |-L0.1-||------L0.2-------|
/// |-----L1.1-----||--L1.2--|
/// |----L2.1-----|
///
/// - Round 1: There are L0s, so let's compact L0s with L1s. But first, split them:
/// . files_higher_level: L2.1
/// . files_non_overlap: L1.1
/// . files_upgrade: L0.2
/// . files_compact: L0.1, L1.2
/// Output: 4 files
/// |------L1.4-------|
/// |-----L1.1-----||-new L1.3 -| ^
/// |----L2.1-----| ^ |
/// | result of upgrading L0.2
/// result of compacting L0.1, L1.2
///
/// - Round 2: Compact those 4 files
/// Output: two L2 files
/// |-----------------L2.2---------------------||-----L2.3------|
///
/// Note:
/// . If there are no L0 files in the partition, the first round can just compact L1s and L2s to L2s
/// . Whether Round 2 happens depends on the stop condition
async fn try_compact_partition(
partition_id: PartitionId,
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
@ -91,8 +194,13 @@ async fn try_compact_partition(
let mut lazy_partition_info = None;
loop {
// TODO: I do not think we need this in version 1 because we do not generate L2 files.
// So either remove this or move it inside the partition filter for the Naive version.
// This filter will stop the HotCold version from working correctly.
files = components.files_filter.apply(files);
// This is the stop condition, which differs between compaction versions;
// the filter is created in the version_specific_partition_filters function
if !components
.partition_filter
.apply(partition_id, &files)
@ -109,10 +217,12 @@ async fn try_compact_partition(
let (files_now, files_later) = components.round_split.split(files);
// Branches must not overlap with each other
let mut branches = components.divide_initial.divide(files_now);
let mut files_next = files_later;
while let Some(branch) = branches.pop() {
let target_level = CompactionLevel::FileNonOverlapped;
let delete_ids = branch.iter().map(|f| f.id).collect::<Vec<_>>();
// compute max_l0_created_at
@ -151,7 +261,6 @@ async fn try_compact_partition(
// . identify right files for hot/cold compaction
// . filter right amount of files
// . compact many steps hot/cold (need more thinking)
let target_level = CompactionLevel::FileNonOverlapped;
let plan = components
.df_planner
.plan(branch_inpad, Arc::clone(partition_info), target_level)
@ -169,7 +278,18 @@ async fn try_compact_partition(
job.await?
};
// The minimum added in this PR so reviewers can follow the idea
// TODO: refactor the code above to produce these values
let higher_level_files = vec![];
let compact_result = CompactResult {
non_overlapping_files: vec![],
upgraded_files: vec![],
created_file_params: create,
deleted_ids: delete_ids,
};
// upload files to real object store
let create = compact_result.created_file_params;
let output_files: Vec<ParquetFilePath> = create.iter().map(|p| p.into()).collect();
let output_uuids = scratchpad_ctx.make_public(&output_files).await;
let create: Vec<_> = create
@ -184,23 +304,53 @@ async fn try_compact_partition(
// clean scratchpad
scratchpad_ctx.clean_from_scratchpad(&input_paths).await;
let ids = components
// Update the catalog to reflect the newly created files, soft delete the compacted files, and
// update the upgraded files
let upgraded_ids = compact_result
.upgraded_files
.iter()
.map(|f| f.id)
.collect::<Vec<_>>();
let deleted_ids = compact_result.deleted_ids;
let create_ids = components
.commit
.commit(partition_id, &delete_ids, &create)
.commit(
partition_id,
&deleted_ids,
&upgraded_ids,
&create,
target_level,
)
.await;
// Extend files_next with the created files, the higher-level files, the upgraded files, and the non-overlapping files
files_next.extend(
create
.into_iter()
.zip(ids)
.zip(create_ids)
.map(|(params, id)| ParquetFile::from_params(params, id)),
);
files_next.extend(higher_level_files);
files_next.extend(compact_result.upgraded_files);
files_next.extend(compact_result.non_overlapping_files);
}
files = files_next;
}
}
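Since the doc comment above packs the whole divide & conquer idea into prose and ASCII art, here is a deliberately tiny, self-contained sketch of just the control flow. Plain u64 "sizes" stand in for files; the stop condition, branch divider and per-branch compaction are stubs and do not reflect the real components:

// Round loop: check stop condition -> divide into branches -> compact each branch ->
// recombine the outputs and go around again.
fn stop_condition(files: &[u64]) -> bool {
    // Stand-in for the version-specific partition filters (e.g. "only one file left").
    files.len() <= 1
}

fn divide_into_branches(files: Vec<u64>) -> Vec<Vec<u64>> {
    // The real divider groups files into non-time-overlapping branches; here we just pair them up.
    files.chunks(2).map(|c| c.to_vec()).collect()
}

fn compact_branch(branch: Vec<u64>) -> Vec<u64> {
    // The real compactor rewrites parquet data; here we just "merge" the sizes.
    vec![branch.iter().sum()]
}

fn compact_partition(mut files: Vec<u64>) -> Vec<u64> {
    loop {
        if stop_condition(&files) {
            return files;
        }
        let mut files_next = Vec::new();
        for branch in divide_into_branches(files) {
            files_next.extend(compact_branch(branch));
        }
        files = files_next;
    }
}

fn main() {
    // 7 input "files", as in the example above, shrink over a few rounds until one remains.
    println!("{:?}", compact_partition(vec![10, 20, 30, 40, 50, 60, 70]));
}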
/// Struct to store the results of compaction
struct CompactResult {
/// files of target_level that do not overlap with (target_level - 1)
non_overlapping_files: Vec<ParquetFile>,
/// files of (target_level -1) that will be upgraded to target_level
upgraded_files: Vec<ParquetFile>,
/// result of compaction of [files_compact - (files_non_overlap + files_upgrade)]
created_file_params: Vec<ParquetFileParams>,
/// ids of files in [files_compact - (files_non_overlap + files_upgrade)]
deleted_ids: Vec<ParquetFileId>,
}
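And here is a hedged, standalone sketch of the four-way split described under VERSION 2 above, whose outputs this struct will eventually carry. Overlap is reduced to 1-D time-range intersection and the size threshold is a plain parameter; none of this is the final implementation, and all names below are illustrative:

// Hypothetical split of one round's input files, per the VERSION 2 doc comment:
// 1. higher-level files are left alone, 2. non-overlapping target-level files are kept,
// 3. large non-overlapping initial-level files are upgraded, 4. the rest is compacted.
#[derive(Debug, Clone)]
struct FileSketch {
    id: i64,
    level: u8, // 0 = L0, 1 = L1, 2 = L2
    min_time: i64,
    max_time: i64,
    size_bytes: u64,
}

fn overlaps(a: &FileSketch, b: &FileSketch) -> bool {
    a.min_time <= b.max_time && b.min_time <= a.max_time
}

#[derive(Debug, Default)]
struct SplitSketch {
    files_higher_level: Vec<FileSketch>,
    files_non_overlap: Vec<FileSketch>,
    files_upgrade: Vec<FileSketch>,
    files_compact: Vec<FileSketch>,
}

fn split_for_round(files: &[FileSketch], target_level: u8, max_desired_size: u64) -> SplitSketch {
    let initial_level = target_level - 1;
    let mut out = SplitSketch::default();
    for f in files {
        // Does any *other* file at the given level overlap f in time?
        let overlaps_level = |level: u8| {
            files
                .iter()
                .any(|o| o.id != f.id && o.level == level && overlaps(f, o))
        };
        if f.level > target_level {
            out.files_higher_level.push(f.clone());
        } else if f.level == target_level && !overlaps_level(initial_level) {
            out.files_non_overlap.push(f.clone());
        } else if f.level == initial_level
            && f.size_bytes > max_desired_size
            && !overlaps_level(initial_level)
            && !overlaps_level(target_level)
        {
            out.files_upgrade.push(f.clone());
        } else {
            out.files_compact.push(f.clone());
        }
    }
    out
}

For the Round 1 example in the doc comment above, the intended outcome of such a split is: L2.1 in files_higher_level, L1.1 in files_non_overlap, L0.2 in files_upgrade, and L0.1 plus L1.2 in files_compact.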
async fn fetch_partition_info(
partition_id: PartitionId,
components: &Arc<Components>,