fix: Clean up comments as I read through

parent f1467cf4d8
commit 959f0d3e02
@@ -193,18 +193,21 @@ impl CompactAndUpgrade {
     }
 }
 
-/// Data points need to run a compactor
+/// Data points needed to run a compactor
 #[derive(Debug)]
 pub struct Compactor {
     /// Sequencers assigned to this compactor
     sequencers: Vec<SequencerId>,
 
     /// Object store for reading and persistence of parquet files
     store: ParquetStorage,
 
     /// The global catalog for schema, parquet files and tombstones
     catalog: Arc<dyn Catalog>,
 
-    /// Executor for running queries and compacting and persisting
+    /// Executor for running queries, compacting, and persisting
     exec: Arc<Executor>,
 
     /// Time provider for all activities in this compactor
     pub time_provider: Arc<dyn TimeProvider>,
 
@@ -459,7 +462,8 @@ impl Compactor {
 
     /// Group files to be compacted together and level-0 files that will get upgraded
     /// for a given partition.
-    /// The number of compacting files per group will be limited by thier total size and number of files
+    /// The number of compacting files per group will be limited by their total size and number of
+    /// files.
     pub async fn groups_to_compact_and_files_to_upgrade(
         &self,
         partition_id: PartitionId,
@@ -468,7 +472,7 @@ impl Compactor {
     ) -> Result<CompactAndUpgrade> {
         let mut compact_and_upgrade = CompactAndUpgrade::new(None);
 
-        // List all valid (not soft deletded) files of the partition
+        // List all valid (not soft deleted) files of the partition
         let parquet_files = self
             .catalog
             .repositories()
@@ -491,8 +495,8 @@ impl Compactor {
             compaction_max_file_count,
         )?;
 
-        // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
-        // If their
+        // Group time-contiguous non-overlapped groups if their total size is smaller than a
+        // threshold
         let compact_file_groups = Self::group_small_contiguous_groups(
             overlapped_file_groups,
             compaction_max_size_bytes,
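The call above folds time-contiguous, non-overlapping groups together until a byte budget is reached. A minimal sketch of that folding step, using a simplified stand-in type (`Group` and its fields are assumptions; the real code uses `GroupWithMinTimeAndSize`):

    // Simplified stand-in for GroupWithMinTimeAndSize; fields are assumptions.
    struct Group {
        min_time: i64,
        total_size: i64,
    }

    // Fold time-contiguous groups into batches while the running size stays
    // under the budget, mirroring the thresholding described above.
    fn group_small_contiguous(mut groups: Vec<Group>, max_size_bytes: i64) -> Vec<Vec<Group>> {
        // Sort by min_time so adjacent groups are time-contiguous.
        groups.sort_by_key(|g| g.min_time);

        let mut batches: Vec<Vec<Group>> = Vec::new();
        let mut current: Vec<Group> = Vec::new();
        let mut current_size = 0;
        for g in groups {
            if !current.is_empty() && current_size + g.total_size > max_size_bytes {
                // Budget exceeded: close this batch and start a new one.
                batches.push(std::mem::take(&mut current));
                current_size = 0;
            }
            current_size += g.total_size;
            current.push(g);
        }
        if !current.is_empty() {
            batches.push(current);
        }
        batches
    }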
@@ -524,7 +528,8 @@ impl Compactor {
     /// Runs compaction in a partition resolving any tombstones and deduplicating data
     ///
     /// Expectation: After a partition of a table has not received any writes for some
-    /// amount of time, the compactor will ensure it is stored in object store as N parquet files which:
+    /// amount of time, the compactor will ensure it is stored in object store as N parquet files
+    /// which:
     /// . have non overlapping time ranges
     /// . each does not exceed a size specified by config param max_desired_file_size_bytes.
     ///
@@ -566,7 +571,12 @@ impl Compactor {
             let original_parquet_file_ids: Vec<_> =
                 group.parquet_files.iter().map(|f| f.id).collect();
             let size: i64 = group.parquet_files.iter().map(|f| f.file_size_bytes).sum();
-            info!(num_files=%group.parquet_files.len(), ?size, ?original_parquet_file_ids, "compacting group of files");
+            info!(
+                num_files=%group.parquet_files.len(),
+                ?size,
+                ?original_parquet_file_ids,
+                "compacting group of files"
+            );
 
             // Compact the files concurrently.
             //
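As an aside on the reformatted `info!` call: in the `tracing` crate, a `%` sigil records a field using its `Display` impl and `?` uses `Debug`. A standalone sketch (the crate setup and values are illustrative):

    use tracing::info;

    fn main() {
        // Emit events to stdout; any subscriber would do.
        tracing_subscriber::fmt::init();

        let num_files = 3usize;
        let ids = vec![101, 102, 103];
        // %num_files is recorded via Display, ?ids via Debug.
        info!(%num_files, ?ids, "compacting group of files");
    }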
@@ -676,10 +686,13 @@ impl Compactor {
 
     // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
     // There are 2 types of input groups
-    // 1. Type-1: Groups that inlcude overlapped files but the groups do not overlap with other groups
+    //
+    // 1. Type-1: Groups that include overlapped files but the groups do not overlap with other
+    //    groups
     // 2. Type-2: Groups that overlap with other groups
-    // We can only combine time-contiguous type-1 groups to avoid a buggy situation where the groups overlap
-    // in both time and sequence numbers
+    //
+    // We can only combine time-contiguous type-1 groups to avoid a buggy situation where the
+    // groups overlap in both time and sequence numbers
     //
     // Example:
     // n_m: chunk with range of sequence numbers [n, m] where n <= m
@@ -715,7 +728,8 @@ impl Compactor {
     //
     // Note that even if groups [6_6] and [4_4] are very small, they cannot be combined
     // because chunk 6_6 overlaps with chunk 5_5. Combining [6_6] and [4_4] will lead to
-    // a bad comacting result of 2 chunks 5_5 and 4_6 that overlap in both time and sequence numbers
+    // a bad compacting result of 2 chunks 5_5 and 4_6 that overlap in both time and sequence
+    // numbers
 
     fn group_small_contiguous_groups(
         mut file_groups: Vec<GroupWithMinTimeAndSize>,
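The [6_6]/[4_4] hazard reduces to closed-range arithmetic: compacting those two groups yields one chunk spanning sequence numbers [4, 6], which then overlaps chunk 5_5. A small sketch of that check (types are illustrative, not the repo's):

    // Closed sequence-number range [min, max], matching the n_m notation above.
    #[derive(Clone, Copy, Debug)]
    struct SeqRange {
        min: i64,
        max: i64,
    }

    // Two closed ranges overlap iff each starts no later than the other ends.
    fn seq_overlap(a: SeqRange, b: SeqRange) -> bool {
        a.min <= b.max && b.min <= a.max
    }

    fn main() {
        let chunk_5_5 = SeqRange { min: 5, max: 5 };
        // Compacting groups [6_6] and [4_4] would produce one chunk spanning [4, 6]...
        let merged_4_6 = SeqRange { min: 4, max: 6 };
        // ...which overlaps 5_5 in sequence numbers: the buggy situation described above.
        assert!(seq_overlap(merged_4_6, chunk_5_5));
    }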
@@ -822,8 +836,8 @@ impl Compactor {
         // Collect all unique tombstones
         let mut tombstone_map = overlapped_files[0].tombstone_map();
 
-        // Verify if the given files belong to the same partition and collect their tombstones
-        // One tombstone might be relevant to multiple parquet files in this set, so dedupe here.
+        // Verify if the given files belong to the same partition and collect their tombstones.
+        // One tombstone might be relevant to multiple parquet files in this set, so dedupe here.
         if let Some((head, tail)) = overlapped_files.split_first() {
             for file in tail {
                 tombstone_map.append(&mut file.tombstone_map());
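The dedupe works because `tombstone_map()` evidently returns a map keyed by tombstone ID, so appending the per-file maps collapses duplicates. A minimal sketch with `BTreeMap` (the key and value types are assumptions):

    use std::collections::BTreeMap;

    type TombstoneId = i64;

    // Merge hypothetical per-file tombstone maps; duplicate IDs collapse to one entry.
    fn merge_tombstones(
        mut per_file: Vec<BTreeMap<TombstoneId, String>>,
    ) -> BTreeMap<TombstoneId, String> {
        let mut merged = BTreeMap::new();
        for map in per_file.iter_mut() {
            // BTreeMap::append moves all entries out of `map` into `merged`.
            merged.append(map);
        }
        merged
    }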
@@ -851,8 +865,7 @@ impl Compactor {
             }
         }
 
-        // Convert the input files into QueryableParquetChunk for making query
-        // plan
+        // Convert the input files into QueryableParquetChunk for making query plan
         let query_chunks: Vec<_> = overlapped_files
             .iter()
             .map(|f| {
@@ -986,9 +999,9 @@ impl Compactor {
 
             // Now that the parquet file is available, create its processed tombstones
             for (_, tombstone) in catalog_update.tombstones {
-                // Becasue data may get removed and split during compaction, a few new files
+                // Because data may get removed and split during compaction, a few new files
                 // may no longer overlap with the delete tombstones. Need to verify whether
-                // they are overlap before adding process tombstones
+                // they overlap before adding processed tombstones
                 if (parquet.min_time <= tombstone.min_time
                     && parquet.max_time >= tombstone.min_time)
                     || (parquet.min_time > tombstone.min_time
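The truncated condition above tests whether the file's time range intersects the tombstone's. For closed intervals that reduces to the usual two comparisons; a sketch with field names assumed from the snippet:

    struct TimeRange {
        min_time: i64,
        max_time: i64,
    }

    // Closed intervals intersect iff each begins no later than the other ends.
    // If a new file does not intersect a tombstone, no processed-tombstone
    // record is needed for that pair.
    fn overlaps(parquet: &TimeRange, tombstone: &TimeRange) -> bool {
        parquet.min_time <= tombstone.max_time && tombstone.min_time <= parquet.max_time
    }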
@@ -1016,14 +1029,18 @@ impl Compactor {
     // their total size is too large.
     //
     // Invariants
-    // 1. Input `groups`, each contains overlapped files but the groups do not overlap with each other.
-    //    As a result, a group can include files whose total size is larger than our maximum allocated memory.
+    //
+    // 1. Input `groups`, each contains overlapped files but the groups do not overlap with each
+    //    other. As a result, a group can include files whose total size is larger than our
+    //    maximum allocated memory.
     // 2. Output will be in OverlappedGroups type that includes 2 different groups of files:
-    //    . internal_overlapped_groups: groups of the input that are neither contains too many files
-    //      nor their total file size is too large. Each of these groups do not overlap with any other groups.
-    //    . external_overlapped_groups: subgroups of splitting too-large-or-too-many-file input groups.
-    //      Each of these subgroups will overlap with at least one of the other subgroups. However, to correclty
-    //      deduplicate data, each subgroup only include contiguous sequence numbers.
+    //    - internal_overlapped_groups: groups of the input that neither contain too many
+    //      files nor have a total file size that is too large. Each of these groups does not
+    //      overlap with any other group.
+    //    - external_overlapped_groups: subgroups produced by splitting too-large-or-too-many-file
+    //      input groups.
+    //      Each of these subgroups will overlap with at least one of the other subgroups. However,
+    //      to correctly deduplicate data, each subgroup only includes contiguous sequence numbers.
     //
     // Example:
     // The input `groups` consists of 4 groups of 9 chunks with time ranges as below.
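A compact way to picture the invariant is the output shape itself; a simplified mirror of `OverlappedGroups` (field types are assumptions based on the comments above):

    // Stand-in for a catalog parquet file record.
    #[derive(Debug)]
    struct FileMeta {
        min_sequence_number: i64,
        file_size_bytes: i64,
    }

    // Simplified mirror of the OverlappedGroups output described above.
    #[derive(Debug, Default)]
    struct OverlappedGroupsSketch {
        // Groups kept whole: not too many files, not too large, and
        // non-overlapping with every other group.
        internal_overlapped_groups: Vec<Vec<FileMeta>>,
        // Subgroups produced by splitting oversized groups; they may overlap
        // each other in time, but each covers contiguous sequence numbers
        // because the parent group is sorted by min_sequence_number first.
        external_overlapped_groups: Vec<Vec<FileMeta>>,
    }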
@@ -1051,8 +1068,8 @@ impl Compactor {
     //   G1[C4]  G2[C1, C2, C3, C6]  G3[C5, C8, C9]  G4[C7]
     //
     //
-    // Since total size of files in G2 and G3 is over the limit `max_size_bytes`, they are split further,
-    // each into 2 groups. The output will inlcude 6 groups as follows:
+    // Since the total size of files in G2 and G3 is over the limit `max_size_bytes`, they are
+    // split further, each into 2 groups. The output will include 6 groups as follows:
     // Time
     // ────────────────────────────────────────────────────────────────────────────────────▶
     //
@@ -1070,10 +1087,10 @@ impl Compactor {
     //   G1[C4]  G2[C1, C2]*  G4[C5, C8]*  G6[C7]
     //           G3[C3, C6]*  G5[C9]*
     //
-    // The newly split group are G2, G3, G4 and G5 and marked with a star (*) at the end.
+    // The newly split groups are G2, G3, G4 and G5, marked with a star (*) at the end.
     // Before splitting, the chunks are sorted by their sequence numbers to guarantee their data
     // will be deduplicated correctly in this and future compaction cycles. As a consequence,
-    // new group may inlcude non-overlapped chunks, e.g. G2. But this is not an issue becasue:
+    // the new group may include non-overlapped chunks, e.g. G2. But this is not an issue because:
     // . The compaction plan will discover whether the chunks overlap or not to avoid the
     //   actual deduplication work
     // . We do want to compact/concat small continuous non-overlapped chunks (in later steps) so
@@ -1091,19 +1108,19 @@ impl Compactor {
             {
                 overlapped_groups.add_internal_overalapped_group(group.to_vec());
             } else {
-                // Sort overlapped files on their min sequence number to ensure their split subgroups
-                // contain contiguous sequnce numbers
+                // Sort overlapped files on their min sequence number to ensure their split
+                // subgroups contain contiguous sequence numbers
                 group.sort_by_key(|f| f.min_sequence_number);
 
-                // Verify that the sorted ranges of [min_sequence_number, max_sequence_number] do not
-                // overlap if their time ranges overlap
+                // Verify that the sorted ranges of [min_sequence_number, max_sequence_number] do
+                // not overlap if their time ranges overlap
                 // Note that: `https://github.com/influxdata/conductor/issues/1009`
-                // 1. the input groups includes time-overlaped files but 2 files in them may NOT overlap but
-                //    both overlap with a third file
-                // 2. Since we split large compacted result into many files in previous cycles, files
-                //    in this cycle can have exact same range of sequence numbers
-                // Points 1 and 2 together will lead to many non-time-overlapped files with same sequence number ranges
-                //    can end up in the same time-overlapped group
+                // 1. the input groups include time-overlapped files, but 2 files in them may NOT
+                //    overlap while both overlap with a third file
+                // 2. Since we split large compacted results into many files in previous cycles,
+                //    files in this cycle can have the exact same range of sequence numbers
+                // Points 1 and 2 together will lead to many non-time-overlapped files with the
+                //    same sequence number ranges ending up in the same time-overlapped group
                 for i in 1..group.len() {
                     Self::verify_contiguous_files(&group[i - 1], &group[i])?
                 }
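A sketch of what a pairwise check like `verify_contiguous_files` can enforce, under the stated rule that time-overlapping neighbors must not also overlap in sequence numbers (the real implementation and its error type may differ):

    #[derive(Debug)]
    struct Pf {
        min_sequence_number: i64,
        max_sequence_number: i64,
        min_time: i64,
        max_time: i64,
    }

    // After sorting by min_sequence_number: if two neighbors overlap in time,
    // their sequence ranges must not also overlap, or the deduplication order
    // between them is ambiguous.
    fn verify_contiguous(prev: &Pf, next: &Pf) -> Result<(), String> {
        let time_overlap = prev.min_time <= next.max_time && next.min_time <= prev.max_time;
        let seq_overlap = prev.min_sequence_number <= next.max_sequence_number
            && next.min_sequence_number <= prev.max_sequence_number;
        if time_overlap && seq_overlap {
            return Err(format!(
                "files overlap in both time and sequence numbers: {prev:?} vs {next:?}"
            ));
        }
        Ok(())
    }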
@@ -1272,7 +1289,8 @@ impl Compactor {
     async fn fully_processed(&self, tombstone: Tombstone) -> bool {
         let mut repos = self.catalog.repositories().await;
 
-        // Get number of non-deleted parquet files of the same tableId & sequencerId that overlap with the tombstone time range
+        // Get the number of non-deleted parquet files of the same table ID & sequencer ID that
+        // overlap with the tombstone time range
         let count_pf = repos
             .parquet_files()
             .count_by_overlaps(
@@ -1287,15 +1305,19 @@ impl Compactor {
             Ok(count_pf) => count_pf,
             _ => {
                 warn!(
-                    "Error getting parquet file count for table ID {}, sequencer ID {}, min time {:?}, max time {:?}.
-                    Won't be able to verify whether its tombstone is fully processed",
-                    tombstone.table_id, tombstone.sequencer_id, tombstone.min_time, tombstone.max_time
+                    "Error getting parquet file count for table ID {}, sequencer ID {}, min time \
+                    {:?}, max time {:?}. Won't be able to verify whether its tombstone is fully \
+                    processed",
+                    tombstone.table_id,
+                    tombstone.sequencer_id,
+                    tombstone.min_time,
+                    tombstone.max_time
                 );
                 return false;
             }
         };
 
-        // Get number of the processed parquet file for this tombstones
+        // Get the number of processed parquet files for this tombstone
        let count_pt = repos
            .processed_tombstones()
            .count_by_tombstone_id(tombstone.id)
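Once both counts are available, the decision is a straight comparison: the tombstone is fully processed exactly when every overlapping, non-deleted parquet file has a processed-tombstone record. A sketch of that tail logic (the Results stand in for the catalog queries; error types simplified):

    // Sketch of the final comparison in fully_processed.
    fn fully_processed_sketch(count_pf: Result<i64, String>, count_pt: Result<i64, String>) -> bool {
        let (count_pf, count_pt) = match (count_pf, count_pt) {
            (Ok(pf), Ok(pt)) => (pf, pt),
            // Any catalog error means we cannot verify, so report not-fully-processed.
            _ => return false,
        };
        // Every overlapping parquet file must have a processed-tombstone record.
        count_pt == count_pf
    }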
@@ -1400,7 +1422,7 @@ impl Compactor {
 
 #[derive(Debug, Clone, PartialEq)]
 struct OverlappedGroups {
-    // Groups that contain overlapped files but the groups do not overlapp
+    // Groups that contain overlapped files but the groups do not overlap
     // with other groups
     internal_overlapped_groups: Vec<Vec<ParquetFile>>,
 
@@ -1,5 +1,4 @@
 //! IOx compactor implementation.
-//!
 
 #![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
 #![warn(