Merge pull request #7612 from influxdata/cn/remove-partitions_with_recent_created_files

fix: Remove unused partitions_with_recent_created_files method
kodiakhq[bot] 2023-04-20 19:09:38 +00:00 committed by GitHub
commit 8cce0baab1
6 changed files with 36 additions and 256 deletions
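The removed method returned Vec<PartitionParam> and applied a hard max_num_partitions limit; the surviving partitions_new_file_between returns only Vec<PartitionId> and takes an exclusive minimum_time plus an optional exclusive maximum_time, with no limit argument. The sketch below is not part of this PR: the helper name and import paths are assumptions, and it only illustrates how a hypothetical caller of the old method could get equivalent results from the remaining API.

use std::sync::Arc;

use data_types::{PartitionId, Timestamp};
use iox_catalog::interface::{Catalog, Result};

/// Hypothetical helper (not in this PR): list partitions that received a new file after
/// `time_in_the_past`, capped at `max_num_partitions`, using the surviving catalog API.
async fn recently_written_partitions(
    catalog: Arc<dyn Catalog>,
    time_in_the_past: Timestamp,
    max_num_partitions: usize,
) -> Result<Vec<PartitionId>> {
    let mut repos = catalog.repositories().await;
    // Pass None for the upper bound to match the old "created after X" filter.
    let mut ids = repos
        .partitions()
        .partitions_new_file_between(time_in_the_past, None)
        .await?;
    // The surviving method has no LIMIT, so the caller applies any cap itself.
    ids.truncate(max_num_partitions);
    Ok(ids)
}

Unlike PartitionParam, the returned PartitionId values carry no shard, namespace, or table IDs, so callers that still need those have to look them up separately.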


@@ -945,20 +945,6 @@ impl Partition {
}
}
/// Data for a partition chosen from its parquet files
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::FromRow)]
pub struct PartitionParam {
/// the partition
pub partition_id: PartitionId,
// Remove this shard_id: https://github.com/influxdata/influxdb_iox/issues/6518
/// the partition's shard
pub shard_id: ShardId,
/// the partition's namespace
pub namespace_id: NamespaceId,
/// the partition's table
pub table_id: TableId,
}
/// Data recorded when compaction skips a partition.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::FromRow)]
pub struct SkippedCompaction {


@@ -3,9 +3,9 @@
use async_trait::async_trait;
use data_types::{
Column, ColumnSchema, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey,
PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex,
SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, TopicMetadata,
ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool,
QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId,
TableSchema, Timestamp, TopicId, TopicMetadata,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@@ -535,15 +535,6 @@ pub trait PartitionRepo: Send + Sync {
/// Return the N most recently created partitions.
async fn most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>>;
/// Select partitions for cold/warm/hot compaction.
/// These are partitions with files created recently (i.e. after the specified time_in_the_past).
/// These files include all compaction levels.
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>>;
/// Select partitions with a `new_file_at` value greater than the minimum time value and, if specified, less than
/// the maximum time value. Both range ends are exclusive; a timestamp exactly equal to either end will _not_ be
/// included in the results.
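The exclusive bounds described above can be read as a simple predicate over a partition's new_file_at; the standalone function below is only an illustrative restatement of the documented rule, not code from this change.

use data_types::Timestamp;

/// Illustrative only: strictly greater than the minimum, and strictly less than the
/// maximum when one is supplied, mirroring the doc comment for partitions_new_file_between.
fn new_file_at_in_range(
    new_file_at: Timestamp,
    minimum_time: Timestamp,
    maximum_time: Option<Timestamp>,
) -> bool {
    new_file_at > minimum_time && maximum_time.map_or(true, |max| new_file_at < max)
}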
@@ -854,7 +845,7 @@ pub(crate) mod test_helpers {
{
test_setup(clean_state().await).await;
test_namespace_soft_deletion(clean_state().await).await;
test_partitions_with_recent_created_files(clean_state().await).await;
test_partitions_new_file_between(clean_state().await).await;
test_query_pool(clean_state().await).await;
test_column(clean_state().await).await;
test_partition(clean_state().await).await;
@@ -2390,32 +2381,26 @@ pub(crate) mod test_helpers {
assert_eq!(ids, vec![parquet_file_2.id]);
}
async fn test_partitions_with_recent_created_files(catalog: Arc<dyn Catalog>) {
let max_num_partition = 100;
async fn test_partitions_new_file_between(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos
.topics()
.create_or_get("recent_created_files")
.create_or_get("new_file_between")
.await
.unwrap();
let pool = repos
.query_pools()
.create_or_get("recent_created_files")
.create_or_get("new_file_between")
.await
.unwrap();
let namespace = repos
.namespaces()
.create(
"test_partitions_with_recent_created_files",
None,
topic.id,
pool.id,
)
.create("test_partitions_new_file_between", None, topic.id, pool.id)
.await
.unwrap();
let table = repos
.tables()
.create_or_get("test_table_for_recent_created_files", namespace.id)
.create_or_get("test_table_for_new_file_between", namespace.id)
.await
.unwrap();
let shard = repos
@@ -2432,11 +2417,10 @@ pub(crate) mod test_helpers {
let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5));
let time_six_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(6));
// Db has no partition
// get from partition table
// Db has no partitions
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert!(partitions.is_empty());
@@ -2449,16 +2433,14 @@ pub(crate) mod test_helpers {
.create_or_get("one".into(), shard.id, table.id)
.await
.unwrap();
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert!(partitions.is_empty());
// create files for partition one
let parquet_file_params = ParquetFileParams {
shard_id: shard.id,
namespace_id: namespace.id,
@@ -2487,14 +2469,6 @@ pub(crate) mod test_helpers {
.flag_for_delete(delete_l0_file.id)
.await
.unwrap();
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert!(partitions.is_empty());
// read from partition table only
let partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
@@ -2514,7 +2488,7 @@ pub(crate) mod test_helpers {
.unwrap();
assert!(partitions.is_empty());
// create a deleted L0 file that was created 1 hour ago which is recently
// create a deleted L0 file that was created 1 hour ago
let l0_one_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_one_hour_ago,
@@ -2526,15 +2500,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// partition one should be returned
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
@@ -2572,15 +2537,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// should return partition one only
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
@@ -2596,7 +2552,7 @@ pub(crate) mod test_helpers {
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition1.id);
// Add a L0 file created non-recently (5 hours ago)
// Add a L0 file created 5 hours ago
let l0_five_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_five_hour_ago,
@@ -2609,15 +2565,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// still return partition one only
// get from partition table
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].partition_id, partition1.id);
// read from partition table only
let partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
@@ -2648,7 +2595,7 @@ pub(crate) mod test_helpers {
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0], partition2.id);
// Add a L1 created recently (just now)
// Add an L1 file created just now
let l1_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
@@ -2662,29 +2609,17 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// should return both partitions
// get from partition table
let mut partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// Only return partition1: the creation time must be strictly less than the maximum time, not equal
// Only return partition1: the creation time must be strictly less than the maximum time,
// not equal
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_three_hour_ago, Some(time_now))
@@ -2710,29 +2645,17 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// should return partition one and two only
// get from partition table
let mut partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// Only return partition1: the creation time must be strictly less than the maximum time, not equal
// Only return partition1: the creation time must be strictly less than the maximum time,
// not equal
let partitions = repos
.partitions()
.partitions_new_file_between(time_three_hour_ago, Some(time_now))
@@ -2758,8 +2681,8 @@ pub(crate) mod test_helpers {
.unwrap();
assert!(partitions.is_empty());
// Add a L2 created recently (just now) for partition three
// Since it is L2, the partition won't get updated
// Add an L2 file created just now for partition three
// Since the file is L2, the partition won't get updated
let l2_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_now,
@@ -2773,29 +2696,17 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// still should return partition one and two only
// get from partition table
let mut partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
// read from partition table only
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
// sort by partition id
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
// Only return partition1: the creation time must be strictly less than the maximum time, not equal
// Only return partition1: the creation time must be strictly less than the maximum time,
// not equal
let partitions = repos
.partitions()
.partitions_new_file_between(time_three_hour_ago, Some(time_now))
@@ -2811,7 +2722,7 @@ pub(crate) mod test_helpers {
.unwrap();
assert!(partitions.is_empty());
// add an L0 file created recently (one hour ago) for partition three
// add an L0 file created one hour ago for partition three
let l0_one_hour_ago_file_params = ParquetFileParams {
object_store_id: Uuid::new_v4(),
created_at: time_one_hour_ago,
@@ -2824,26 +2735,12 @@ pub(crate) mod test_helpers {
.await
.unwrap();
// should return all partitions
// get from partition table
let mut partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, max_num_partition)
.await
.unwrap();
assert_eq!(partitions.len(), 3);
// sort by partition id
partitions.sort_by(|a, b| a.partition_id.cmp(&b.partition_id));
assert_eq!(partitions[0].partition_id, partition1.id);
assert_eq!(partitions[1].partition_id, partition2.id);
assert_eq!(partitions[2].partition_id, partition3.id);
// read from partition table only
let mut partitions = repos
.partitions()
.partitions_new_file_between(time_two_hour_ago, None)
.await
.unwrap();
assert_eq!(partitions.len(), 3);
// sort by partition id
partitions.sort();
assert_eq!(partitions[0], partition1.id);
assert_eq!(partitions[1], partition2.id);
@@ -2865,33 +2762,6 @@ pub(crate) mod test_helpers {
.await
.unwrap();
assert!(partitions.is_empty());
// Limit max num partition
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, 2)
.await
.unwrap();
assert_eq!(partitions.len(), 2);
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, 1)
.await
.unwrap();
assert_eq!(partitions.len(), 1);
let partitions = repos
.partitions()
.partitions_with_recent_created_files(time_two_hour_ago, 0)
.await
.unwrap();
assert_eq!(partitions.len(), 0);
// drop the namespace so the data created in this test doesn't affect other tests
repos
.namespaces()
.soft_delete("test_partitions_with_recent_created_files")
.await
.expect("delete namespace should succeed");
}
async fn test_list_by_partiton_not_to_delete(catalog: Arc<dyn Catalog>) {


@@ -13,9 +13,9 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, Timestamp, TopicId, TopicMetadata,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp,
TopicId, TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::warn;
@@ -904,39 +904,6 @@ impl PartitionRepo for MemTxn {
Ok(stage.partitions.iter().rev().take(n).cloned().collect())
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
let stage = self.stage();
let partitions: Vec<_> = stage
.partitions
.iter()
.filter(|p| p.new_file_at > Some(time_in_the_past))
.map(|p| {
// get namespace_id of this partition
let namespace_id = stage
.tables
.iter()
.find(|t| t.id == p.table_id)
.map(|t| t.namespace_id)
.unwrap_or(NamespaceId::new(1));
PartitionParam {
partition_id: p.id,
table_id: p.table_id,
shard_id: ShardId::new(1), // this is unused and will be removed when we remove shard_id
namespace_id,
}
})
.take(max_num_partitions)
.collect();
Ok(partitions)
}
async fn partitions_new_file_between(
&mut self,
minimum_time: Timestamp,


@@ -8,9 +8,9 @@ use crate::interface::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool,
QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId,
Timestamp, TopicId, TopicMetadata,
ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp,
TopicId, TopicMetadata,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@@ -238,7 +238,6 @@ decorate!(
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
"partition_most_recent_n" = most_recent_n(&mut self, n: usize) -> Result<Vec<Partition>>;
"partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp, max_num_partitions: usize) -> Result<Vec<PartitionParam>>;
"partitions_new_file_between" = partitions_new_file_between(&mut self, minimum_time: Timestamp, maximum_time: Option<Timestamp>) -> Result<Vec<PartitionId>>;
"get_in_skipped_compaction" = get_in_skipped_compaction(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
]


@@ -13,9 +13,9 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool,
QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId,
Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp,
TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@@ -1436,27 +1436,6 @@ RETURNING *
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
sqlx::query_as(
r#"
SELECT p.id as partition_id, p.table_id, t.namespace_id, p.shard_id
FROM partition p, table_name t
WHERE p.new_file_at > $1
AND p.table_id = t.id
LIMIT $2;
"#,
)
.bind(time_in_the_past) // $1
.bind(max_num_partitions as i64) // $2
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_new_file_between(
&mut self,
minimum_time: Timestamp,


@@ -13,9 +13,9 @@ use crate::{
use async_trait::async_trait;
use data_types::{
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam,
QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table,
TableId, Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, QueryPool, QueryPoolId,
SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, Timestamp,
TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX,
};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
@@ -1253,27 +1253,6 @@ RETURNING *
.collect())
}
async fn partitions_with_recent_created_files(
&mut self,
time_in_the_past: Timestamp,
max_num_partitions: usize,
) -> Result<Vec<PartitionParam>> {
sqlx::query_as(
r#"
SELECT p.id as partition_id, p.table_id, t.namespace_id, p.shard_id
FROM partition p, table_name t
WHERE p.new_file_at > $1
AND p.table_id = t.id
LIMIT $2;
"#,
)
.bind(time_in_the_past) // $1
.bind(max_num_partitions as i64) // $2
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn partitions_new_file_between(
&mut self,
minimum_time: Timestamp,