feat: Select cold partitions

pull/24376/head
Carol (Nichols || Goulding) 2022-08-03 13:16:00 -04:00
parent 6e9c752230
commit fc62c82722
6 changed files with 468 additions and 75 deletions


@@ -252,8 +252,10 @@ impl Compactor {
let mut repos = self.catalog.repositories().await;
for sequencer_id in &self.sequencers {
let attributes =
Attributes::from([("sequencer_id", format!("{}", *sequencer_id).into())]);
let attributes = Attributes::from([
("sequencer_id", format!("{}", *sequencer_id).into()),
("partition_type", "hot".into()),
]);
// Get the most recent highest ingested throughput partitions within
// the last 4 hours. If nothing, increase to 24 hours
@@ -285,35 +287,57 @@ impl Compactor {
}
}
// No active ingesting partitions the last 24 hours,
// get partition with the most level-0 files
if num_partitions == 0 {
debug!(
sequencer_id = sequencer_id.get(),
"no active ingesting partitions",
);
// Record metric for candidates per sequencer
debug!(
sequencer_id = sequencer_id.get(),
n = num_partitions,
"hot compaction candidates",
);
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
}
let mut partitions = repos
.parquet_files()
.most_level_0_files_partitions(*sequencer_id, max_num_partitions_per_sequencer)
.await
.context(MostL0PartitionsSnafu {
sequencer_id: *sequencer_id,
})?;
Ok(candidates)
}
if !partitions.is_empty() {
num_partitions = partitions.len();
candidates.append(&mut partitions);
}
}
/// Return a list of partitions that:
///
/// - Have not received any writes in 24 hours (determined by all parquet files having a
/// created_at time older than 24 hours ago)
/// - Have some level 0 parquet files that need to be upgraded or compacted
pub async fn cold_partitions_to_compact(
&self,
// Max number of cold partitions per sequencer we want to compact
max_num_partitions_per_sequencer: usize,
) -> Result<Vec<PartitionParam>> {
let mut candidates =
Vec::with_capacity(self.sequencers.len() * max_num_partitions_per_sequencer);
let mut repos = self.catalog.repositories().await;
for sequencer_id in &self.sequencers {
let attributes = Attributes::from([
("sequencer_id", format!("{}", *sequencer_id).into()),
("partition_type", "cold".into()),
]);
let mut partitions = repos
.parquet_files()
.most_level_0_files_partitions(*sequencer_id, 24, max_num_partitions_per_sequencer)
.await
.context(MostL0PartitionsSnafu {
sequencer_id: *sequencer_id,
})?;
let num_partitions = partitions.len();
candidates.append(&mut partitions);
// Record metric for candidates per sequencer
debug!(
sequencer_id = sequencer_id.get(),
n = num_partitions,
"compaction candidates",
"cold compaction candidates",
);
let number_gauge = self.compaction_candidate_gauge.recorder(attributes.clone());
let number_gauge = self.compaction_candidate_gauge.recorder(attributes);
number_gauge.set(num_partitions as u64);
}
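The selection criterion documented above reduces to a single timestamp comparison: a partition qualifies as cold only when the created_at of its newest level-0 file is more than older_than_num_hours (24 here) hours in the past. Below is a minimal, std-only sketch of that predicate; it is not code from this commit (the real check lives in the catalog queries further down this diff) and it uses nanosecond timestamps the way the catalog's Timestamp does.

use std::time::{SystemTime, UNIX_EPOCH};

// True when the newest level-0 file of a partition was created more than
// older_than_num_hours hours ago, i.e. the partition has seen no recent writes.
fn is_cold(newest_created_at_nanos: i64, older_than_num_hours: u32) -> bool {
    let now_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_nanos() as i64;
    let cutoff_nanos = now_nanos - (older_than_num_hours as i64) * 60 * 60 * 1_000_000_000;
    newest_created_at_nanos < cutoff_nanos
}

fn main() {
    let now_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos() as i64;
    let hours = |h: i64| h * 60 * 60 * 1_000_000_000;
    // Mirrors the fixtures in the tests below: 38 hours ago is cold, 5 hours ago is not.
    assert!(is_cold(now_nanos - hours(38), 24));
    assert!(!is_cold(now_nanos - hours(5), 24));
}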
@@ -454,7 +478,7 @@ mod tests {
use uuid::Uuid;
#[tokio::test]
async fn test_candidate_partitions() {
async fn test_hot_partitions_to_compact() {
let catalog = TestCatalog::new();
// Create a db with 2 sequencers, one with 4 empty partitions and the other one with one
@@ -465,7 +489,12 @@ mod tests {
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create("namespace_candidate_partitions", "inf", kafka.id, pool.id)
.create(
"namespace_hot_partitions_to_compact",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = txn
@@ -542,9 +571,6 @@ mod tests {
// Some times in the past to use as created_at for the files
let time_now = Timestamp::new(compactor.time_provider.now().timestamp_nanos());
let _time_one_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60)).timestamp_nanos(),
);
let time_three_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 3)).timestamp_nanos(),
);
@@ -573,7 +599,7 @@ mod tests {
};
// Note: The order of the test cases below is important and should not be changed
// becasue they depend on the order of the writes and their content. For example,
// because they depend on the order of the writes and their content. For example,
// in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
// but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
// In other words, the last Case needs all the content of the previous tests.
@@ -610,9 +636,10 @@ mod tests {
assert!(candidates.is_empty());
// --------------------------------------
// Case 3: no new recent writes (within the last 24 hours) --> return candidates with the most L0
// Case 3: no new recent writes (within the last 24 hours) --> no partition candidates
// (the cold case will pick them up)
//
// partition2 has an old (more 24 hours ago) non-deleted level 0 file
// partition2 has an old (more than 24 hours ago) non-deleted level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
@@ -622,11 +649,10 @@ mod tests {
};
let _pf3 = txn.parquet_files().create(p3).await.unwrap();
txn.commit().await.unwrap();
//
// Has at least one partition with a L0 file --> make it a candidate
// No hot candidates
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition2.id);
assert!(candidates.is_empty());
// --------------------------------------
// Case 4: has one partition with recent writes (5 hours ago) --> return that partition
@@ -670,16 +696,16 @@ mod tests {
assert_eq!(candidates[0].partition_id, partition3.id);
// --------------------------------------
// Case 6: has partittion candidates for 2 sequecers
// Case 6: has partition candidates for 2 sequencers
//
// The another_sequencer now has non-deleted level-0 file ingested 38 hours ago
// The another_sequencer now has non-deleted level-0 file ingested 5 hours ago
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p6 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
sequencer_id: another_sequencer.id,
table_id: another_table.id,
partition_id: another_partition.id,
created_at: time_38_hour_ago,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf6 = txn.parquet_files().create(p6).await.unwrap();
@@ -738,4 +764,285 @@ mod tests {
input_file_count_threshold,
)
}
#[tokio::test]
async fn test_cold_partitions_to_compact() {
let catalog = TestCatalog::new();
// Create a db with 2 sequencers, one with 5 empty partitions and the other with one
// empty partition
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap();
let pool = txn.query_pools().create_or_get("foo").await.unwrap();
let namespace = txn
.namespaces()
.create(
"namespace_hot_partitions_to_compact",
"inf",
kafka.id,
pool.id,
)
.await
.unwrap();
let table = txn
.tables()
.create_or_get("test_table", namespace.id)
.await
.unwrap();
let sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(1))
.await
.unwrap();
let partition1 = txn
.partitions()
.create_or_get("one".into(), sequencer.id, table.id)
.await
.unwrap();
let partition2 = txn
.partitions()
.create_or_get("two".into(), sequencer.id, table.id)
.await
.unwrap();
let partition3 = txn
.partitions()
.create_or_get("three".into(), sequencer.id, table.id)
.await
.unwrap();
let partition4 = txn
.partitions()
.create_or_get("four".into(), sequencer.id, table.id)
.await
.unwrap();
let partition5 = txn
.partitions()
.create_or_get("five".into(), sequencer.id, table.id)
.await
.unwrap();
// other sequencer
let another_table = txn
.tables()
.create_or_get("another_test_table", namespace.id)
.await
.unwrap();
let another_sequencer = txn
.sequencers()
.create_or_get(&kafka, KafkaPartition::new(2))
.await
.unwrap();
let another_partition = txn
.partitions()
.create_or_get(
"another_partition".into(),
another_sequencer.id,
another_table.id,
)
.await
.unwrap();
// update the sort key for another_partition
let another_partition = txn
.partitions()
.update_sort_key(another_partition.id, &["tag1", "time"])
.await
.unwrap();
txn.commit().await.unwrap();
// Create a compactor
let time_provider = Arc::new(SystemProvider::new());
let config = make_compactor_config();
let compactor = Compactor::new(
vec![sequencer.id, another_sequencer.id],
Arc::clone(&catalog.catalog),
ParquetStorage::new(Arc::clone(&catalog.object_store)),
Arc::new(Executor::new(1)),
time_provider,
BackoffConfig::default(),
config,
Arc::new(metric::Registry::new()),
);
// Some times in the past to use as created_at for the files
let time_five_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(),
);
let time_38_hour_ago = Timestamp::new(
(compactor.time_provider.now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(),
);
// Basic parquet info
let p1 = ParquetFileParams {
sequencer_id: sequencer.id,
namespace_id: namespace.id,
table_id: table.id,
partition_id: partition1.id,
object_store_id: Uuid::new_v4(),
max_sequence_number: SequenceNumber::new(100),
min_time: Timestamp::new(1),
max_time: Timestamp::new(5),
file_size_bytes: 1337,
row_count: 0,
compaction_level: CompactionLevel::Initial, // level of file of new writes
created_at: time_38_hour_ago, // create cold files by default
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
// Note: The order of the test cases below is important and should not be changed
// because they depend on the order of the writes and their content. For example,
// in order to test `Case 3`, we do not need to add asserts for `Case 1` and `Case 2`,
// but all the writes, deletes and updates in Cases 1 and 2 are a must for testing Case 3.
// In other words, the last Case needs all the content of the previous tests.
// This shows the priority of selecting compaction candidates
// --------------------------------------
// Case 1: no files yet --> no partition candidates
//
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 2: no non-deleted cold L0 files --> no partition candidates
//
// partition1 has a cold deleted L0
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let pf1 = txn.parquet_files().create(p1.clone()).await.unwrap();
txn.parquet_files().flag_for_delete(pf1.id).await.unwrap();
//
// partition2 has a cold non-L0 file
let p2 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition2.id,
..p1.clone()
};
let pf2 = txn.parquet_files().create(p2).await.unwrap();
txn.parquet_files()
.update_to_level_1(&[pf2.id])
.await
.unwrap();
txn.commit().await.unwrap();
// No non-deleted level 0 files yet --> no candidates
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
// Case 3: no new recent writes (within the last 24 hours) --> return that partition
//
// partition2 has a cold (more than 24 hours ago) non-deleted level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition2.id,
..p1.clone()
};
let _pf3 = txn.parquet_files().create(p3).await.unwrap();
txn.commit().await.unwrap();
//
// Has at least one partition with a L0 file --> make it a candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition2.id);
// --------------------------------------
// Case 4: has two cold partitions --> return the candidate with the most L0
//
// partition4 has two cold non-deleted level 0 files
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p4 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition4.id,
..p1.clone()
};
let _pf4 = txn.parquet_files().create(p4).await.unwrap();
let p5 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition4.id,
..p1.clone()
};
let _pf5 = txn.parquet_files().create(p5).await.unwrap();
txn.commit().await.unwrap();
// Partition with the most l0 files is the candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
// --------------------------------------
// Case 5: "warm" and "hot" partitions aren't returned
//
// partition3 has one cold level 0 file and one hot level 0 file
// partition5 has one hot level 0 file
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p3_cold = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition3.id,
..p1.clone()
};
let _pf3_cold = txn.parquet_files().create(p3_cold).await.unwrap();
let p3_hot = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition3.id,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf3_hot = txn.parquet_files().create(p3_hot).await.unwrap();
let p5_hot = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: partition5.id,
created_at: time_five_hour_ago,
..p1.clone()
};
let _pf5_hot = txn.parquet_files().create(p5_hot).await.unwrap();
txn.commit().await.unwrap();
// Partition4 is still the only candidate
let candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
// --------------------------------------
// Case 6: has partition candidates for 2 sequencers
//
// The another_sequencer now has non-deleted level-0 file ingested 38 hours ago
let mut txn = catalog.catalog.start_transaction().await.unwrap();
let p6 = ParquetFileParams {
object_store_id: Uuid::new_v4(),
sequencer_id: another_sequencer.id,
table_id: another_table.id,
partition_id: another_partition.id,
created_at: time_38_hour_ago,
..p1.clone()
};
let _pf6 = txn.parquet_files().create(p6).await.unwrap();
txn.commit().await.unwrap();
//
// Will have 2 candidates, one for each sequencer
let mut candidates = compactor.cold_partitions_to_compact(1).await.unwrap();
candidates.sort();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition4.id);
assert_eq!(candidates[0].sequencer_id, sequencer.id);
assert_eq!(candidates[1].partition_id, another_partition.id);
assert_eq!(candidates[1].sequencer_id, another_sequencer.id);
// Add info to partitions
let partitions_with_info = compactor.add_info_to_partitions(&candidates).await.unwrap();
assert_eq!(partitions_with_info.len(), 2);
assert_eq!(*partitions_with_info[0].namespace, namespace);
assert_eq!(*partitions_with_info[0].table, table);
assert_eq!(
partitions_with_info[0].partition_key,
partition4.partition_key
);
assert_eq!(partitions_with_info[0].sort_key, partition4.sort_key()); // this sort key is None
assert_eq!(*partitions_with_info[1].namespace, namespace);
assert_eq!(*partitions_with_info[1].table, another_table);
assert_eq!(
partitions_with_info[1].partition_key,
another_partition.partition_key
);
assert_eq!(
partitions_with_info[1].sort_key,
another_partition.sort_key()
); // this sort key is Some(tag1, time)
}
}


@@ -276,7 +276,7 @@ async fn compact_hot_partitions(compactor: Arc<Compactor>) {
{
let duration = compactor
.candidate_selection_duration
.recorder(Attributes::from([]));
.recorder(Attributes::from(&[("partition_type", "hot")]));
duration.record(delta);
}
@@ -370,7 +370,28 @@ async fn compact_hot_partitions(compactor: Arc<Compactor>) {
}
}
async fn compact_cold_partitions(_compactor: Arc<Compactor>) {}
async fn compact_cold_partitions(compactor: Arc<Compactor>) {
// Select cold partition candidates
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("cold_partitions_to_compact", || async {
compactor
.cold_partitions_to_compact(compactor.config.max_number_partitions_per_sequencer())
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(Attributes::from(&[("partition_type", "cold")]));
duration.record(delta);
}
}
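The timing block above follows a pattern worth noting: take a start time, do the work, and record the elapsed duration only if the clock did not move backwards (checked_duration_since returns None in that case). A small, std-only sketch of the same pattern, using a monotonic Instant instead of the compactor's injected time provider and a println in place of the duration histogram:

use std::time::Instant;

fn main() {
    let start_time = Instant::now();
    // ... candidate selection would run here ...
    if let Some(delta) = Instant::now().checked_duration_since(start_time) {
        // The handler records this delta on candidate_selection_duration,
        // tagged with partition_type = "hot" or "cold".
        println!("candidate selection took {delta:?}");
    }
}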
#[async_trait]
impl CompactorHandler for CompactorHandlerImpl {


@@ -552,10 +552,12 @@ pub trait ParquetFileRepo: Send + Sync {
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// List partitions with the most level 0 files for a given sequencer
/// List partitions with the most level 0 files created earlier than `older_than_num_hours`
/// hours ago for a given sequencer. In other words, "cold" partitions that need compaction.
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
@@ -2686,17 +2688,25 @@ pub(crate) mod test_helpers {
.await
.unwrap();
let time_five_hour_ago = Timestamp::new(
(catalog.time_provider().now() - Duration::from_secs(60 * 60 * 5)).timestamp_nanos(),
);
let time_38_hour_ago = Timestamp::new(
(catalog.time_provider().now() - Duration::from_secs(60 * 60 * 38)).timestamp_nanos(),
);
let older_than = 24;
let num_partitions = 2;
// The DB has no partitions
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The DB has 1 partition but it does not have any file
// The DB has 1 partition but it does not have any files
let partition = repos
.partitions()
.create_or_get("one".into(), sequencer.id, table.id)
@@ -2704,7 +2714,7 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
@@ -2722,7 +2732,7 @@ pub(crate) mod test_helpers {
file_size_bytes: 1337,
row_count: 0,
compaction_level: CompactionLevel::Initial,
created_at: Timestamp::new(1),
created_at: time_38_hour_ago,
column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]),
};
let delete_l0_file = repos
@@ -2737,12 +2747,42 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The partition has one non-deleted level 0 files
// A partition with one cold file and one hot file
let hot_partition = repos
.partitions()
.create_or_get("hot".into(), sequencer.id, table.id)
.await
.unwrap();
let cold_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: hot_partition.id,
..parquet_file_params.clone()
};
repos
.parquet_files()
.create(cold_file_params)
.await
.unwrap();
let hot_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
partition_id: hot_partition.id,
created_at: time_five_hour_ago,
..parquet_file_params.clone()
};
repos.parquet_files().create(hot_file_params).await.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert!(partitions.is_empty());
// The partition has one non-deleted level 0 file
let l0_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
@@ -2754,12 +2794,12 @@ pub(crate) mod test_helpers {
.unwrap();
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
// The DB has 2 partitions both has non-deleled L0 files
// The DB has 2 partitions; both have non-deleted L0 files
let another_partition = repos
.partitions()
.create_or_get("two".into(), sequencer.id, table.id)
@@ -2788,7 +2828,7 @@ pub(crate) mod test_helpers {
// Must return 2 partitions
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
@@ -2815,11 +2855,11 @@ pub(crate) mod test_helpers {
// Still return 2 partitions because of the limit num_partitions=2
let partitions = repos
.parquet_files()
.most_level_0_files_partitions(sequencer.id, num_partitions)
.most_level_0_files_partitions(sequencer.id, older_than, num_partitions)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// and the first one should stil be the one with the most L0 files
// and the first one should still be the one with the most L0 files
assert_eq!(partitions[0].partition_id, another_partition.id);
}


@@ -1169,10 +1169,16 @@ impl ParquetFileRepo for MemTxn {
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let time_nano = (self.time_provider.now()
- Duration::from_secs(60 * 60 * older_than_num_hours as u64))
.timestamp_nanos();
let older_than = Timestamp::new(time_nano);
let stage = self.stage();
let partitions = stage
let relevant_parquet_files = stage
.parquet_files
.iter()
.filter(|f| {
@@ -1180,30 +1186,44 @@ impl ParquetFileRepo for MemTxn {
&& f.compaction_level == CompactionLevel::Initial
&& f.to_delete.is_none()
})
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.collect::<Vec<_>>();
// Count the number of files per partition by simply counting the partition duplicates
let mut partition_duplicate_count: HashMap<PartitionParam, i32> =
HashMap::with_capacity(partitions.len());
for p in partitions {
let count = partition_duplicate_count.entry(p).or_insert(0);
HashMap::with_capacity(relevant_parquet_files.len());
let mut partition_max_created_at = HashMap::with_capacity(relevant_parquet_files.len());
for pf in relevant_parquet_files {
let key = PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
};
let count = partition_duplicate_count.entry(key).or_insert(0);
*count += 1;
let max_created_at = partition_max_created_at.entry(key).or_insert(pf.created_at);
if pf.created_at > *max_created_at {
*max_created_at = pf.created_at;
}
}
// Sort partitions by file count
let mut partitions = partition_duplicate_count.iter().collect::<Vec<_>>();
// Sort partitions whose max created_at is older than the limit by their file count
let mut partitions = partition_duplicate_count
.iter()
.filter(|(k, _v)| partition_max_created_at.get(k).unwrap() < &older_than)
.collect::<Vec<_>>();
partitions.sort_by(|a, b| b.1.cmp(a.1));
// Return the top partitions with the most files
let partitions = partitions
.into_iter()
.map(|(k, _)| *k)
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
sequencer_id: pf.sequencer_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.take(num_partitions)
.collect::<Vec<_>>();
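The in-memory implementation above boils down to: group non-deleted level-0 files by partition, count them, track the newest created_at per partition, keep only partitions whose newest file is older than the cutoff, then sort by file count and truncate. Below is a simplified, std-only sketch of that logic over plain integer ids and nanosecond timestamps; the real code keys on PartitionParam and the catalog Timestamp, and the Postgres version later in this diff applies the same cutoff via HAVING max(created_at).

use std::collections::HashMap;

// Pick up to num_partitions cold partitions, ordered by level-0 file count.
// l0_files holds (partition_id, created_at_nanos) pairs for non-deleted L0 files;
// cutoff_nanos is "now minus older_than_num_hours".
fn cold_candidates(l0_files: &[(i64, i64)], cutoff_nanos: i64, num_partitions: usize) -> Vec<i64> {
    let mut file_count: HashMap<i64, usize> = HashMap::new();
    let mut max_created_at: HashMap<i64, i64> = HashMap::new();
    for &(partition_id, created_at) in l0_files {
        *file_count.entry(partition_id).or_insert(0) += 1;
        let newest = max_created_at.entry(partition_id).or_insert(created_at);
        if created_at > *newest {
            *newest = created_at;
        }
    }
    // Only partitions whose *newest* L0 file is older than the cutoff are cold;
    // a single recent file keeps the whole partition out (the "warm" case in the tests).
    let mut cold: Vec<(i64, usize)> = file_count
        .into_iter()
        .filter(|(id, _)| max_created_at[id] < cutoff_nanos)
        .collect();
    cold.sort_by(|a, b| b.1.cmp(&a.1)); // most L0 files first
    cold.into_iter().map(|(id, _)| id).take(num_partitions).collect()
}

fn main() {
    // partition 1: two old files -> cold; partition 2: one old and one recent file -> warm
    let files: &[(i64, i64)] = &[(1, 10), (1, 20), (2, 10), (2, 10_000)];
    assert_eq!(cold_candidates(files, 1_000, 2), vec![1]);
}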


@@ -280,7 +280,7 @@ decorate!(
"parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, sequencer_id: SequencerId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, sequencer_id: SequencerId, num_hours: u32, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_level_0_files_partitions" = most_level_0_files_partitions(&mut self, sequencer_id: SequencerId, older_than_num_hours: u32, num_partitions: usize) -> Result<Vec<PartitionParam>>;
]
);


@@ -1695,14 +1695,14 @@ WHERE parquet_file.sequencer_id = $1
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
FROM parquet_file
FROM parquet_file
WHERE compaction_level = 0 and to_delete is null
and sequencer_id = $1
and to_timestamp(created_at/1000000000) > now() - ($2 || 'hour')::interval
group by 1, 2, 3, 4
having count(id) >= $3
order by 5 DESC
limit $4;
limit $4;
"#,
)
.bind(&sequencer_id) // $1
@@ -1717,25 +1717,30 @@ limit $4;
async fn most_level_0_files_partitions(
&mut self,
sequencer_id: SequencerId,
older_than_num_hours: u32,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let older_than_num_hours = older_than_num_hours as i32;
let num_partitions = num_partitions as i32;
// The preliminary performance test says this query runs in around 50ms
// We have an index on (sequencer_id, compaction_level, to_delete)
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id)
FROM parquet_file
WHERE compaction_level = 0 and to_delete is null
and sequencer_id = $1
group by 1, 2, 3, 4
order by 5 DESC
limit $2;
SELECT partition_id, sequencer_id, namespace_id, table_id, count(id), max(created_at)
FROM parquet_file
WHERE compaction_level = 0
AND to_delete IS NULL
AND sequencer_id = $1
GROUP BY 1, 2, 3, 4
HAVING to_timestamp(max(created_at)/1000000000) < now() - ($2 || 'hour')::interval
ORDER BY 5 DESC
LIMIT $3;
"#,
)
.bind(&sequencer_id) // $1
.bind(&num_partitions) // $2
.bind(&older_than_num_hours) // $2
.bind(&num_partitions) // $3
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })