Merge pull request #7577 from influxdata/cn/remove-old-catalog-functions

fix: remove old, unused catalog functions (and a few types)
kodiakhq[bot] 2023-04-18 10:11:39 +00:00 committed by GitHub
commit 14877144eb
7 changed files with 47 additions and 3499 deletions


@ -291,29 +291,6 @@ impl std::fmt::Display for PartitionId {
}
}
/// Combination of Shard ID, Table ID, and Partition ID useful for identifying groups of
/// Parquet files to be compacted together.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)]
pub struct TablePartition {
/// The shard ID
pub shard_id: ShardId,
/// The table ID
pub table_id: TableId,
/// The partition ID
pub partition_id: PartitionId,
}
impl TablePartition {
/// Combine the relevant parts
pub fn new(shard_id: ShardId, table_id: TableId, partition_id: PartitionId) -> Self {
Self {
shard_id,
table_id,
partition_id,
}
}
}
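For context, the removed `TablePartition` was a composite grouping key for compaction. A minimal, self-contained sketch of that pattern, using stand-in types rather than the crate's `ShardId`/`TableId`/`PartitionId`:

use std::collections::HashMap;

// Sketch only: `GroupKey` and the plain i64 IDs stand in for the removed
// `TablePartition` and the crate's ID newtypes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct GroupKey {
    shard_id: i64,
    table_id: i64,
    partition_id: i64,
}

fn main() {
    let files = [(1, 10, 100, "a.parquet"), (1, 10, 100, "b.parquet"), (1, 11, 101, "c.parquet")];
    let mut groups: HashMap<GroupKey, Vec<&str>> = HashMap::new();
    for (shard_id, table_id, partition_id, name) in files {
        let key = GroupKey { shard_id, table_id, partition_id };
        groups.entry(key).or_default().push(name);
    }
    // Files sharing (shard, table, partition) end up in the same compaction group.
    assert_eq!(groups[&GroupKey { shard_id: 1, table_id: 10, partition_id: 100 }].len(), 2);
}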
/// A sequence number from a `router::Shard` (kafka partition)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
#[sqlx(transparent)]
@ -1003,22 +980,6 @@ pub struct SkippedCompaction {
pub limit_num_files_first_in_partition: i64,
}
/// Map of a column type to its count
#[derive(Debug, Copy, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct ColumnTypeCount {
/// column type
pub col_type: ColumnType,
/// count of the column type
pub count: i64,
}
impl ColumnTypeCount {
/// make a new ColumnTypeCount
pub fn new(col_type: ColumnType, count: i64) -> Self {
Self { col_type, count }
}
}
/// Set of columns.
#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)]
#[sqlx(transparent)]

File diff suppressed because it is too large.


@ -12,10 +12,10 @@ use crate::{
};
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
SkippedCompaction, Table, TableId, TablePartition, Timestamp, TopicId, TopicMetadata,
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, Timestamp, TopicId, TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::warn;
@ -657,37 +657,6 @@ impl ColumnRepo for MemTxn {
let stage = self.stage();
Ok(stage.columns.clone())
}
async fn list_type_count_by_table_id(
&mut self,
table_id: TableId,
) -> Result<Vec<ColumnTypeCount>> {
let stage = self.stage();
let columns = stage
.columns
.iter()
.filter(|c| c.table_id == table_id)
.map(|c| c.column_type)
.collect::<Vec<_>>();
let mut cols = HashMap::new();
for c in columns {
cols.entry(c)
.and_modify(|counter| *counter += 1)
.or_insert(1);
}
let column_type_counts = cols
.iter()
.map(|c| ColumnTypeCount {
col_type: *c.0,
count: *c.1,
})
.collect::<Vec<_>>();
Ok(column_type_counts)
}
}
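The removed in-memory implementation above is a plain count-by-key aggregation. The same idea as a self-contained sketch (the `ColType` enum is a stand-in, not the crate's `ColumnType`):

use std::collections::HashMap;

// Sketch of the count-by-column-type aggregation, with a stand-in enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ColType {
    Tag,
    I64,
    F64,
}

fn main() {
    let columns = [ColType::Tag, ColType::I64, ColType::Tag, ColType::F64];
    let mut counts: HashMap<ColType, i64> = HashMap::new();
    for col in columns {
        *counts.entry(col).or_insert(0) += 1;
    }
    assert_eq!(counts[&ColType::Tag], 2);
    assert_eq!(counts[&ColType::F64], 1);
}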
#[async_trait]
@ -811,23 +780,6 @@ impl PartitionRepo for MemTxn {
.cloned())
}
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> {
let stage = self.stage();
let table_ids: HashSet<_> = stage
.tables
.iter()
.filter_map(|table| (table.namespace_id == namespace_id).then_some(table.id))
.collect();
let partitions: Vec<_> = stage
.partitions
.iter()
.filter(|p| table_ids.contains(&p.table_id))
.cloned()
.collect();
Ok(partitions)
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
let stage = self.stage();
@ -947,42 +899,11 @@ impl PartitionRepo for MemTxn {
}
}
async fn update_persisted_sequence_number(
&mut self,
partition_id: PartitionId,
sequence_number: SequenceNumber,
) -> Result<()> {
let stage = self.stage();
match stage.partitions.iter_mut().find(|p| p.id == partition_id) {
Some(p) => {
p.persisted_sequence_number = Some(sequence_number);
Ok(())
}
None => Err(Error::PartitionNotFound { id: partition_id }),
}
}
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>> {
let stage = self.stage();
Ok(stage.partitions.iter().rev().take(n).cloned().collect())
}
async fn most_recent_n_in_shards(
&mut self,
n: usize,
shards: &[ShardId],
) -> Result<Vec<Partition>> {
let stage = self.stage();
Ok(stage
.partitions
.iter()
.rev()
.filter(|p| shards.contains(&p.shard_id))
.take(n)
.cloned()
.collect())
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
@ -1119,22 +1040,6 @@ impl ParquetFileRepo for MemTxn {
.collect())
}
async fn list_by_shard_greater_than(
&mut self,
shard_id: ShardId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
let files: Vec<_> = stage
.parquet_files
.iter()
.filter(|f| f.shard_id == shard_id && f.max_sequence_number > sequence_number)
.cloned()
.collect();
Ok(files)
}
async fn list_by_namespace_not_to_delete(
&mut self,
namespace_id: NamespaceId,
@ -1179,7 +1084,7 @@ impl ParquetFileRepo for MemTxn {
Ok(parquet_files)
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>> {
let stage = self.stage();
let (delete, keep): (Vec<_>, Vec<_>) = stage.parquet_files.iter().cloned().partition(
@ -1188,275 +1093,10 @@ impl ParquetFileRepo for MemTxn {
stage.parquet_files = keep;
let delete = delete.into_iter().map(|f| f.id).collect();
Ok(delete)
}
async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>> {
let delete = self
.delete_old(older_than)
.await
.unwrap()
.into_iter()
.map(|f| f.id)
.collect();
Ok(delete)
}
async fn level_0(&mut self, shard_id: ShardId) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| {
f.shard_id == shard_id
&& f.compaction_level == CompactionLevel::Initial
&& f.to_delete.is_none()
})
.cloned()
.collect())
}
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| {
f.shard_id == table_partition.shard_id
&& f.table_id == table_partition.table_id
&& f.partition_id == table_partition.partition_id
&& f.compaction_level == CompactionLevel::FileNonOverlapped
&& f.to_delete.is_none()
&& ((f.min_time <= min_time && f.max_time >= min_time)
|| (f.min_time > min_time && f.min_time <= max_time))
})
.cloned()
.collect())
}
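The two-branch time filter used here (and in the overlap-count queries below) is the usual closed-interval overlap test, provided each file has `min_time <= max_time`. A small self-contained check of that equivalence:

// Exhaustive check on small ranges that the two-branch predicate above equals
// the standard interval-overlap test, assuming min <= max on both sides.
fn two_branch(file_min: i64, file_max: i64, q_min: i64, q_max: i64) -> bool {
    (file_min <= q_min && file_max >= q_min) || (file_min > q_min && file_min <= q_max)
}

fn overlaps(file_min: i64, file_max: i64, q_min: i64, q_max: i64) -> bool {
    file_min <= q_max && file_max >= q_min
}

fn main() {
    for file_min in -3..=3 {
        for file_max in file_min..=3 {
            for q_min in -3..=3 {
                for q_max in q_min..=3 {
                    assert_eq!(
                        two_branch(file_min, file_max, q_min, q_max),
                        overlaps(file_min, file_max, q_min, q_max),
                    );
                }
            }
        }
    }
}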
async fn recent_highest_throughput_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
min_num_files: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let recent_time = time_in_the_past;
let stage = self.stage();
// Get partition info of selected files
let partitions = stage
.parquet_files
.iter()
.filter(|f| {
let shard_matches_if_specified = if let Some(shard_id) = shard_id {
f.shard_id == shard_id
} else {
true
};
shard_matches_if_specified
&& f.created_at > recent_time
&& f.compaction_level == CompactionLevel::Initial
&& f.to_delete.is_none()
})
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
shard_id: pf.shard_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.collect::<Vec<_>>();
// Count the number of files per partition by simply counting the partition duplicates
let mut partition_duplicate_count: HashMap<PartitionParam, usize> =
HashMap::with_capacity(partitions.len());
for p in partitions {
let count = partition_duplicate_count.entry(p).or_insert(0);
*count += 1;
}
// Partitions with select file count >= min_num_files that haven't been skipped by the
// compactor
let skipped_partitions: Vec<_> = stage
.skipped_compactions
.iter()
.map(|s| s.partition_id)
.collect();
let mut partitions = partition_duplicate_count
.iter()
.filter(|(_, v)| v >= &&min_num_files)
.filter(|(p, _)| !skipped_partitions.contains(&p.partition_id))
.collect::<Vec<_>>();
// Sort partitions by file count
partitions.sort_by(|a, b| b.1.cmp(a.1));
// only return top partitions
let partitions = partitions
.into_iter()
.map(|(k, _)| *k)
.take(num_partitions)
.collect::<Vec<_>>();
Ok(partitions)
}
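The selection above follows a common shape: count items per key, drop keys under a minimum, then keep the top N by count. A self-contained sketch of that shape with plain integer keys:

use std::collections::HashMap;

// Sketch: count files per key, keep keys meeting a minimum count, and return
// the top-N keys ordered by descending count.
fn top_partitions(files: &[u64], min_num_files: usize, num_partitions: usize) -> Vec<u64> {
    let mut counts: HashMap<u64, usize> = HashMap::new();
    for &key in files {
        *counts.entry(key).or_insert(0) += 1;
    }
    let mut candidates: Vec<(u64, usize)> = counts
        .into_iter()
        .filter(|(_, count)| *count >= min_num_files)
        .collect();
    candidates.sort_by(|a, b| b.1.cmp(&a.1));
    candidates.into_iter().map(|(key, _)| key).take(num_partitions).collect()
}

fn main() {
    let files = [1, 1, 1, 2, 2, 3];
    assert_eq!(top_partitions(&files, 2, 1), vec![1]);
}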
async fn partitions_with_small_l1_file_count(
&mut self,
shard_id: Option<ShardId>,
small_size_threshold_bytes: i64,
min_small_file_count: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let stage = self.stage();
let skipped_partitions: Vec<_> = stage
.skipped_compactions
.iter()
.map(|s| s.partition_id)
.collect();
// get a list of files for the shard that are under the size threshold and don't belong to
// a partition that has been skipped by the compactor
let relevant_parquet_files = stage
.parquet_files
.iter()
.filter(|f| {
let shard_matches_if_specified = if let Some(shard_id) = shard_id {
f.shard_id == shard_id
} else {
true
};
shard_matches_if_specified
&& f.compaction_level == CompactionLevel::FileNonOverlapped
&& f.file_size_bytes < small_size_threshold_bytes
&& !skipped_partitions.contains(&f.partition_id)
})
.collect::<Vec<_>>();
// Count the number of files per partition and use that to retain only the counts that meet
// our threshold. The keys then become our partition candidates
let mut partition_small_file_count: HashMap<PartitionParam, usize> =
HashMap::with_capacity(relevant_parquet_files.len());
for pf in relevant_parquet_files {
let key = PartitionParam {
partition_id: pf.partition_id,
shard_id: pf.shard_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
};
if pf.to_delete.is_none() {
let count = partition_small_file_count.entry(key).or_insert(0);
*count += 1;
}
}
partition_small_file_count.retain(|_key, c| *c >= min_small_file_count);
let mut partitions = partition_small_file_count.iter().collect::<Vec<_>>();
// sort and return top N
partitions.sort_by(|a, b| b.1.cmp(a.1));
Ok(partitions
.into_iter()
.map(|(k, _)| *k)
.take(num_partitions)
.collect::<Vec<_>>())
}
async fn most_cold_files_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let stage = self.stage();
let relevant_parquet_files = stage
.parquet_files
.iter()
.filter(|f| {
let shard_matches_if_specified = if let Some(shard_id) = shard_id {
f.shard_id == shard_id
} else {
true
};
shard_matches_if_specified
&& (f.compaction_level == CompactionLevel::Initial
|| f.compaction_level == CompactionLevel::FileNonOverlapped)
})
.collect::<Vec<_>>();
// Count the number of files per partition by simply counting the partition duplicates
let mut partition_duplicate_count: HashMap<PartitionParam, i32> =
HashMap::with_capacity(relevant_parquet_files.len());
let mut partition_max_created_at = HashMap::with_capacity(relevant_parquet_files.len());
for pf in relevant_parquet_files {
let key = PartitionParam {
partition_id: pf.partition_id,
shard_id: pf.shard_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
};
if pf.to_delete.is_none() {
let count = partition_duplicate_count.entry(key).or_insert(0);
*count += 1;
}
let created_at = if pf.compaction_level == CompactionLevel::Initial {
// the file is level-0, use its created_at time even if it is deleted
Some(pf.created_at)
} else if pf.to_delete.is_none() {
// non-deleted level-1: use `time_in_the_past - 1` so this partition always counts as cold
Some(time_in_the_past - 1)
} else {
// This is the case of deleted level-1
None
};
if let Some(created_at) = created_at {
let max_created_at = partition_max_created_at.entry(key).or_insert(created_at);
*max_created_at = std::cmp::max(*max_created_at, created_at);
if created_at > *max_created_at {
*max_created_at = created_at;
}
}
}
// Sort partitions whose max created at is older than the limit by their file count
let mut partitions = partition_duplicate_count
.iter()
.filter(|(k, _v)| partition_max_created_at.get(k).unwrap() < &time_in_the_past)
.collect::<Vec<_>>();
partitions.sort_by(|a, b| b.1.cmp(a.1));
// Return top partitions with most file counts that haven't been skipped by the compactor
let skipped_partitions: Vec<_> = stage
.skipped_compactions
.iter()
.map(|s| s.partition_id)
.collect();
let partitions = partitions
.into_iter()
.map(|(k, _)| *k)
.filter(|pf| !skipped_partitions.contains(&pf.partition_id))
.map(|pf| PartitionParam {
partition_id: pf.partition_id,
shard_id: pf.shard_id,
namespace_id: pf.namespace_id,
table_id: pf.table_id,
})
.take(num_partitions)
.collect::<Vec<_>>();
Ok(partitions)
}
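The per-key maximum tracked above (`partition_max_created_at`) can also be maintained directly with the entry API. A self-contained sketch of that bookkeeping with plain integer stand-ins:

use std::collections::HashMap;

// Sketch of the "running maximum per key" bookkeeping, with i64 stand-ins
// for the partition key and the created_at timestamp.
fn main() {
    let observations: [(u64, i64); 4] = [(1, 10), (1, 25), (2, 7), (1, 3)];
    let mut max_created_at: HashMap<u64, i64> = HashMap::new();
    for (key, created_at) in observations {
        max_created_at
            .entry(key)
            .and_modify(|m| *m = (*m).max(created_at))
            .or_insert(created_at);
    }
    assert_eq!(max_created_at[&1], 25);
    assert_eq!(max_created_at[&2], 7);
}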
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
@ -1509,58 +1149,6 @@ impl ParquetFileRepo for MemTxn {
Ok(count_i64.unwrap())
}
async fn count_by_overlaps_with_level_0(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64> {
let stage = self.stage();
let count = stage
.parquet_files
.iter()
.filter(|f| {
f.shard_id == shard_id
&& f.table_id == table_id
&& f.max_sequence_number < sequence_number
&& f.to_delete.is_none()
&& f.compaction_level == CompactionLevel::Initial
&& ((f.min_time <= min_time && f.max_time >= min_time)
|| (f.min_time > min_time && f.min_time <= max_time))
})
.count();
i64::try_from(count).map_err(|_| Error::InvalidValue { value: count })
}
async fn count_by_overlaps_with_level_1(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<i64> {
let stage = self.stage();
let count = stage
.parquet_files
.iter()
.filter(|f| {
f.shard_id == shard_id
&& f.table_id == table_id
&& f.to_delete.is_none()
&& f.compaction_level == CompactionLevel::FileNonOverlapped
&& ((f.min_time <= min_time && f.max_time >= min_time)
|| (f.min_time > min_time && f.min_time <= max_time))
})
.count();
i64::try_from(count).map_err(|_| Error::InvalidValue { value: count })
}
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,


@ -7,10 +7,10 @@ use crate::interface::{
};
use async_trait::async_trait;
use data_types::{
Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, TablePartition, Timestamp, TopicId, TopicMetadata,
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool,
QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId,
Timestamp, TopicId, TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -212,7 +212,6 @@ decorate!(
"column_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Column>>;
"column_create_or_get_many_unchecked" = create_or_get_many_unchecked(&mut self, table_id: TableId, columns: HashMap<&str, ColumnType>) -> Result<Vec<Column>>;
"column_list" = list(&mut self) -> Result<Vec<Column>>;
"column_list_type_count_by_table_id" = list_type_count_by_table_id(&mut self, table_id: TableId) -> Result<Vec<ColumnTypeCount>>;
]
);
@ -232,16 +231,13 @@ decorate!(
methods = [
"partition_create_or_get" = create_or_get(&mut self, key: PartitionKey, shard_id: ShardId, table_id: TableId) -> Result<Partition>;
"partition_get_by_id" = get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>>;
"partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
"partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>;
"partition_most_recent_n" = most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>>;
"partition_most_recent_n_in_shards" = most_recent_n_in_shards(&mut self, n: usize, shards: &[ShardId]) -> Result<Vec<Partition>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp, max_num_partitions: usize) -> Result<Vec<PartitionParam>>;
"partitions_new_file_between" = partitions_new_file_between(&mut self, minimum_time: Timestamp, maximum_time: Option<Timestamp>) -> Result<Vec<PartitionId>>;
"get_in_skipped_compaction" = get_in_skipped_compaction(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
@ -254,23 +250,14 @@ decorate!(
"parquet_create" = create( &mut self, parquet_file_params: ParquetFileParams) -> Result<ParquetFile>;
"parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>;
"parquet_flag_for_delete_by_retention" = flag_for_delete_by_retention(&mut self) -> Result<Vec<ParquetFileId>>;
"parquet_list_by_shard_greater_than" = list_by_shard_greater_than(&mut self, shard_id: ShardId, sequence_number: SequenceNumber) -> Result<Vec<ParquetFile>>;
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table" = list_by_table(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>>;
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
"parquet_level_0" = level_0(&mut self, shard_id: ShardId) -> Result<Vec<ParquetFile>>;
"parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result<Vec<ParquetFile>>;
"parquet_update_compaction_level" = update_compaction_level(&mut self, parquet_file_ids: &[ParquetFileId], compaction_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;
"parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result<bool>;
"parquet_count" = count(&mut self) -> Result<i64>;
"parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result<i64>;
"parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result<i64>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option<ShardId>, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result<Vec<PartitionParam>>;
"most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option<ShardId>, time_in_the_past: Timestamp, num_partitions: usize) -> Result<Vec<PartitionParam>>;
]
);


@ -12,11 +12,10 @@ use crate::{
};
use async_trait::async_trait;
use data_types::{
Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, TablePartition, Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID,
TRANSITION_SHARD_INDEX,
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool,
QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId,
Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@ -1090,21 +1089,6 @@ RETURNING *;
Ok(out)
}
async fn list_type_count_by_table_id(
&mut self,
table_id: TableId,
) -> Result<Vec<ColumnTypeCount>> {
sqlx::query_as::<_, ColumnTypeCount>(
r#"
select column_type as col_type, count(1) from column_name where table_id = $1 group by 1;
"#,
)
.bind(table_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]
@ -1265,21 +1249,6 @@ RETURNING *;
Ok(Some(partition))
}
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(
r#"
SELECT partition.*
FROM table_name
INNER JOIN partition on partition.table_id = table_name.id
WHERE table_name.namespace_id = $1;
"#,
)
.bind(namespace_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
sqlx::query_as::<_, Partition>(
r#"
@ -1459,27 +1428,6 @@ RETURNING *
.context(interface::CouldNotDeleteSkippedCompactionsSnafu)
}
async fn update_persisted_sequence_number(
&mut self,
partition_id: PartitionId,
sequence_number: SequenceNumber,
) -> Result<()> {
let _ = sqlx::query(
r#"
UPDATE partition
SET persisted_sequence_number = $1
WHERE id = $2;
"#,
)
.bind(sequence_number.get()) // $1
.bind(partition_id) // $2
.execute(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>> {
sqlx::query_as(r#"SELECT * FROM partition ORDER BY id DESC LIMIT $1;"#)
.bind(n as i64) // $1
@ -1488,21 +1436,6 @@ WHERE id = $2;
.map_err(|e| Error::SqlxError { source: e })
}
async fn most_recent_n_in_shards(
&mut self,
n: usize,
shards: &[ShardId],
) -> Result<Vec<Partition>> {
sqlx::query_as(
r#"SELECT * FROM partition WHERE shard_id IN (SELECT UNNEST($1)) ORDER BY id DESC LIMIT $2;"#,
)
.bind(shards.iter().map(|v| v.get()).collect::<Vec<_>>())
.bind(n as i64)
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
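The removed query binds a Rust `Vec` as a Postgres array and expands it with `UNNEST($1)` so a variable-length ID list can drive an `IN` clause. A hedged sketch of that pattern; the table and column names are placeholders, not the catalog schema:

use sqlx::PgPool;

// Hedged sketch of the UNNEST-based IN-list pattern; `some_table` and its
// columns are placeholders, and error handling is left to the caller.
async fn ids_in_shards(pool: &PgPool, shards: Vec<i64>, n: i64) -> Result<Vec<i64>, sqlx::Error> {
    let rows = sqlx::query_as::<_, (i64,)>(
        r#"SELECT id FROM some_table WHERE shard_id IN (SELECT UNNEST($1)) ORDER BY id DESC LIMIT $2;"#,
    )
    .bind(shards) // $1: bound as int8[], expanded to one row per element by UNNEST
    .bind(n) // $2
    .fetch_all(pool)
    .await?;
    Ok(rows.into_iter().map(|(id,)| id).collect())
}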
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
@ -1646,31 +1579,6 @@ RETURNING *;
Ok(flagged)
}
async fn list_by_shard_greater_than(
&mut self,
shard_id: ShardId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE shard_id = $1
AND max_sequence_number > $2
ORDER BY id;
"#,
)
.bind(shard_id) // $1
.bind(sequence_number) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_namespace_not_to_delete(
&mut self,
namespace_id: NamespaceId,
@ -1733,20 +1641,6 @@ WHERE table_id = $1;
.map_err(|e| Error::SqlxError { source: e })
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"
DELETE FROM parquet_file
WHERE to_delete < $1
RETURNING *;
"#,
)
.bind(older_than) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>> {
// see https://www.crunchydata.com/blog/simulating-update-or-delete-with-limit-in-postgres-ctes-to-the-rescue
let deleted = sqlx::query(
@ -1772,239 +1666,6 @@ RETURNING id;
Ok(deleted)
}
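The comment above points at the CTE trick for simulating `DELETE ... LIMIT` in Postgres; the query body itself falls outside this hunk. A generic, hedged sketch of that pattern, with an illustrative limit rather than the elided query:

use sqlx::{PgPool, Row};

// Illustrative only: select a bounded batch of victims in a CTE, then delete
// by id and return the deleted ids.
async fn delete_batch_older_than(pool: &PgPool, older_than: i64) -> Result<Vec<i64>, sqlx::Error> {
    let rows = sqlx::query(
        r#"
WITH candidates AS (
    SELECT id FROM parquet_file WHERE to_delete < $1 LIMIT 1000
)
DELETE FROM parquet_file
WHERE id IN (SELECT id FROM candidates)
RETURNING id;
        "#,
    )
    .bind(older_than) // $1
    .fetch_all(pool)
    .await?;
    Ok(rows.into_iter().map(|row| row.get("id")).collect())
}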
async fn level_0(&mut self, shard_id: ShardId) -> Result<Vec<ParquetFile>> {
// this intentionally limits the returned files to 10,000 as it is used to make
// a decision on the highest priority partitions. If compaction has never been
// run this could end up returning millions of results and taking too long to run.
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.compaction_level = $2
AND parquet_file.to_delete IS NULL
LIMIT 1000;
"#,
)
.bind(shard_id) // $1
.bind(CompactionLevel::Initial) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.table_id = $2
AND parquet_file.partition_id = $3
AND parquet_file.compaction_level = $4
AND parquet_file.to_delete IS NULL
AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5)
OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6));
"#,
)
.bind(table_partition.shard_id) // $1
.bind(table_partition.table_id) // $2
.bind(table_partition.partition_id) // $3
.bind(CompactionLevel::FileNonOverlapped) // $4
.bind(min_time) // $5
.bind(max_time) // $6
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn recent_highest_throughput_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
min_num_files: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let min_num_files = min_num_files as i32;
let num_partitions = num_partitions as i32;
match shard_id {
Some(shard_id) => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id,
parquet_file.namespace_id, count(parquet_file.id)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $5
AND to_delete is null
AND shard_id = $1
AND created_at > $2
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(id) >= $3
ORDER BY 5 DESC
LIMIT $4;
"#,
)
.bind(shard_id) // $1
.bind(time_in_the_past) //$2
.bind(min_num_files) // $3
.bind(num_partitions) // $4
.bind(CompactionLevel::Initial) // $5
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
None => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id,
parquet_file.namespace_id, count(parquet_file.id)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $4
AND to_delete is null
AND created_at > $1
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(id) >= $2
ORDER BY 5 DESC
LIMIT $3;
"#,
)
.bind(time_in_the_past) //$1
.bind(min_num_files) // $2
.bind(num_partitions) // $3
.bind(CompactionLevel::Initial) // $4
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
}
async fn most_cold_files_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let num_partitions = num_partitions as i32;
// This query returns the partitions with the most L0+L1 files, where all L0 files (deleted
// or not) were either created before the given time ($2) or are no longer available
// (removed by the garbage collector)
match shard_id {
Some(shard_id) => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
count(case when to_delete is null then 1 end) total_count,
max(case when compaction_level= $4 then parquet_file.created_at end)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE (compaction_level = $4 OR compaction_level = $5)
AND shard_id = $1
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(case when to_delete is null then 1 end) > 0
AND ( max(case when compaction_level= $4 then parquet_file.created_at end) < $2 OR
max(case when compaction_level= $4 then parquet_file.created_at end) is null)
ORDER BY total_count DESC
LIMIT $3;
"#,
)
.bind(shard_id) // $1
.bind(time_in_the_past) // $2
.bind(num_partitions) // $3
.bind(CompactionLevel::Initial) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
None => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
count(case when to_delete is null then 1 end) total_count,
max(case when compaction_level= $4 then parquet_file.created_at end)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE (compaction_level = $3 OR compaction_level = $4)
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(case when to_delete is null then 1 end) > 0
AND ( max(case when compaction_level= $3 then parquet_file.created_at end) < $1 OR
max(case when compaction_level= $3 then parquet_file.created_at end) is null)
ORDER BY total_count DESC
LIMIT $2;
"#,
)
.bind(time_in_the_past) // $1
.bind(num_partitions) // $2
.bind(CompactionLevel::Initial) // $3
.bind(CompactionLevel::FileNonOverlapped) // $4
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
}
async fn partitions_with_small_l1_file_count(
&mut self,
shard_id: Option<ShardId>,
small_size_threshold_bytes: i64,
min_small_file_count: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
// This query returns partitions with at least `min_small_file_count` small L1 files,
// where "small" means no bigger than `small_size_threshold_bytes`, limited to the top `num_partitions`.
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
COUNT(1) AS l1_file_count
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $5
AND to_delete IS NULL
AND shard_id = $1
AND skipped_compactions.partition_id IS NULL
AND file_size_bytes < $3
GROUP BY 1, 2, 3, 4
HAVING COUNT(1) >= $2
ORDER BY l1_file_count DESC
LIMIT $4;
"#,
)
.bind(shard_id) // $1
.bind(min_small_file_count as i32) // $2
.bind(small_size_threshold_bytes) // $3
.bind(num_partitions as i32) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
@ -2075,71 +1736,6 @@ RETURNING id;
Ok(read_result.count)
}
async fn count_by_overlaps_with_level_0(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"
SELECT count(1) as count
FROM parquet_file
WHERE table_id = $1
AND shard_id = $2
AND max_sequence_number < $3
AND parquet_file.to_delete IS NULL
AND compaction_level = $6
AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5));
"#,
)
.bind(table_id) // $1
.bind(shard_id) // $2
.bind(sequence_number) // $3
.bind(min_time) // $4
.bind(max_time) // $5
.bind(CompactionLevel::Initial) // $6
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
async fn count_by_overlaps_with_level_1(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"
SELECT count(1) as count
FROM parquet_file
WHERE table_id = $1
AND shard_id = $2
AND parquet_file.to_delete IS NULL
AND compaction_level = $5
AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3)
OR (parquet_file.min_time > $3 AND parquet_file.min_time <= $4));
"#,
)
.bind(table_id) // $1
.bind(shard_id) // $2
.bind(min_time) // $3
.bind(max_time) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_one(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,
@ -2908,7 +2504,7 @@ mod tests {
.repositories()
.await
.parquet_files()
.delete_old(now)
.delete_old_ids_only(now)
.await
.expect("parquet file deletion should succeed");
let total_file_size_bytes: i64 =


@ -12,11 +12,10 @@ use crate::{
};
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnSet, ColumnType, ColumnTypeCount, CompactionLevel, Namespace,
NamespaceId, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId,
PartitionKey, PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId,
ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, TopicId,
TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
@ -870,21 +869,6 @@ RETURNING *;
Ok(out)
}
async fn list_type_count_by_table_id(
&mut self,
table_id: TableId,
) -> Result<Vec<ColumnTypeCount>> {
sqlx::query_as::<_, ColumnTypeCount>(
r#"
select column_type as col_type, count(1) AS count from column_name where table_id = $1 group by 1;
"#,
)
.bind(table_id) // $1
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
#[async_trait]
@ -1073,24 +1057,6 @@ RETURNING *;
Ok(Some(partition.into()))
}
async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"
SELECT partition.*
FROM table_name
INNER JOIN partition on partition.table_id = table_name.id
WHERE table_name.namespace_id = $1;
"#,
)
.bind(namespace_id) // $1
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"
@ -1274,27 +1240,6 @@ RETURNING *
.context(interface::CouldNotDeleteSkippedCompactionsSnafu)
}
async fn update_persisted_sequence_number(
&mut self,
partition_id: PartitionId,
sequence_number: SequenceNumber,
) -> Result<()> {
let _ = sqlx::query(
r#"
UPDATE partition
SET persisted_sequence_number = $1
WHERE id = $2;
"#,
)
.bind(sequence_number.get()) // $1
.bind(partition_id) // $2
.execute(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(())
}
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"SELECT * FROM partition ORDER BY id DESC LIMIT $1;"#,
@ -1308,24 +1253,6 @@ WHERE id = $2;
.collect())
}
async fn most_recent_n_in_shards(
&mut self,
n: usize,
shards: &[ShardId],
) -> Result<Vec<Partition>> {
Ok(sqlx::query_as::<_, PartitionPod>(
r#"SELECT * FROM partition WHERE shard_id IN (SELECT value FROM json_each($1)) ORDER BY id DESC LIMIT $2;"#,
)
.bind(&Json(shards.iter().map(|v| v.get()).collect::<Vec<_>>()))
.bind(n as i64)
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
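SQLite has no array bind type, so the removed query serializes the ID list to JSON and expands it with `json_each($1)`. A hedged sketch of that pattern (placeholder table and column names; assumes sqlx's `json` feature):

use sqlx::{types::Json, SqlitePool};

// Illustrative only: bind the ids as a JSON array and expand them in SQL.
async fn ids_in_shards(pool: &SqlitePool, shards: Vec<i64>, n: i64) -> Result<Vec<i64>, sqlx::Error> {
    let rows = sqlx::query_as::<_, (i64,)>(
        r#"SELECT id FROM some_table WHERE shard_id IN (SELECT value FROM json_each($1)) ORDER BY id DESC LIMIT $2;"#,
    )
    .bind(Json(shards)) // $1: JSON array, one row per element via json_each
    .bind(n) // $2
    .fetch_all(pool)
    .await?;
    Ok(rows.into_iter().map(|(id,)| id).collect())
}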
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
@ -1520,34 +1447,6 @@ RETURNING *;
Ok(flagged)
}
async fn list_by_shard_greater_than(
&mut self,
shard_id: ShardId,
sequence_number: SequenceNumber,
) -> Result<Vec<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
Ok(sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE shard_id = $1
AND max_sequence_number > $2
ORDER BY id;
"#,
)
.bind(shard_id) // $1
.bind(sequence_number) // $2
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn list_by_namespace_not_to_delete(
&mut self,
namespace_id: NamespaceId,
@ -1619,23 +1518,6 @@ WHERE table_id = $1;
.collect())
}
async fn delete_old(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFile>> {
Ok(sqlx::query_as::<_, ParquetFilePod>(
r#"
DELETE FROM parquet_file
WHERE to_delete < $1
RETURNING *;
"#,
)
.bind(older_than) // $1
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>> {
// see https://www.crunchydata.com/blog/simulating-update-or-delete-with-limit-in-sqlite-ctes-to-the-rescue
let deleted = sqlx::query(
@ -1661,245 +1543,6 @@ RETURNING id;
Ok(deleted)
}
async fn level_0(&mut self, shard_id: ShardId) -> Result<Vec<ParquetFile>> {
// this intentionally limits the returned files to 10,000 as it is used to make
// a decision on the highest priority partitions. If compaction has never been
// run this could end up returning millions of results and taking too long to run.
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
Ok(sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.compaction_level = $2
AND parquet_file.to_delete IS NULL
LIMIT 1000;
"#,
)
.bind(shard_id) // $1
.bind(CompactionLevel::Initial) // $2
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn level_1(
&mut self,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<Vec<ParquetFile>> {
// Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large
// `parquet_metadata` column!!
Ok(sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id,
max_sequence_number, min_time, max_time, to_delete, file_size_bytes,
row_count, compaction_level, created_at, column_set, max_l0_created_at
FROM parquet_file
WHERE parquet_file.shard_id = $1
AND parquet_file.table_id = $2
AND parquet_file.partition_id = $3
AND parquet_file.compaction_level = $4
AND parquet_file.to_delete IS NULL
AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5)
OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6));
"#,
)
.bind(table_partition.shard_id) // $1
.bind(table_partition.table_id) // $2
.bind(table_partition.partition_id) // $3
.bind(CompactionLevel::FileNonOverlapped) // $4
.bind(min_time) // $5
.bind(max_time) // $6
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn recent_highest_throughput_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
min_num_files: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let min_num_files = min_num_files as i32;
let num_partitions = num_partitions as i32;
match shard_id {
Some(shard_id) => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id,
parquet_file.namespace_id, count(parquet_file.id)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $5
AND to_delete is null
AND shard_id = $1
AND created_at > $2
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(id) >= $3
ORDER BY 5 DESC
LIMIT $4;
"#,
)
.bind(shard_id) // $1
.bind(time_in_the_past) //$2
.bind(min_num_files) // $3
.bind(num_partitions) // $4
.bind(CompactionLevel::Initial) // $5
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
None => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id,
parquet_file.namespace_id, count(parquet_file.id)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $4
AND to_delete is null
AND created_at > $1
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(id) >= $2
ORDER BY 5 DESC
LIMIT $3;
"#,
)
.bind(time_in_the_past) //$1
.bind(min_num_files) // $2
.bind(num_partitions) // $3
.bind(CompactionLevel::Initial) // $4
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
}
async fn partitions_with_small_l1_file_count(
&mut self,
shard_id: Option<ShardId>,
small_size_threshold_bytes: i64,
min_small_file_count: usize,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
// This query returns partitions with at least `min_small_file_count` small L1 files,
// where "small" means no bigger than `small_size_threshold_bytes`, limited to the top `num_partitions`.
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
COUNT(1) AS l1_file_count
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE compaction_level = $5
AND to_delete IS NULL
AND shard_id = $1
AND skipped_compactions.partition_id IS NULL
AND file_size_bytes < $3
GROUP BY 1, 2, 3, 4
HAVING COUNT(1) >= $2
ORDER BY l1_file_count DESC
LIMIT $4;
"#,
)
.bind(shard_id) // $1
.bind(min_small_file_count as i32) // $2
.bind(small_size_threshold_bytes) // $3
.bind(num_partitions as i32) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn most_cold_files_partitions(
&mut self,
shard_id: Option<ShardId>,
time_in_the_past: Timestamp,
num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let num_partitions = num_partitions as i32;
// This query returns the partitions with the most L0+L1 files, where all L0 files (deleted
// or not) were either created before the given time ($2) or are no longer available
// (removed by the garbage collector)
match shard_id {
Some(shard_id) => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
count(case when to_delete is null then 1 end) total_count,
max(case when compaction_level= $4 then parquet_file.created_at end)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE (compaction_level = $4 OR compaction_level = $5)
AND shard_id = $1
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(case when to_delete is null then 1 end) > 0
AND ( max(case when compaction_level= $4 then parquet_file.created_at end) < $2 OR
max(case when compaction_level= $4 then parquet_file.created_at end) is null)
ORDER BY total_count DESC
LIMIT $3;
"#,
)
.bind(shard_id) // $1
.bind(time_in_the_past) // $2
.bind(num_partitions) // $3
.bind(CompactionLevel::Initial) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
None => {
sqlx::query_as::<_, PartitionParam>(
r#"
SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id,
parquet_file.table_id,
count(case when to_delete is null then 1 end) total_count,
max(case when compaction_level= $4 then parquet_file.created_at end)
FROM parquet_file
LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id
WHERE (compaction_level = $3 OR compaction_level = $4)
AND skipped_compactions.partition_id IS NULL
GROUP BY 1, 2, 3, 4
HAVING count(case when to_delete is null then 1 end) > 0
AND ( max(case when compaction_level= $3 then parquet_file.created_at end) < $1 OR
max(case when compaction_level= $3 then parquet_file.created_at end) is null)
ORDER BY total_count DESC
LIMIT $2;
"#,
)
.bind(time_in_the_past) // $1
.bind(num_partitions) // $2
.bind(CompactionLevel::Initial) // $3
.bind(CompactionLevel::FileNonOverlapped) // $4
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
}
}
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
@ -1973,71 +1616,6 @@ RETURNING id;
Ok(read_result.count)
}
async fn count_by_overlaps_with_level_0(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
sequence_number: SequenceNumber,
) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"
SELECT count(1) as count
FROM parquet_file
WHERE table_id = $1
AND shard_id = $2
AND max_sequence_number < $3
AND parquet_file.to_delete IS NULL
AND compaction_level = $6
AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4)
OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5));
"#,
)
.bind(table_id) // $1
.bind(shard_id) // $2
.bind(sequence_number) // $3
.bind(min_time) // $4
.bind(max_time) // $5
.bind(CompactionLevel::Initial) // $6
.fetch_one(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
async fn count_by_overlaps_with_level_1(
&mut self,
table_id: TableId,
shard_id: ShardId,
min_time: Timestamp,
max_time: Timestamp,
) -> Result<i64> {
let read_result = sqlx::query_as::<_, Count>(
r#"
SELECT count(1) as count
FROM parquet_file
WHERE table_id = $1
AND shard_id = $2
AND parquet_file.to_delete IS NULL
AND compaction_level = $5
AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3)
OR (parquet_file.min_time > $3 AND parquet_file.min_time <= $4));
"#,
)
.bind(table_id) // $1
.bind(shard_id) // $2
.bind(min_time) // $3
.bind(max_time) // $4
.bind(CompactionLevel::FileNonOverlapped) // $5
.fetch_one(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?;
Ok(read_result.count)
}
async fn get_by_object_store_id(
&mut self,
object_store_id: Uuid,
@ -2544,7 +2122,7 @@ mod tests {
.repositories()
.await
.parquet_files()
.delete_old(now)
.delete_old_ids_only(now)
.await
.expect("parquet file deletion should succeed");
let total_file_size_bytes: i64 =


@ -6,8 +6,8 @@ use arrow::{
};
use data_types::{
Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceSchema, ParquetFile,
ParquetFileParams, Partition, PartitionId, QueryPool, SequenceNumber, Shard, ShardId,
ShardIndex, Table, TableId, TablePartition, TableSchema, Timestamp, TopicMetadata,
ParquetFileParams, Partition, PartitionId, QueryPool, SequenceNumber, Shard, ShardIndex, Table,
TableId, TableSchema, Timestamp, TopicMetadata,
};
use datafusion::physical_plan::metrics::Count;
use datafusion_util::MemoryStream;
@ -185,48 +185,6 @@ impl TestCatalog {
.await
}
/// List level 0 files
pub async fn list_level_0_files(self: &Arc<Self>, shard_id: ShardId) -> Vec<ParquetFile> {
self.catalog
.repositories()
.await
.parquet_files()
.level_0(shard_id)
.await
.unwrap()
}
/// Count level 0 files
pub async fn count_level_0_files(self: &Arc<Self>, shard_id: ShardId) -> usize {
let level_0 = self
.catalog
.repositories()
.await
.parquet_files()
.level_0(shard_id)
.await
.unwrap();
level_0.len()
}
/// Count level 1 files
pub async fn count_level_1_files(
self: &Arc<Self>,
table_partition: TablePartition,
min_time: Timestamp,
max_time: Timestamp,
) -> usize {
let level_1 = self
.catalog
.repositories()
.await
.parquet_files()
.level_1(table_partition, min_time, max_time)
.await
.unwrap();
level_1.len()
}
/// List all non-deleted files
pub async fn list_by_table_not_to_delete(
self: &Arc<Self>,