Merge pull request #5303 from influxdata/cn/upgrade-cold-nonoverlapping-l0

feat: Compact cold partitions; upgrade a single non-overlapping level 0 file to level 1 without running compaction
pull/24376/head
kodiakhq[bot] 2022-08-04 21:02:28 +00:00 committed by GitHub
commit 4898a7f1e3
11 changed files with 2116 additions and 593 deletions


@ -69,13 +69,14 @@ pub struct CompactorConfig {
)]
pub split_percentage: u16,
/// The compactor will limit the number of simultaneous compaction jobs based on the
/// size of the input files to be compacted. This number should be less than 1/10th
/// of the available memory to ensure compactions have
/// enough space to run.
/// Default is 1,073,741,824 bytes (1GB).
/// The number of compact_partititons run in parallel is determined by:
/// max_concurrent_size_bytes/input_size_threshold_bytes
/// The compactor will limit the number of simultaneous hot partition compaction jobs based on
/// the size of the input files to be compacted. This number should be less than 1/10th of the
/// available memory to ensure compactions have enough space to run.
///
/// Default is 1024 * 1024 * 1024 = 1,073,741,824 bytes (1GB).
//
// The number of compact_hot_partitions run in parallel is determined by:
// max_concurrent_size_bytes/input_size_threshold_bytes
#[clap(
long = "--compaction-concurrent-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
@ -84,6 +85,22 @@ pub struct CompactorConfig {
)]
pub max_concurrent_size_bytes: u64,
/// The compactor will limit the number of simultaneous cold partition compaction jobs based on
/// the size of the input files to be compacted. This number should be less than 1/10th of the
/// available memory to ensure compactions have enough space to run.
///
/// Default is 1024 * 1024 * 900 = 943,718,400 bytes (900MB).
//
// The number of compact_cold_partitions run in parallel is determined by:
// max_cold_concurrent_size_bytes/cold_input_size_threshold_bytes
#[clap(
long = "--compaction-cold-concurrent-size-bytes",
env = "INFLUXDB_IOX_COMPACTION_COLD_CONCURRENT_SIZE_BYTES",
default_value = "943718400",
action
)]
pub max_cold_concurrent_size_bytes: u64,
/// Max number of partitions per sequencer we want to compact per cycle
/// Default: 1
#[clap(
@ -104,14 +121,14 @@ pub struct CompactorConfig {
)]
pub min_number_recent_ingested_files_per_partition: usize,
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total size of input files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
/// A compaction operation for hot partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
///
/// A compaction operation will be limited by this or by the file count threshold, whichever is
/// hit first.
///
/// Default is 1024 * 1024 * 100 = 100,048,576 (100MB).
/// Default is 1024 * 1024 * 100 = 104,857,600 bytes (100MB).
#[clap(
long = "--compaction-input-size-threshold-bytes",
env = "INFLUXDB_IOX_COMPACTION_INPUT_SIZE_THRESHOLD_BYTES",
@ -120,6 +137,19 @@ pub struct CompactorConfig {
)]
pub input_size_threshold_bytes: u64,
/// A compaction operation for cold partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
///
/// Default is 1024 * 1024 * 600 = 629,145,600 bytes (600MB).
#[clap(
long = "--compaction-cold-input-size-threshold-bytes",
env = "INFLUXDB_IOX_COMPACTION_COLD_INPUT_SIZE_THRESHOLD_BYTES",
default_value = "629145600",
action
)]
pub cold_input_size_threshold_bytes: u64,
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total number of L0 + L1 files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
@ -135,4 +165,17 @@ pub struct CompactorConfig {
action
)]
pub input_file_count_threshold: usize,
/// The multiple of times that compacting hot partitions should run for every one time that
/// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions
/// equally.
///
/// Default is 4.
#[clap(
long = "--compaction-hot-multiple",
env = "INFLUXDB_IOX_COMPACTION_HOT_MULTIPLE",
default_value = "4",
action
)]
pub hot_multiple: usize,
}
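
For orientation, the defaults above determine both the per-cycle parallelism and the hot/cold scheduling ratio. The sketch below is not part of this diff: it works through the arithmetic quoted in the doc comments and condenses the scheduling loop added to run_compactor later in the diff; the one_cycle wrapper name is illustrative only.

// Worked example of the concurrency formulas in the doc comments above, using the
// default values from this diff (integer division, as in the compactor itself).
fn parallelism() -> (u64, u64) {
    let max_concurrent_size_bytes: u64 = 1024 * 1024 * 1024; // 1GB hot budget
    let input_size_threshold_bytes: u64 = 1024 * 1024 * 100; // 100MB per hot job
    let max_cold_concurrent_size_bytes: u64 = 1024 * 1024 * 900; // 900MB cold budget
    let cold_input_size_threshold_bytes: u64 = 1024 * 1024 * 600; // 600MB per cold job
    (
        max_concurrent_size_bytes / input_size_threshold_bytes, // 10 hot jobs in parallel
        max_cold_concurrent_size_bytes / cold_input_size_threshold_bytes, // 1 cold job
    )
}

// Condensed from run_compactor further down: each cycle runs hot compaction
// hot_multiple (default 4) times, then cold compaction once.
async fn one_cycle(compactor: Arc<Compactor>) {
    for _ in 0..compactor.config.hot_multiple {
        compact_hot_partitions(Arc::clone(&compactor)).await;
    }
    compact_cold_partitions(Arc::clone(&compactor)).await;
}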


@ -238,7 +238,7 @@ impl Compactor {
/// * In all cases above, for each sequencer, N partitions with the most new ingested files
/// will be selected and the return list will include at most, P = N * S, partitions where S
/// is the number of sequencers this compactor handles.
pub async fn partitions_to_compact(
pub async fn hot_partitions_to_compact(
&self,
// Max number of the most recent highest ingested throughput partitions
// per sequencer we want to read
@ -252,8 +252,10 @@ impl Compactor {
let mut repos = self.catalog.repositories().await;
for sequencer_id in &self.sequencers {
let attributes =
Attributes::from([("sequencer_id", format!("{}", *sequencer_id).into())]);
let attributes = Attributes::from([
("sequencer_id", format!("{}", *sequencer_id).into()),
("partition_type", "hot".into()),
]);
// Get the most recent highest ingested throughput partitions within
// the last 4 hours. If nothing, increase to 24 hours
@ -285,35 +287,57 @@ impl Compactor {
}
}
// No active ingesting partitions the last 24 hours,
// get partition with the most level-0 files
if num_partitions == 0 {
debug!(
sequencer_id = sequencer_id.get(),
"no active ingesting partitions",
);
// Record metric for candidates per sequencer
debug!(
sequencer_id = sequencer_id.get(),
n = num_partitions,
"hot compaction candidates",
);
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
}
let mut partitions = repos
.parquet_files()
.most_level_0_files_partitions(*sequencer_id, max_num_partitions_per_sequencer)
.await
.context(MostL0PartitionsSnafu {
sequencer_id: *sequencer_id,
})?;
Ok(candidates)
}
if !partitions.is_empty() {
num_partitions = partitions.len();
candidates.append(&mut partitions);
}
}
/// Return a list of partitions that:
///
/// - Have not received any writes in 24 hours (determined by all parquet files having a
/// created_at time older than 24 hours ago)
/// - Have some level 0 parquet files that need to be upgraded or compacted
pub async fn cold_partitions_to_compact(
&self,
// Max number of cold partitions per sequencer we want to compact
max_num_partitions_per_sequencer: usize,
) -> Result<Vec<PartitionParam>> {
let mut candidates =
Vec::with_capacity(self.sequencers.len() * max_num_partitions_per_sequencer);
let mut repos = self.catalog.repositories().await;
for sequencer_id in &self.sequencers {
let attributes = Attributes::from([
("sequencer_id", format!("{}", *sequencer_id).into()),
("partition_type", "cold".into()),
]);
let mut partitions = repos
.parquet_files()
.most_level_0_files_partitions(*sequencer_id, 24, max_num_partitions_per_sequencer)
.await
.context(MostL0PartitionsSnafu {
sequencer_id: *sequencer_id,
})?;
let num_partitions = partitions.len();
candidates.append(&mut partitions);
// Record metric for candidates per sequencer
debug!(
sequencer_id = sequencer_id.get(),
n = num_partitions,
"compaction candidates",
"cold compaction candidates",
);
let number_gauge = self.compaction_candidate_gauge.recorder(attributes.clone());
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
}
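
For context, the end-to-end cold path that the tests and the run loop later in this diff exercise looks roughly like the sketch below; it is assembled from that code (error handling elided), not an additional change.

// Condensed cold-compaction flow, assembled from the tests later in this diff.
let candidates = compactor
    .cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
    .await?;
let candidates = compactor.add_info_to_partitions(&candidates).await?;
for candidate in candidates {
    // Upgrades a lone non-overlapping level 0 file to level 1 in place,
    // otherwise runs a full compaction (see compact_cold_partition below).
    compact_cold_partition(&compactor, candidate).await?;
}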
@ -454,7 +478,7 @@ mod tests {
use uuid::Uuid;
#[tokio::test]
async fn test_candidate_partitions() {
async fn test_hot_partitions_to_compact() {
let catalog = TestCatalog::new();
// Create a db with 2 sequencers, one with 4 empty partitions and the other one with one
@ -465,7 +489,12 @@ mod tests {
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create("namespace_candidate_partitions", "inf", kafka.id, pool.id)
.create(
"namespace_hot_partitions_to_compact",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = txn
@ -542,9 +571,6 @@ mod tests {
// Some times in the past to set to created_at of the files
let time_now = Timestamp::new(compactor.time_provider.now().timestamp_nanos());
let _time_one_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(),
);
let time_three_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 3)).timestamp_nanos(),
);
@ -573,7 +599,7 @@ mod tests {
};
// Note: The order of the test cases below is important and should not be changed
// becasue they depend on the order of the writes and their content. For example,
// because they depend on the order of the writes and their content. For example,
// in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
// but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
// In other words, the last Case needs all the content of the previous tests.
@ -582,7 +608,7 @@ mod tests {
// --------------------------------------
// Case 1: no files yet --> no partition candidates
//
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
@ -606,13 +632,14 @@ mod tests {
.unwrap();
txn.commit().await.unwrap();
// No non-deleted level 0 files yet --> no candidates
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 3: no new recent writes (within the last 24 hours) --> return candidates with the most L0
// Case 3: no new recent writes (within the last 24 hours) --> no partition candidates
// (the cold case will pick them up)
//
// partition2 has an old (more 24 hours ago) non-deleted level 0 file
// partition2 has an old (more than 24 hours ago) non-deleted level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
@ -622,11 +649,10 @@ mod tests {
};
let _pf3 = txn.parquet_files().create(p3).await.unwrap();
txn.commit().await.unwrap();
//
// Has at least one partition with a L0 file --> make it a candidate
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition2.id);
// No hot candidates
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 4: has one partition with recent writes (5 hours ago) --> return that partition
@ -643,7 +669,7 @@ mod tests {
txn.commit().await.unwrap();
//
// Has at least one partition with a recent write --> make it a candidate
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
@ -665,28 +691,28 @@ mod tests {
txn.commit().await.unwrap();
//
// make partitions in the most recent group candidates
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition3.id);
// --------------------------------------
// Case 6: has partittion candidates for 2 sequecers
// Case 6: has partition candidates for 2 sequencers
//
// The another_sequencer now has non-deleted level-0 file ingested 38 hours ago
// The another_sequencer now has non-deleted level-0 file ingested 5 hours ago
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p6 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
sequencer_id: another_sequencer.id,
table_id: another_table.id,
partition_id: another_partition.id,
created_at: time_38_hour_ago,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf6 = txn.parquet_files().create(p6).await.unwrap();
txn.commit().await.unwrap();
//
// Will have 2 candidates, one for each sequencer
let mut candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
candidates.sort();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition3.id);
@ -723,19 +749,307 @@ mod tests {
let percentage_max_file_size = 30;
let split_percentage = 80;
let max_concurrent_size_bytes = 100_000;
let max_cold_concurrent_size_bytes = 90_000;
let max_number_partitions_per_sequencer = 1;
let min_number_recent_ingested_per_partition = 1;
let input_size_threshold_bytes = 300 * 1024 * 1024;
let cold_input_size_threshold_bytes = 600 * 1024 * 1024;
let input_file_count_threshold = 100;
let hot_multiple = 4;
CompactorConfig::new(
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_concurrent_size_bytes,
max_cold_concurrent_size_bytes,
max_number_partitions_per_sequencer,
min_number_recent_ingested_per_partition,
input_size_threshold_bytes,
cold_input_size_threshold_bytes,
input_file_count_threshold,
hot_multiple,
)
}
#[tokio::test]
async fn test_cold_partitions_to_compact() {
let catalog = TestCatalog::new();
// Create a db with 2 sequencers, one with 5 empty partitions and the other with one
// empty partition
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create(
"namespace_hot_partitions_to_compact",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition1 = txn
.partitions()
.create_or_get("one".into(), sequencer.id, table.id)
.await
.unwrap();
let partition2 = txn
.partitions()
.create_or_get("two".into(), sequencer.id, table.id)
.await
.unwrap();
let partition3 = txn
.partitions()
.create_or_get("three".into(), sequencer.id, table.id)
.await
.unwrap();
let partition4 = txn
.partitions()
.create_or_get("four".into(), sequencer.id, table.id)
.await
.unwrap();
let partition5 = txn
.partitions()
.create_or_get("five".into(), sequencer.id, table.id)
.await
.unwrap();
// other sequencer
let another_table = txn
.tables()
.create_or_get("another_test_table", namespace.id)
.await
.unwrap();
let another_sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(2))
.await
.unwrap();
let another_partition = txn
.partitions()
.create_or_get(
"another_partition".into(),
another_sequencer.id,
another_table.id,
)
.await
.unwrap();
// update sort key for this another_partition
let another_partition = txn
.partitions()
.update_sort_key(another_partition.id, &["tag1", "time"])
.await
.unwrap();
txn.commit().await.unwrap();
// Create a compactor
let time_provider = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let compactor = Compactor::new(
vec![sequencer.id, another_sequencer.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
time_provider,
BackoffConfig::default(),
config,
Arc::new(metric::Registry::new()),
);
// Some times in the past to set to created_at of the files
let time_five_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(),
);
let time_38_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(),
);
// Basic parquet info
let p1 = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition1.id,
object_store_id: Uuid::new_v4(),
max_sequence_number: SequenceNumber::new(100),
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 1337,
row_count: 0,
compaction_level: CompactionLevel::Initial, // level of file of new writes
created_at: time_38_hour_ago, // create cold files by default
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
// Note: The order of the test cases below is important and should not be changed
// because they depend on the order of the writes and their content. For example,
// in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
// but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
// In other words, the last Case needs all the content of the previous tests.
// This shows the priority of selecting compaction candidates
// --------------------------------------
// Case 1: no files yet --> no partition candidates
//
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 2: no non-deleted cold L0 files --> no partition candidates
//
// partition1 has a cold deleted L0
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
//
// partition2 has a cold non-L0 file
let p2 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition2.id,
..p1.clone()
};
let pf2 = txn.parquet_files().create(p2).await.unwrap();
txn.parquet_files()
.update_to_level_1(&[pf2.id])
.await
.unwrap();
txn.commit().await.unwrap();
// No non-deleted level 0 files yet --> no candidates
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 3: no new recent writes (within the last 24 hours) --> return that partition
//
// partition2 has a cold (more than 24 hours ago) non-deleted level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition2.id,
..p1.clone()
};
let _pf3 = txn.parquet_files().create(p3).await.unwrap();
txn.commit().await.unwrap();
//
// Has at least one partition with a L0 file --> make it a candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition2.id);
// --------------------------------------
// Case 4: has two cold partitions --> return the candidate with the most L0
//
// partition4 has two cold non-deleted level 0 files
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p4 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition4.id,
..p1.clone()
};
let _pf4 = txn.parquet_files().create(p4).await.unwrap();
let p5 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition4.id,
..p1.clone()
};
let _pf5 = txn.parquet_files().create(p5).await.unwrap();
txn.commit().await.unwrap();
// Partition with the most l0 files is the candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
// --------------------------------------
// Case 5: "warm" and "hot" partitions aren't returned
//
// partition3 has one cold level 0 file and one hot level 0 file
// partition5 has one hot level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3_cold = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition3.id,
..p1.clone()
};
let _pf3_cold = txn.parquet_files().create(p3_cold).await.unwrap();
let p3_hot = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition3.id,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf3_hot = txn.parquet_files().create(p3_hot).await.unwrap();
let p5_hot = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition5.id,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap();
txn.commit().await.unwrap();
// Partition4 is still the only candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
// Ask for 2 partitions per sequencer; get partition4 and partition2
let candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition4.id);
assert_eq!(candidates[1].partition_id, partition2.id);
// Ask for 3 partitions per sequencer; still get only partition4 and partition2
let candidates = compactor.cold_partitions_to_compact(3).await.unwrap();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition4.id);
assert_eq!(candidates[1].partition_id, partition2.id);
// --------------------------------------
// Case 6: has partition candidates for 2 sequencers
//
// The another_sequencer now has non-deleted level-0 file ingested 38 hours ago
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p6 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
sequencer_id: another_sequencer.id,
table_id: another_table.id,
partition_id: another_partition.id,
created_at: time_38_hour_ago,
..p1.clone()
};
let _pf6 = txn.parquet_files().create(p6).await.unwrap();
txn.commit().await.unwrap();
// Will have 2 candidates, one for each sequencer
let mut candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
candidates.sort();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition4.id);
assert_eq!(candidates[0].sequencer_id, sequencer.id);
assert_eq!(candidates[1].partition_id, another_partition.id);
assert_eq!(candidates[1].sequencer_id, another_sequencer.id);
// Ask for 2 candidates per sequencer; get back 3: 2 from sequencer and 1 from
// another_sequencer
let mut candidates = compactor.cold_partitions_to_compact(2).await.unwrap();
candidates.sort();
assert_eq!(candidates.len(), 3);
assert_eq!(candidates[0].partition_id, partition2.id);
assert_eq!(candidates[0].sequencer_id, sequencer.id);
assert_eq!(candidates[1].partition_id, partition4.id);
assert_eq!(candidates[1].sequencer_id, sequencer.id);
assert_eq!(candidates[2].partition_id, another_partition.id);
assert_eq!(candidates[2].sequencer_id, another_sequencer.id);
}
}


@ -126,11 +126,15 @@ pub struct CompactorConfig {
/// This value must be between (0, 100)
split_percentage: u16,
/// The compactor will limit the number of simultaneous compaction jobs based on the
/// size of the input files to be compacted. This number should be less than 1/10th
/// of the available memory to ensure compactions have
/// enough space to run.
max_concurrent_compaction_size_bytes: u64,
/// The compactor will limit the number of simultaneous hot partition compaction jobs based on
/// the size of the input files to be compacted. This number should be less than 1/10th of the
/// available memory to ensure compactions have enough space to run.
max_concurrent_size_bytes: u64,
/// The compactor will limit the number of simultaneous cold partition compaction jobs based on
/// the size of the input files to be compacted. This number should be less than 1/10th of the
/// available memory to ensure compactions have enough space to run.
max_cold_concurrent_size_bytes: u64,
/// Max number of partitions per sequencer we want to compact per cycle
max_number_partitions_per_sequencer: usize,
@ -138,14 +142,19 @@ pub struct CompactorConfig {
/// Min number of recent ingested files a partition needs to be considered for compacting
min_number_recent_ingested_files_per_partition: usize,
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total size of input files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
/// A compaction operation for hot partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
///
/// A compaction operation will be limited by this or by the file count threshold, whichever is
/// hit first.
input_size_threshold_bytes: u64,
/// A compaction operation for cold partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
cold_input_size_threshold_bytes: u64,
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total number of L0 + L1 files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
@ -153,6 +162,11 @@ pub struct CompactorConfig {
/// A compaction operation will be limited by this or by the input size threshold, whichever is
/// hit first.
input_file_count_threshold: usize,
/// The multiple of times that compacting hot partitions should run for every one time that
/// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions
/// equally.
hot_multiple: usize,
}
impl CompactorConfig {
@ -162,11 +176,14 @@ impl CompactorConfig {
max_desired_file_size_bytes: u64,
percentage_max_file_size: u16,
split_percentage: u16,
max_concurrent_compaction_size_bytes: u64,
max_concurrent_size_bytes: u64,
max_cold_concurrent_size_bytes: u64,
max_number_partitions_per_sequencer: usize,
min_number_recent_ingested_files_per_partition: usize,
input_size_threshold_bytes: u64,
cold_input_size_threshold_bytes: u64,
input_file_count_threshold: usize,
hot_multiple: usize,
) -> Self {
assert!(split_percentage > 0 && split_percentage <= 100);
@ -174,11 +191,14 @@ impl CompactorConfig {
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_concurrent_compaction_size_bytes,
max_concurrent_size_bytes,
max_cold_concurrent_size_bytes,
max_number_partitions_per_sequencer,
min_number_recent_ingested_files_per_partition,
input_size_threshold_bytes,
cold_input_size_threshold_bytes,
input_file_count_threshold,
hot_multiple,
}
}
@ -202,8 +222,8 @@ impl CompactorConfig {
/// level 0 files, but should later also consider the level 1 files to be compacted. This
/// number should be less than 1/10th of the available memory to ensure compactions have
/// enough space to run.
pub fn max_concurrent_compaction_size_bytes(&self) -> u64 {
self.max_concurrent_compaction_size_bytes
pub fn max_concurrent_size_bytes(&self) -> u64 {
self.max_concurrent_size_bytes
}
/// Max number of partitions per sequencer we want to compact per cycle
@ -216,9 +236,9 @@ impl CompactorConfig {
self.min_number_recent_ingested_files_per_partition
}
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total size of input files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
/// A compaction operation for hot partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
///
/// A compaction operation will be limited by this or by the file count threshold, whichever is
/// hit first.
@ -226,6 +246,13 @@ impl CompactorConfig {
self.input_size_threshold_bytes
}
/// A compaction operation for cold partitions will gather as many L0 files with their
/// overlapping L1 files to compact together until the total size of input files crosses this
/// threshold. Later compactions will pick up the remaining L0 files.
pub fn cold_input_size_threshold_bytes(&self) -> u64 {
self.cold_input_size_threshold_bytes
}
/// A compaction operation will gather as many L0 files with their overlapping L1 files to
/// compact together until the total number of L0 + L1 files crosses this threshold. Later
/// compactions will pick up the remaining L0 files.
@ -248,119 +275,235 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
while !shutdown.is_cancelled() {
debug!("compactor main loop tick.");
// Select partition candidates
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("partitions_to_compact", || async {
compactor
.partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(Attributes::from([]));
duration.record(delta);
for _ in 0..compactor.config.hot_multiple {
compact_hot_partitions(Arc::clone(&compactor)).await;
}
compact_cold_partitions(Arc::clone(&compactor)).await;
}
}
// Add other compaction-needed info into selected partitions
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("partitions_to_compact", || async {
compactor.add_info_to_partitions(&candidates).await
async fn compact_hot_partitions(compactor: Arc<Compactor>) {
// Select hot partition candidates
let hot_attributes = Attributes::from(&[("partition_type", "hot")]);
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("hot_partitions_to_compact", || async {
compactor
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(hot_attributes.clone());
duration.record(delta);
}
// Add other compaction-needed info into selected partitions
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("add_info_to_partitions", || async {
compactor.add_info_to_partitions(&candidates).await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(hot_attributes.clone());
duration.record(delta);
}
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no hot compaction candidates found");
// sleep for a second to avoid a hot busy loop when the catalog is polled
tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
return;
} else {
debug!(n_candidates, "found hot compaction candidates");
}
let start_time = compactor.time_provider.now();
// Repeat compacting n partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual
// resource management will be more complicated and a future feature):
// . Each `compact partition` takes at most this much memory: input_size_threshold_bytes
// . We have this memory budget: max_concurrent_size_bytes
// --> num_parallel_partitions = max_concurrent_size_bytes/
// input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_concurrent_size_bytes
/ compactor.config.input_size_threshold_bytes) as usize;
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_hot_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "hot compaction failed");
}
Ok(_) => {
debug!(?partition_id, "hot compaction complete");
}
};
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "hot compaction task failed");
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no compaction candidates found");
// sleep for a second to avoid a hot busy loop when the
// catalog is polled
tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
continue;
} else {
debug!(n_candidates, "found compaction candidates");
}
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor.compaction_cycle_duration.recorder(hot_attributes);
duration.record(delta);
}
}
let start_time = compactor.time_provider.now();
async fn compact_cold_partitions(compactor: Arc<Compactor>) {
let cold_attributes = Attributes::from(&[("partition_type", "cold")]);
// Select cold partition candidates
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("cold_partitions_to_compact", || async {
compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(cold_attributes.clone());
duration.record(delta);
}
// Repeat compacting n partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual resource
// management will be more complicated and a future feature):
// . Each `compact partititon` takes max of this much memory input_size_threshold_bytes
// . We have this memory budget: max_concurrent_compaction_size_bytes
// --> num_parallel_partitions = max_concurrent_compaction_size_bytes/ input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
/ compactor.config.input_size_threshold_bytes)
as usize;
// Add other compaction-needed info into selected partitions
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("add_info_to_partitions", || async {
compactor.add_info_to_partitions(&candidates).await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(cold_attributes.clone());
duration.record(delta);
}
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_partition(&comp, p).await;
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no cold compaction candidates found");
return;
} else {
debug!(n_candidates, "found cold compaction candidates");
}
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "compaction failed");
}
Ok(_) => {
debug!(?partition_id, "compaction complete");
}
};
})
let start_time = compactor.time_provider.now();
// Repeat compacting n cold partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual
// resource management will be more complicated and a future feature):
//
// . Each `compact partition` takes at most this much memory: cold_input_size_threshold_bytes
// . We have this memory budget: max_cold_concurrent_size_bytes
// --> num_parallel_partitions = max_cold_concurrent_size_bytes/
// cold_input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_cold_concurrent_size_bytes
/ compactor.config.cold_input_size_threshold_bytes)
as usize;
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_cold_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "cold compaction failed");
}
Ok(_) => {
debug!(?partition_id, "cold compaction complete");
}
};
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "compaction task failed");
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "cold compaction task failed");
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.compaction_cycle_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.compaction_cycle_duration
.recorder(cold_attributes);
duration.record(delta);
}
}


@ -21,6 +21,7 @@ pub mod server;
pub mod utils;
use crate::compact::{Compactor, PartitionCompactionCandidateWithInfo};
use data_types::CompactionLevel;
use metric::Attributes;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
@ -29,18 +30,23 @@ use std::sync::Arc;
#[allow(missing_copy_implementations, missing_docs)]
pub(crate) enum Error {
#[snafu(display("{}", source))]
ParquetFileLookup {
Lookup {
source: parquet_file_lookup::PartitionFilesFromPartitionError,
},
#[snafu(display("{}", source))]
ParquetFileCombining {
Combining {
source: parquet_file_combining::Error,
},
#[snafu(display("{}", source))]
Upgrading {
source: iox_catalog::interface::Error,
},
}
/// One compaction operation of one partition
pub(crate) async fn compact_partition(
/// One compaction operation of one hot partition
pub(crate) async fn compact_hot_partition(
compactor: &Compactor,
partition: PartitionCompactionCandidateWithInfo,
) -> Result<(), Error> {
@ -53,9 +59,9 @@ pub(crate) async fn compact_partition(
partition.id(),
)
.await
.context(ParquetFileLookupSnafu)?;
.context(LookupSnafu)?;
let to_compact = parquet_file_filtering::filter_parquet_files(
let to_compact = parquet_file_filtering::filter_hot_parquet_files(
parquet_files_for_compaction,
compactor.config.input_size_threshold_bytes(),
compactor.config.input_file_count_threshold(),
@ -76,9 +82,79 @@ pub(crate) async fn compact_partition(
compactor.config.split_percentage(),
)
.await
.context(ParquetFileCombiningSnafu);
.context(CombiningSnafu);
let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]);
let attributes = Attributes::from([
("sequencer_id", format!("{}", sequencer_id).into()),
("partition_type", "hot".into()),
]);
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor.compaction_duration.recorder(attributes);
duration.record(delta);
}
compact_result
}
/// One compaction operation of one cold partition
pub(crate) async fn compact_cold_partition(
compactor: &Compactor,
partition: PartitionCompactionCandidateWithInfo,
) -> Result<(), Error> {
let start_time = compactor.time_provider.now();
let sequencer_id = partition.sequencer_id();
let parquet_files_for_compaction =
parquet_file_lookup::ParquetFilesForCompaction::for_partition(
Arc::clone(&compactor.catalog),
partition.id(),
)
.await
.context(LookupSnafu)?;
let to_compact = parquet_file_filtering::filter_cold_parquet_files(
parquet_files_for_compaction,
compactor.config.cold_input_size_threshold_bytes(),
&compactor.parquet_file_candidate_gauge,
&compactor.parquet_file_candidate_bytes_gauge,
);
let compact_result =
if to_compact.len() == 1 && to_compact[0].compaction_level == CompactionLevel::Initial {
// upgrade the one l0 file to l1, don't run compaction
let mut repos = compactor.catalog.repositories().await;
repos
.parquet_files()
.update_to_level_1(&[to_compact[0].id])
.await
.context(UpgradingSnafu)?;
Ok(())
} else {
parquet_file_combining::compact_parquet_files(
to_compact,
partition,
Arc::clone(&compactor.catalog),
compactor.store.clone(),
Arc::clone(&compactor.exec),
Arc::clone(&compactor.time_provider),
&compactor.compaction_counter,
compactor.config.max_desired_file_size_bytes(),
compactor.config.percentage_max_file_size(),
compactor.config.split_percentage(),
)
.await
.context(CombiningSnafu)
};
let attributes = Attributes::from([
("sequencer_id", format!("{}", sequencer_id).into()),
("partition_type", "cold".into()),
]);
if let Some(delta) = compactor
.time_provider
.now()
@ -103,12 +179,13 @@ mod tests {
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use iox_time::{SystemProvider, TimeProvider};
use parquet_file::{storage::ParquetStorage, ParquetFilePath};
use std::time::Duration;
// A quite sophisticated integration test
// Besides the lp data, the min/max sequence numbers and min/max times of every file are
// important to cover the combination of cases needed in this test function
#[tokio::test]
async fn test_compact_partition_many_files() {
async fn test_compact_hot_partition_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
@ -253,7 +330,7 @@ mod tests {
// ------------------------------------------------
// Compact
let candidates = compactor
.partitions_to_compact(
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config
@ -266,7 +343,7 @@ mod tests {
assert_eq!(candidates.len(), 1);
let c = candidates.pop().unwrap();
compact_partition(&compactor, c).await.unwrap();
compact_hot_partition(&compactor, c).await.unwrap();
// Should have 3 non-soft-deleted files:
//
@ -328,6 +405,362 @@ mod tests {
);
}
#[tokio::test]
async fn test_compact_cold_partition_many_files() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp2 overlaps with lp3
let lp2 = vec![
"table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate
"table,tag1=VT field_int=10i 10000",
"table,tag1=UT field_int=70i 20000",
]
.join("\n");
// lp3 overlaps with lp2
let lp3 = vec![
"table,tag1=WA field_int=1500i 8000", // latest duplicate and kept
"table,tag1=VT field_int=10i 6000",
"table,tag1=UT field_int=270i 25000",
]
.join("\n");
// lp4 does not overlap with any
let lp4 = vec![
"table,tag2=WA,tag3=10 field_int=1600i 28000",
"table,tag2=VT,tag3=20 field_int=20i 26000",
]
.join("\n");
// lp5 overlaps with lp1
let lp5 = vec![
"table,tag2=PA,tag3=15 field_int=1601i 9",
"table,tag2=OH,tag3=21 field_int=21i 25",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_sequencer(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
.await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
// parquet files that are all in the same partition
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_file_size_bytes(compactor.config.max_desired_file_size_bytes() + 10)
.with_creation_time(time_38_hour_ago);
partition.create_parquet_file(builder).await;
// pf2 overlaps with pf3
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp2)
.with_max_seq(5)
.with_min_time(8_000)
.with_max_time(20_000)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago);
partition.create_parquet_file(builder).await;
// pf3 overlaps with pf2
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp3)
.with_max_seq(10)
.with_min_time(6_000)
.with_max_time(25_000)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago);
partition.create_parquet_file(builder).await;
// pf4 does not overlap with any but is small
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp4)
.with_max_seq(18)
.with_min_time(26_000)
.with_max_time(28_000)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago);
partition.create_parquet_file(builder).await;
// pf5 was created in a previous compaction cycle; overlaps with pf1
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp5)
.with_max_seq(1)
.with_min_time(9)
.with_max_time(25)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
partition.create_parquet_file(builder).await;
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
partition.create_parquet_file(builder).await;
// should have 4 level-0 files before compacting
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
assert_eq!(count, 4);
// ------------------------------------------------
// Compact
let candidates = compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop().unwrap();
compact_cold_partition(&compactor, c).await.unwrap();
// Should have 3 non-soft-deleted files:
//
// - the level 1 file that didn't overlap with anything
// - the two newly created after compacting and splitting pf1, pf2, pf3, pf4, pf5
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 3);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(6, CompactionLevel::FileNonOverlapped),
(7, CompactionLevel::FileNonOverlapped),
(8, CompactionLevel::FileNonOverlapped),
]
);
// ------------------------------------------------
// Verify the parquet file content
// Later compacted file
let file1 = files.pop().unwrap();
let batches = read_parquet_file(&table, file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+-----------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+-----------------------------+",
"| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |",
"| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |",
"| 270 | UT | | | 1970-01-01T00:00:00.000025Z |",
"+-----------+------+------+------+-----------------------------+",
],
&batches
);
// Earlier compacted file
let file0 = files.pop().unwrap();
let batches = read_parquet_file(&table, file0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+------+--------------------------------+",
"| field_int | tag1 | tag2 | tag3 | time |",
"+-----------+------+------+------+--------------------------------+",
"| 10 | VT | | | 1970-01-01T00:00:00.000000020Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000006Z |",
"| 10 | VT | | | 1970-01-01T00:00:00.000010Z |",
"| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |",
"| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |",
"| 1601 | | PA | 15 | 1970-01-01T00:00:00.000000009Z |",
"| 21 | | OH | 21 | 1970-01-01T00:00:00.000000025Z |",
"| 70 | UT | | | 1970-01-01T00:00:00.000020Z |",
"+-----------+------+------+------+--------------------------------+",
],
&batches
);
}
#[tokio::test]
async fn test_compact_cold_partition_one_level_0_without_overlap() {
test_helpers::maybe_start_logging();
let catalog = TestCatalog::new();
// lp1 does not overlap with any other level 0 or level 1
let lp1 = vec![
"table,tag1=WA field_int=1000i 10",
"table,tag1=VT field_int=10i 20",
]
.join("\n");
// lp6 does not overlap with any
let lp6 = vec![
"table,tag2=PA,tag3=15 field_int=81601i 90000",
"table,tag2=OH,tag3=21 field_int=421i 91000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_sequencer(1).await;
let table = ns.create_table("table").await;
table.create_column("field_int", ColumnType::I64).await;
table.create_column("tag1", ColumnType::Tag).await;
table.create_column("tag2", ColumnType::Tag).await;
table.create_column("tag3", ColumnType::Tag).await;
table.create_column("time", ColumnType::Time).await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
.await;
let time = Arc::new(SystemProvider::new());
let time_38_hour_ago = (time.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos();
let config = make_compactor_config();
let metrics = Arc::new(metric::Registry::new());
let compactor = Compactor::new(
vec![sequencer.sequencer.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
Arc::new(SystemProvider::new()),
BackoffConfig::default(),
config,
Arc::clone(&metrics),
);
// parquet files that are all in the same partition
// pf1 does not overlap with any other level 0
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp1)
.with_max_seq(3)
.with_min_time(10)
.with_max_time(20)
.with_file_size_bytes(compactor.config.max_desired_file_size_bytes() + 10)
.with_creation_time(time_38_hour_ago);
partition.create_parquet_file(builder).await;
// pf6 was created in a previous compaction cycle; does not overlap with any
let builder = TestParquetFileBuilder::default()
.with_line_protocol(&lp6)
.with_max_seq(20)
.with_min_time(90000)
.with_max_time(91000)
.with_file_size_bytes(100) // small file
.with_creation_time(time_38_hour_ago)
.with_compaction_level(CompactionLevel::FileNonOverlapped);
partition.create_parquet_file(builder).await;
// should have 1 level-0 file before compacting
let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
assert_eq!(count, 1);
// ------------------------------------------------
// Compact
let candidates = compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
.await
.unwrap();
let mut candidates = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(candidates.len(), 1);
let c = candidates.pop().unwrap();
compact_cold_partition(&compactor, c).await.unwrap();
// Should have 2 non-soft-deleted files:
//
// - the level 1 file that didn't overlap with anything
// - the newly created level 1 file that was only upgraded from level 0
let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
assert_eq!(files.len(), 2);
let files_and_levels: Vec<_> = files
.iter()
.map(|f| (f.id.get(), f.compaction_level))
.collect();
assert_eq!(
files_and_levels,
vec![
(1, CompactionLevel::FileNonOverlapped),
(2, CompactionLevel::FileNonOverlapped),
]
);
// ------------------------------------------------
// Verify the parquet file content
// Later compacted file
let file1 = files.pop().unwrap();
let batches = read_parquet_file(&table, file1).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+------+-----------------------------+",
"| field_int | tag2 | tag3 | time |",
"+-----------+------+------+-----------------------------+",
"| 421 | OH | 21 | 1970-01-01T00:00:00.000091Z |",
"| 81601 | PA | 15 | 1970-01-01T00:00:00.000090Z |",
"+-----------+------+------+-----------------------------+",
],
&batches
);
// Earlier compacted file
let file0 = files.pop().unwrap();
let batches = read_parquet_file(&table, file0).await;
assert_batches_sorted_eq!(
&[
"+-----------+------+--------------------------------+",
"| field_int | tag1 | time |",
"+-----------+------+--------------------------------+",
"| 10 | VT | 1970-01-01T00:00:00.000000020Z |",
"| 1000 | WA | 1970-01-01T00:00:00.000000010Z |",
"+-----------+------+--------------------------------+",
],
&batches
);
}
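
The decision that this test exercises boils down to one predicate; the following is a minimal sketch condensing the if-expression in compact_cold_partition earlier in this diff, with a hypothetical helper name.

// True when the cold candidate set is a single level 0 file with no overlapping
// level 1 files to merge, i.e. it can be upgraded to level 1 without compaction.
fn upgrade_without_compaction(to_compact: &[ParquetFile]) -> bool {
    to_compact.len() == 1 && to_compact[0].compaction_level == CompactionLevel::Initial
}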
async fn read_parquet_file(table: &Arc<TestTable>, file: ParquetFile) -> Vec<RecordBatch> {
let storage = ParquetStorage::new(table.catalog.object_store());
@ -354,19 +787,26 @@ mod tests {
let percentage_max_file_size = 30;
let split_percentage = 80;
let max_concurrent_size_bytes = 100_000;
let max_cold_concurrent_size_bytes = 90_000;
let max_number_partitions_per_sequencer = 1;
let min_number_recent_ingested_per_partition = 1;
let input_size_threshold_bytes = 300 * 1024 * 1024;
let cold_input_size_threshold_bytes = 600 * 1024 * 1024;
let input_file_count_threshold = 100;
let hot_multiple = 4;
CompactorConfig::new(
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
max_concurrent_size_bytes,
max_cold_concurrent_size_bytes,
max_number_partitions_per_sequencer,
min_number_recent_ingested_per_partition,
input_size_threshold_bytes,
cold_input_size_threshold_bytes,
input_file_count_threshold,
hot_multiple,
)
}
}

File diff suppressed because it is too large


@ -420,10 +420,13 @@ impl Config {
percentage_max_file_size: 30,
split_percentage: 80,
max_concurrent_size_bytes: 100_000,
max_cold_concurrent_size_bytes: 90_000,
max_number_partitions_per_sequencer: 1,
min_number_recent_ingested_files_per_partition: 1,
input_size_threshold_bytes: 314_572_800,
cold_input_size_threshold_bytes: 629_145_600,
input_file_count_threshold: 100,
hot_multiple: 4,
};
let querier_config = QuerierConfig {


@ -552,10 +552,12 @@ pub trait ParquetFileRepo: Send + Sync {
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// List partitions with the most level 0 files for a given sequencer
/// List partitions with the most level 0 files created earlier than `older_than_num_hours`
/// hours ago for a given sequencer. In other words, "cold" partitions that need compaction.
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
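
A hedged example of a call site for the updated signature, mirroring how cold_partitions_to_compact uses it earlier in this diff (24 is the hour cutoff that makes a partition "cold"):

// Sketch only: partitions with the most level 0 files, skipping any partition that
// has files created within the last 24 hours.
let partitions = repos
    .parquet_files()
    .most_level_0_files_partitions(sequencer_id, 24, max_num_partitions_per_sequencer)
    .await?;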
@ -2686,17 +2688,25 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let time_five_hour_ago = Timestamp::new(
(catalog.time_provider().now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(),
);
let time_38_hour_ago = Timestamp::new(
(catalog.time_provider().now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(),
);
let older_than = 24;
let num_partitions = 2;
// Db has no partition
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The DB has 1 partition but it does not have any file
// The DB has 1 partition but it does not have any files
let partition = repos
.partitions()
.create_or_get("one".into(), sequencer.id, table.id)
@ -2704,7 +2714,7 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
@ -2722,7 +2732,7 @@ pub(crate) mod test_helpers {
file_size_bytes: 1337,
row_count: 0,
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
created_at: time_38_hour_ago,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
let delete_l0_file = repos
@ -2737,12 +2747,42 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The partition has one non-deleted level 0 files
// A partition with one cold file and one hot file
let hot_partition = repos
.partitions()
.create_or_get("hot".into(), sequencer.id, table.id)
.await
.unwrap();
let cold_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: hot_partition.id,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(cold_file_params)
.await
.unwrap();
let hot_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: hot_partition.id,
created_at: time_five_hour_ago,
..parquet_file_params.clone()
};
repos.parquet_files().create(hot_file_params).await.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The partition has one non-deleted level 0 file
let l0_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
@ -2754,12 +2794,12 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
// The DB has 2 partitions both has non-deleled L0 files
// The DB has 2 partitions; both have non-deleted L0 files
let another_partition = repos
.partitions()
.create_or_get("two".into(), sequencer.id, table.id)
@ -2788,7 +2828,7 @@ pub(crate) mod test_helpers {
// Must return 2 partitions
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
@ -2815,11 +2855,11 @@ pub(crate) mod test_helpers {
// Still returns 2 partitions because the limit is num_partitions=2
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// and the first one should stil be the one with the most L0 files
// and the first one should still be the one with the most L0 files
assert_eq!(partitions[0].partition_id, another_partition.id);
}

View File

@ -1169,10 +1169,16 @@ impl ParquetFileRepo for MemTxn {
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let time_nano = (self.time_provider.now()
- Duration::from_secs(60 * 60 * older_than_num_hours as u64))
.timestamp_nanos();
let older_than = Timestamp::new(time_nano);
let stage = self.stage();
let partitions = stage
let relevant_parquet_files = stage
.parquet_files
.iter()
.filter(|f| {
@ -1180,30 +1186,44 @@ impl ParquetFileRepo for MemTxn {
&& f.compaction_level == CompactionLevel::Initial
&& f.to_delete.is_none()
})
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.collect::<Vec<_>>();
// Count the number of files per partition by simply counting the partition duplicates
let mut partition_duplicate_count: HashMap<PartitionParam, i32> =
HashMap::with_capacity(partitions.len());
for p in partitions {
let count = partition_duplicate_count.entry(p).or_insert(0);
HashMap::with_capacity(relevant_parquet_files.len());
let mut partition_max_created_at = HashMap::with_capacity(relevant_parquet_files.len());
for pf in relevant_parquet_files {
let key = PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
};
let count = partition_duplicate_count.entry(key).or_insert(0);
*count += 1;
let max_created_at = partition_max_created_at.entry(key).or_insert(pf.created_at);
if pf.created_at > *max_created_at {
*max_created_at = pf.created_at;
}
}
// Sort partitions by file count
let mut partitions = partition_duplicate_count.iter().collect::<Vec<_>>();
// Sort partitions whose max created at is older than the limit by their file count
let mut partitions = partition_duplicate_count
.iter()
.filter(|(k, _v)| partition_max_created_at.get(k).unwrap() < &older_than)
.collect::<Vec<_>>();
partitions.sort_by(|a, b| b.1.cmp(a.1));
// Return the top partitions with the most files
let partitions = partitions
.into_iter()
.map(|(k, _)| *k)
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.take(num_partitions)
.collect::<Vec<_>>();
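
The in-memory logic above amounts to: count level-0 files per partition, remember the newest created_at per partition, drop partitions whose newest file is younger than the cutoff, then return the top N by file count. A self-contained sketch of that rule using plain integers in place of the catalog types (the function name and signature are hypothetical, not part of this PR):

use std::collections::HashMap;

// Sketch only: `files` is a list of (partition id, created_at in epoch nanoseconds)
// for non-deleted level-0 files; returns partition ids ordered by file count.
fn cold_partitions_by_l0_count(
    files: &[(i64, i64)],
    cutoff_nanos: i64,
    num_partitions: usize,
) -> Vec<i64> {
    let mut file_count: HashMap<i64, usize> = HashMap::new();
    let mut newest_created_at: HashMap<i64, i64> = HashMap::new();
    for &(partition_id, created_at) in files {
        *file_count.entry(partition_id).or_insert(0) += 1;
        let newest = newest_created_at.entry(partition_id).or_insert(created_at);
        if created_at > *newest {
            *newest = created_at;
        }
    }
    // keep only partitions whose newest level-0 file is older than the cutoff
    let mut cold: Vec<(i64, usize)> = file_count
        .into_iter()
        .filter(|(partition_id, _)| newest_created_at[partition_id] < cutoff_nanos)
        .collect();
    // most level-0 files first, then take at most `num_partitions`
    cold.sort_by(|a, b| b.1.cmp(&a.1));
    cold.into_iter()
        .map(|(partition_id, _)| partition_id)
        .take(num_partitions)
        .collect()
}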

View File

@ -280,7 +280,7 @@ decorate!(
"parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, sequencer_id: SequencerId, num_hours: u32, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, older_than_num_hours: u32, num_partitions: usize) -> Result<Vec<PartitionParam>>;
]
);

View File

@ -1695,14 +1695,14 @@ WHERE parquet_file.sequencer_id = $1
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
FROM parquet_file
FROM parquet_file
WHERE compaction_level = 0 and to_delete is null
and sequencer_id = $1
and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval
group by 1, 2, 3, 4
having count(id) >= $3
order by 5 DESC
limit $4;
limit $4;
"#,
)
.bind(&sequencer_id) // $1
@ -1717,25 +1717,30 @@ limit $4;
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let older_than_num_hours = older_than_num_hours as i32;
let num_partitions = num_partitions as i32;
// A preliminary performance test indicates this query runs in around 50ms
// We have an index on (sequencer_id, compaction_level, to_delete)
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
FROM parquet_file
WHERE compaction_level = 0 and to_delete is null
and sequencer_id = $1
group by 1, 2, 3, 4
order by 5 DESC
limit $2;
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at)
FROM parquet_file
WHERE compaction_level = 0
AND to_delete IS NULL
AND sequencer_id = $1
GROUP BY 1, 2, 3, 4
HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval
ORDER BY 5 DESC
LIMIT $3;
"#,
)
.bind(&sequencer_id) // $1
.bind(&num_partitions) // $2
.bind(&older_than_num_hours) // $2
.bind(&num_partitions) // $3
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
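
Both the SQL HAVING clause and the in-memory filter express the same cutoff: the newest level-0 file of a cold partition must be older than now() minus `older_than_num_hours`. A hedged illustration of that cutoff in Rust (the helper name and the direct use of SystemTime are assumptions, not code from this PR; created_at is stored as epoch nanoseconds):

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Compute the nanosecond timestamp below which a partition's newest level-0
// file must fall for the partition to count as "cold".
fn cold_cutoff_nanos(older_than_num_hours: u32) -> i64 {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before the Unix epoch");
    let cutoff = now - Duration::from_secs(60 * 60 * older_than_num_hours as u64);
    cutoff.as_nanos() as i64
}

// A partition qualifies when: max(created_at) < cold_cutoff_nanos(older_than_num_hours),
// which mirrors: to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval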

View File

@ -168,10 +168,13 @@ pub async fn create_compactor_server_type(
compactor_config.percentage_max_file_size,
compactor_config.split_percentage,
compactor_config.max_concurrent_size_bytes,
compactor_config.max_cold_concurrent_size_bytes,
compactor_config.max_number_partitions_per_sequencer,
compactor_config.min_number_recent_ingested_files_per_partition,
compactor_config.input_size_threshold_bytes,
compactor_config.cold_input_size_threshold_bytes,
compactor_config.input_file_count_threshold,
compactor_config.hot_multiple,
);
let compactor_handler = Arc::new(CompactorHandlerImpl::new(
sequencers,