diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 716f578e5a..5b2bdb24f3 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -291,29 +291,6 @@ impl std::fmt::Display for PartitionId { } } -/// Combination of Shard ID, Table ID, and Partition ID useful for identifying groups of -/// Parquet files to be compacted together. -#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord)] -pub struct TablePartition { - /// The shard ID - pub shard_id: ShardId, - /// The table ID - pub table_id: TableId, - /// The partition ID - pub partition_id: PartitionId, -} - -impl TablePartition { - /// Combine the relevant parts - pub fn new(shard_id: ShardId, table_id: TableId, partition_id: PartitionId) -> Self { - Self { - shard_id, - table_id, - partition_id, - } - } -} - /// A sequence number from a `router::Shard` (kafka partition) #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)] #[sqlx(transparent)] @@ -1003,22 +980,6 @@ pub struct SkippedCompaction { pub limit_num_files_first_in_partition: i64, } -/// Map of a column type to its count -#[derive(Debug, Copy, Clone, PartialEq, Eq, sqlx::FromRow)] -pub struct ColumnTypeCount { - /// column type - pub col_type: ColumnType, - /// count of the column type - pub count: i64, -} - -impl ColumnTypeCount { - /// make a new ColumnTypeCount - pub fn new(col_type: ColumnType, count: i64) -> Self { - Self { col_type, count } - } -} - /// Set of columns. #[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] #[sqlx(transparent)] diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 5456b1da41..b7a1c6e78b 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -2,11 +2,10 @@ use async_trait::async_trait; use data_types::{ - Column, ColumnSchema, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, - NamespaceSchema, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, - PartitionKey, PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, - ShardIndex, SkippedCompaction, Table, TableId, TablePartition, TableSchema, Timestamp, TopicId, - TopicMetadata, + Column, ColumnSchema, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceSchema, + ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, + PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, + SkippedCompaction, Table, TableId, TableSchema, Timestamp, TopicId, TopicMetadata, }; use iox_time::TimeProvider; use snafu::{OptionExt, Snafu}; @@ -432,12 +431,6 @@ pub trait ColumnRepo: Send + Sync { /// List all columns. async fn list(&mut self) -> Result>; - - /// List column types and their count for a table - async fn list_type_count_by_table_id( - &mut self, - table_id: TableId, - ) -> Result>; } /// Functions for working with shards in the catalog @@ -486,9 +479,6 @@ pub trait PartitionRepo: Send + Sync { /// get partition by ID async fn get_by_id(&mut self, partition_id: PartitionId) -> Result>; - /// return partitions for a given namespace - async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result>; - /// return the partitions by table id async fn list_by_table_id(&mut self, table_id: TableId) -> Result>; @@ -542,26 +532,9 @@ pub trait PartitionRepo: Send + Sync { partition_id: PartitionId, ) -> Result>; - /// Update the per-partition persistence watermark. 
- /// - /// The given `sequence_number` is the inclusive maximum [`SequenceNumber`] - /// of the most recently persisted data for this partition. - async fn update_persisted_sequence_number( - &mut self, - partition_id: PartitionId, - sequence_number: SequenceNumber, - ) -> Result<()>; - /// Return the N most recently created partitions. async fn most_recent_n(&mut self, n: usize) -> Result>; - /// Return the N most recently created partitions for the specified shards. - async fn most_recent_n_in_shards( - &mut self, - n: usize, - shards: &[ShardId], - ) -> Result>; - /// Select partition for cold/warm/hot compaction /// These are partitions with files created recently (aka created after the specified time_in_the_past) /// These files include all levels of compaction files @@ -593,16 +566,6 @@ pub trait ParquetFileRepo: Send + Sync { /// Flag all parquet files for deletion that are older than their namespace's retention period. async fn flag_for_delete_by_retention(&mut self) -> Result>; - /// Get all parquet files for a shard with a max_sequence_number greater than the - /// one passed in. The ingester will use this on startup to see which files were persisted - /// that are greater than its min_unpersisted_number so that it can discard any data in - /// these partitions on replay. - async fn list_by_shard_greater_than( - &mut self, - shard_id: ShardId, - sequence_number: SequenceNumber, - ) -> Result>; - /// List all parquet files within a given namespace that are NOT marked as /// [`to_delete`](ParquetFile::to_delete). async fn list_by_namespace_not_to_delete( @@ -618,10 +581,6 @@ pub trait ParquetFileRepo: Send + Sync { /// This is for debug purpose async fn list_by_table(&mut self, table_id: TableId) -> Result>; - /// Delete all parquet files that were marked to be deleted earlier than the specified time. - /// Returns the deleted records. - async fn delete_old(&mut self, older_than: Timestamp) -> Result>; - /// Delete parquet files that were marked to be deleted earlier than the specified time. /// /// Returns the deleted IDs only. @@ -630,53 +589,6 @@ pub trait ParquetFileRepo: Send + Sync { /// MAY call this method again if the result was NOT empty. async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result>; - /// List parquet files for a given shard with compaction level 0 and other criteria that - /// define a file as a candidate for compaction - async fn level_0(&mut self, shard_id: ShardId) -> Result>; - - /// List parquet files for a given table partition, in a given time range, with compaction - /// level 1, and other criteria that define a file as a candidate for compaction with a level 0 - /// file - async fn level_1( - &mut self, - table_partition: TablePartition, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result>; - - // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518 - /// List the most recent highest throughput partition for a given shard, if specified - async fn recent_highest_throughput_partitions( - &mut self, - shard_id: Option, - time_at_num_minutes_ago: Timestamp, - min_num_files: usize, - num_partitions: usize, - ) -> Result>; - - // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518 - /// List the partitions for a given shard that have at least the given count of L1 files no - /// bigger than the provided size threshold. Limits the number of partitions returned to - /// num_partitions, for performance. 
- async fn partitions_with_small_l1_file_count( - &mut self, - shard_id: Option, - small_size_threshold_bytes: i64, - min_small_file_count: usize, - num_partitions: usize, - ) -> Result>; - - // Remove this function: https://github.com/influxdata/influxdb_iox/issues/6518 - /// List partitions with the most level 0 + level 1 files created earlier than - /// `older_than_num_hours` hours ago for a given shard (if specified). In other words, "cold" - /// partitions that need compaction. - async fn most_cold_files_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - num_partitions: usize, - ) -> Result>; - /// List parquet files for a given partition that are NOT marked as /// [`to_delete`](ParquetFile::to_delete). async fn list_by_partition_not_to_delete( @@ -699,28 +611,6 @@ pub trait ParquetFileRepo: Send + Sync { /// Return count async fn count(&mut self) -> Result; - /// Return count of level-0 files of given tableId and shardId that - /// overlap with the given min_time and max_time and have sequence number - /// smaller the given one - async fn count_by_overlaps_with_level_0( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - sequence_number: SequenceNumber, - ) -> Result; - - /// Return count of level-1 files of given tableId and shardId that - /// overlap with the given min_time and max_time - async fn count_by_overlaps_with_level_1( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result; - /// Return the parquet file with the given object store id async fn get_by_object_store_id( &mut self, @@ -965,16 +855,11 @@ pub(crate) mod test_helpers { test_setup(clean_state().await).await; test_namespace_soft_deletion(clean_state().await).await; test_partitions_with_recent_created_files(clean_state().await).await; - test_most_cold_files_partitions(clean_state().await).await; test_query_pool(clean_state().await).await; test_column(clean_state().await).await; test_partition(clean_state().await).await; test_parquet_file(clean_state().await).await; test_parquet_file_delete_broken(clean_state().await).await; - test_parquet_file_compaction_level_0(clean_state().await).await; - test_parquet_file_compaction_level_1(clean_state().await).await; - test_recent_highest_throughput_partitions(clean_state().await).await; - test_partitions_with_small_l1_file_count(clean_state().await).await; test_update_to_compaction_level_1(clean_state().await).await; test_list_by_partiton_not_to_delete(clean_state().await).await; test_txn_isolation(clean_state().await).await; @@ -1609,25 +1494,6 @@ pub(crate) mod test_helpers { .create_or_get("b", table2.id, ColumnType::Tag) .await .unwrap(); - // Listing count of column types - let mut col_count = repos - .columns() - .list_type_count_by_table_id(table2.id) - .await - .unwrap(); - let mut expect = vec![ - ColumnTypeCount { - col_type: ColumnType::Tag, - count: 1, - }, - ColumnTypeCount { - col_type: ColumnType::U64, - count: 1, - }, - ]; - expect.sort_by_key(|c| c.col_type); - col_count.sort_by_key(|c| c.col_type); - assert_eq!(expect, col_count); // Listing columns should return all columns in the catalog let list = repos.columns().list().await.unwrap(); @@ -1833,40 +1699,6 @@ pub(crate) mod test_helpers { assert_eq!(created.keys().copied().collect::>(), listed); - // test list_by_namespace - let namespace2 = repos - .namespaces() - .create("namespace_partition_test2", None, topic.id, pool.id) - .await - .unwrap(); - let table2 = repos - 
.tables() - .create_or_get("test_table2", namespace2.id) - .await - .unwrap(); - repos - .partitions() - .create_or_get("some_key".into(), shard.id, table2.id) - .await - .expect("failed to create partition"); - let listed = repos - .partitions() - .list_by_namespace(namespace.id) - .await - .expect("failed to list partitions") - .into_iter() - .map(|v| (v.id, v)) - .collect::>(); - let expected: BTreeMap<_, _> = created - .iter() - .map(|(k, v)| (*k, v.clone())) - .chain(std::iter::once(( - other_partition.id, - other_partition.clone(), - ))) - .collect(); - assert_eq!(expected, listed); - // sort_key should be empty on creation assert!(other_partition.sort_key.is_empty()); @@ -2053,45 +1885,19 @@ pub(crate) mod test_helpers { "Expected no skipped compactions, got: {skipped_compactions:?}" ); - // Test setting and reading the per-partition persistence numbers - let partition = repos - .partitions() - .get_by_id(other_partition.id) - .await - .unwrap() - .unwrap(); - assert_eq!(partition.persisted_sequence_number, None); - // Set - repos - .partitions() - .update_persisted_sequence_number(other_partition.id, SequenceNumber::new(42)) - .await - .unwrap(); - // Read - let partition = repos - .partitions() - .get_by_id(other_partition.id) - .await - .unwrap() - .unwrap(); - assert_eq!( - partition.persisted_sequence_number, - Some(SequenceNumber::new(42)) - ); - let recent = repos .partitions() .most_recent_n(10) .await .expect("should list most recent"); - assert_eq!(recent.len(), 4); + assert_eq!(recent.len(), 3); let recent = repos .partitions() - .most_recent_n(4) + .most_recent_n(3) .await .expect("should list most recent"); - assert_eq!(recent.len(), 4); + assert_eq!(recent.len(), 3); let recent = repos .partitions() @@ -2100,32 +1906,6 @@ pub(crate) mod test_helpers { .expect("should list most recent"); assert_eq!(recent.len(), 2); - let recent = repos - .partitions() - .most_recent_n_in_shards(10, &[shard.id, other_shard.id]) - .await - .expect("should list most recent"); - assert_eq!(recent.len(), 4); - - let recent = repos - .partitions() - .most_recent_n_in_shards(10, &[shard.id]) - .await - .expect("should list most recent"); - assert_eq!(recent.len(), 3); - - let recent2 = repos - .partitions() - .most_recent_n_in_shards(10, &[shard.id, ShardId::new(42)]) - .await - .expect("should list most recent"); - assert_eq!(recent, recent2); - - repos - .namespaces() - .soft_delete("namespace_partition_test2") - .await - .expect("delete namespace should succeed"); repos .namespaces() .soft_delete("namespace_partition_test") @@ -2224,26 +2004,17 @@ pub(crate) mod test_helpers { assert!(repos.parquet_files().exist(exist_id).await.unwrap()); assert!(!repos.parquet_files().exist(non_exist_id).await.unwrap()); - let files = repos - .parquet_files() - .list_by_shard_greater_than(shard.id, SequenceNumber::new(1)) - .await - .unwrap(); - assert_eq!(vec![parquet_file.clone(), other_file.clone()], files); - let files = repos - .parquet_files() - .list_by_shard_greater_than(shard.id, SequenceNumber::new(150)) - .await - .unwrap(); - assert_eq!(vec![other_file.clone()], files); - // verify that to_delete is initially set to null and the file does not get deleted assert!(parquet_file.to_delete.is_none()); let older_than = Timestamp::new( (catalog.time_provider().now() + Duration::from_secs(100)).timestamp_nanos(), ); - let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap(); - assert_matches!(deleted_files.as_slice(), []); + let deleted = repos + .parquet_files() + 
.delete_old_ids_only(older_than) + .await + .unwrap(); + assert!(deleted.is_empty()); assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap()); // test list_by_table that includes soft-deleted file @@ -2261,13 +2032,6 @@ pub(crate) mod test_helpers { .flag_for_delete(parquet_file.id) .await .unwrap(); - let files = repos - .parquet_files() - .list_by_shard_greater_than(shard.id, SequenceNumber::new(1)) - .await - .unwrap(); - let marked_deleted = files.first().unwrap(); - assert!(marked_deleted.to_delete.is_some()); // test list_by_table that includes soft-deleted file // at this time the file is soft-deleted and will be included in the returned list @@ -2277,17 +2041,19 @@ pub(crate) mod test_helpers { .await .unwrap(); assert_eq!(files.len(), 1); + let marked_deleted = files.first().unwrap(); + assert!(marked_deleted.to_delete.is_some()); // File is not deleted if it was marked to be deleted after the specified time let before_deleted = Timestamp::new( (catalog.time_provider().now() - Duration::from_secs(100)).timestamp_nanos(), ); - let deleted_files = repos + let deleted = repos .parquet_files() - .delete_old(before_deleted) + .delete_old_ids_only(before_deleted) .await .unwrap(); - assert!(deleted_files.is_empty()); + assert!(deleted.is_empty()); assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap()); // test list_by_table that includes soft-deleted file @@ -2301,9 +2067,13 @@ pub(crate) mod test_helpers { assert_eq!(files.len(), 1); // File is deleted if it was marked to be deleted before the specified time - let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap(); - assert_eq!(deleted_files.len(), 1); - assert_eq!(marked_deleted, &deleted_files[0]); + let deleted = repos + .parquet_files() + .delete_old_ids_only(older_than) + .await + .unwrap(); + assert_eq!(deleted.len(), 1); + assert_eq!(marked_deleted.id, deleted[0]); assert!(!repos.parquet_files().exist(parquet_file.id).await.unwrap()); // test list_by_table that includes soft-deleted file @@ -2430,176 +2200,6 @@ pub(crate) mod test_helpers { .unwrap(); assert!(files.is_empty()); - // test count_by_overlaps_with_level_0 - // not time overlap - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(11), - Timestamp::new(20), - SequenceNumber::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 0); - // overlaps with f1 - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(1), - Timestamp::new(10), - SequenceNumber::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 1); - // overlaps with f1 and f3 - // f2 is deleted and should not be counted - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(7), - Timestamp::new(55), - SequenceNumber::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 2); - // overlaps with f1 and f3 but on different time range - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(1), - Timestamp::new(100), - SequenceNumber::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 2); - // overlaps with f3 - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(15), - Timestamp::new(100), - SequenceNumber::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 1); - // no overlaps due to smaller sequence 
number - let count = repos - .parquet_files() - .count_by_overlaps_with_level_0( - partition2.table_id, - shard.id, - Timestamp::new(15), - Timestamp::new(100), - SequenceNumber::new(2), - ) - .await - .unwrap(); - assert_eq!(count, 0); - - // test count_by_overlaps_with_level_1 - // - // no level-1 file -> nothing overlap - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(1), - Timestamp::new(200), - ) - .await - .unwrap(); - assert_eq!(count, 0); - - // Let upgrade all files (only f1 and f3 are not deleted) to level 1 - repos - .parquet_files() - .update_compaction_level(&[f1.id], CompactionLevel::FileNonOverlapped) - .await - .unwrap(); - repos - .parquet_files() - .update_compaction_level(&[f3.id], CompactionLevel::FileNonOverlapped) - .await - .unwrap(); - // - // not overlap with any - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(11), - Timestamp::new(20), - ) - .await - .unwrap(); - assert_eq!(count, 0); - // overlaps with f1 - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(1), - Timestamp::new(10), - ) - .await - .unwrap(); - assert_eq!(count, 1); - // overlaps with f1 and f3 - // f2 is deleted and should not be counted - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(7), - Timestamp::new(55), - ) - .await - .unwrap(); - assert_eq!(count, 2); - // overlaps with f1 and f3 but on different time range - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(1), - Timestamp::new(100), - ) - .await - .unwrap(); - assert_eq!(count, 2); - // overlaps with f3 - let count = repos - .parquet_files() - .count_by_overlaps_with_level_1( - partition2.table_id, - shard.id, - Timestamp::new(15), - Timestamp::new(100), - ) - .await - .unwrap(); - assert_eq!(count, 1); - // test delete_old_ids_only let older_than = Timestamp::new( (catalog.time_provider().now() + Duration::from_secs(100)).timestamp_nanos(), @@ -2790,1009 +2390,6 @@ pub(crate) mod test_helpers { assert_eq!(ids, vec![parquet_file_2.id]); } - async fn test_parquet_file_compaction_level_0(catalog: Arc) { - let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let namespace = repos - .namespaces() - .create( - "namespace_parquet_file_compaction_level_0_test", - None, - topic.id, - pool.id, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(100)) - .await - .unwrap(); - let other_shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(101)) - .await - .unwrap(); - - let partition = repos - .partitions() - .create_or_get("one".into(), shard.id, table.id) - .await - .unwrap(); - - let min_time = Timestamp::new(1); - let max_time = Timestamp::new(10); - - let parquet_file_params = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: partition.table_id, - partition_id: partition.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(140), - min_time, - max_time, - file_size_bytes: 1337, - row_count: 0, - compaction_level: 
CompactionLevel::Initial, - created_at: Timestamp::new(1), - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - max_l0_created_at: Timestamp::new(1), - }; - - let parquet_file = repos - .parquet_files() - .create(parquet_file_params.clone()) - .await - .unwrap(); - - // Create a compaction level 0 file for some other shard - let other_shard_params = ParquetFileParams { - shard_id: other_shard.id, - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - - let _other_shard_file = repos - .parquet_files() - .create(other_shard_params) - .await - .unwrap(); - - // Create a compaction level 0 file marked to delete - let to_delete_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let to_delete_file = repos - .parquet_files() - .create(to_delete_params) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(to_delete_file.id) - .await - .unwrap(); - - // Create a compaction level 1 file - let level_1_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let level_1_file = repos.parquet_files().create(level_1_params).await.unwrap(); - repos - .parquet_files() - .update_compaction_level(&[level_1_file.id], CompactionLevel::FileNonOverlapped) - .await - .unwrap(); - - // Level 0 parquet files for a shard should contain only those that match the right - // criteria - let level_0 = repos.parquet_files().level_0(shard.id).await.unwrap(); - let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect(); - level_0_ids.sort(); - let expected = vec![parquet_file]; - let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect(); - expected_ids.sort(); - - assert_eq!( - level_0_ids, expected_ids, - "\nlevel 0: {level_0:#?}\nexpected: {expected:#?}", - ); - - // drop the namespace to avoid the created data in this tests from affecting other tests - repos - .namespaces() - .soft_delete("namespace_parquet_file_compaction_level_0_test") - .await - .expect("delete namespace should succeed"); - } - - async fn test_parquet_file_compaction_level_1(catalog: Arc) { - let mut repos = catalog.repositories().await; - let topic = repos.topics().create_or_get("foo").await.unwrap(); - let pool = repos.query_pools().create_or_get("foo").await.unwrap(); - let namespace = repos - .namespaces() - .create( - "namespace_parquet_file_compaction_level_1_test", - None, - topic.id, - pool.id, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let other_table = repos - .tables() - .create_or_get("test_table2", namespace.id) - .await - .unwrap(); - let shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(100)) - .await - .unwrap(); - let other_shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(101)) - .await - .unwrap(); - let partition = repos - .partitions() - .create_or_get( - "test_parquet_file_compaction_level_1_one".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - let other_partition = repos - .partitions() - .create_or_get( - "test_parquet_file_compaction_level_1_two".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - - // Set up the window of times we're interested in level 1 files for - let query_min_time = Timestamp::new(5); - let query_max_time = Timestamp::new(10); - - // Create a file with times entirely within the window - let parquet_file_params = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: 
partition.table_id, - partition_id: partition.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(140), - min_time: query_min_time + 1, - max_time: query_max_time - 1, - file_size_bytes: 1337, - row_count: 0, - compaction_level: CompactionLevel::Initial, - created_at: Timestamp::new(1), - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - max_l0_created_at: Timestamp::new(1), - }; - let parquet_file = repos - .parquet_files() - .create(parquet_file_params.clone()) - .await - .unwrap(); - - // Create a file that will remain as level 0 - let level_0_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let _level_0_file = repos.parquet_files().create(level_0_params).await.unwrap(); - - // Create a file completely before the window - let too_early_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - min_time: query_min_time - 2, - max_time: query_min_time - 1, - ..parquet_file_params.clone() - }; - let too_early_file = repos - .parquet_files() - .create(too_early_params) - .await - .unwrap(); - - // Create a file overlapping the window on the lower end - let overlap_lower_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - min_time: query_min_time - 1, - max_time: query_min_time + 1, - ..parquet_file_params.clone() - }; - let overlap_lower_file = repos - .parquet_files() - .create(overlap_lower_params) - .await - .unwrap(); - - // Create a file overlapping the window on the upper end - let overlap_upper_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - min_time: query_max_time - 1, - max_time: query_max_time + 1, - ..parquet_file_params.clone() - }; - let overlap_upper_file = repos - .parquet_files() - .create(overlap_upper_params) - .await - .unwrap(); - - // Create a file completely after the window - let too_late_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - min_time: query_max_time + 1, - max_time: query_max_time + 2, - ..parquet_file_params.clone() - }; - let too_late_file = repos.parquet_files().create(too_late_params).await.unwrap(); - - // Create a file for some other shard - let other_shard_params = ParquetFileParams { - shard_id: other_shard.id, - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let other_shard_file = repos - .parquet_files() - .create(other_shard_params) - .await - .unwrap(); - - // Create a file for the same shard but a different table - let other_table_params = ParquetFileParams { - table_id: other_table.id, - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let other_table_file = repos - .parquet_files() - .create(other_table_params) - .await - .unwrap(); - - // Create a file for the same shard and table but a different partition - let other_partition_params = ParquetFileParams { - partition_id: other_partition.id, - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let other_partition_file = repos - .parquet_files() - .create(other_partition_params) - .await - .unwrap(); - - // Create a file marked to be deleted - let to_delete_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - let to_delete_file = repos - .parquet_files() - .create(to_delete_params) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(to_delete_file.id) - .await - .unwrap(); - - // Make all but _level_0_file compaction level 1 - repos - .parquet_files() - .update_compaction_level( - &[ - parquet_file.id, 
- too_early_file.id, - too_late_file.id, - overlap_lower_file.id, - overlap_upper_file.id, - other_shard_file.id, - other_table_file.id, - other_partition_file.id, - to_delete_file.id, - ], - CompactionLevel::FileNonOverlapped, - ) - .await - .unwrap(); - - // Level 1 parquet files for a shard should contain only those that match the right - // criteria - let table_partition = TablePartition::new(shard.id, table.id, partition.id); - let level_1 = repos - .parquet_files() - .level_1(table_partition, query_min_time, query_max_time) - .await - .unwrap(); - let mut level_1_ids: Vec<_> = level_1.iter().map(|pf| pf.id).collect(); - level_1_ids.sort(); - let expected = vec![parquet_file, overlap_lower_file, overlap_upper_file]; - let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect(); - expected_ids.sort(); - - assert_eq!( - level_1_ids, expected_ids, - "\nlevel 1: {level_1:#?}\nexpected: {expected:#?}", - ); - - // drop the namespace to avoid the created data in this tests from affecting other tests - repos - .namespaces() - .soft_delete("namespace_parquet_file_compaction_level_1_test") - .await - .expect("delete namespace should succeed"); - } - - async fn test_most_cold_files_partitions(catalog: Arc) { - let mut repos = catalog.start_transaction().await.unwrap(); - let topic = repos.topics().create_or_get("most_cold").await.unwrap(); - let pool = repos - .query_pools() - .create_or_get("most_cold") - .await - .unwrap(); - let namespace = repos - .namespaces() - .create( - "test_most_level_0_files_partitions", - None, - topic.id, - pool.id, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(88)) - .await - .unwrap(); - - let time_now = Timestamp::from(catalog.time_provider().now()); - let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5)); - let time_8_hours_ago = Timestamp::from(catalog.time_provider().hours_ago(8)); - let time_38_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(38)); - - let num_partitions = 2; - - // Db has no partition - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - - // The DB has 1 partition, partition_1, but it does not have any files - let partition_1 = repos - .partitions() - .create_or_get("one".into(), shard.id, table.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - - // The partition_1 has one deleted file - let parquet_file_params = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: partition_1.table_id, - 
partition_id: partition_1.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(140), - min_time: Timestamp::new(1), - max_time: Timestamp::new(10), - file_size_bytes: 1337, - row_count: 0, - compaction_level: CompactionLevel::Initial, - created_at: time_38_hour_ago, - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - max_l0_created_at: time_now, - }; - let delete_l0_file = repos - .parquet_files() - .create(parquet_file_params.clone()) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(delete_l0_file.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - - // A hot_partition with one cold file and one hot file - let hot_partition = repos - .partitions() - .create_or_get("hot".into(), shard.id, table.id) - .await - .unwrap(); - let cold_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: hot_partition.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(cold_file_params) - .await - .unwrap(); - let hot_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: hot_partition.id, - created_at: time_five_hour_ago, - ..parquet_file_params.clone() - }; - repos.parquet_files().create(hot_file_params).await.unwrap(); - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - - // An already_compacted_partition that has only one non-deleted level 2 file, should never - // be returned - let already_compacted_partition = repos - .partitions() - .create_or_get("already_compacted".into(), shard.id, table.id) - .await - .unwrap(); - let l2_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: already_compacted_partition.id, - compaction_level: CompactionLevel::Final, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l2_file_params.clone()) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert!( - partitions.is_empty(), - "Expected no partitions, instead got {partitions:#?}", - ); - - // The partition_1 has one non-deleted level 0 file created 38 hours ago - let l0_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_file_params.clone()) - 
.await - .unwrap(); - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - // Across all shards - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - - // Partition_2 has 2 non-deleted L0 file created 38 hours ago - let partition_2 = repos - .partitions() - .create_or_get( - "test_most_cold_files_partitions_two".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - let another_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_2.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(another_file_params.clone()) - .await - .unwrap(); - let another_file_params_for_second_l0 = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..another_file_params.clone() - }; - repos - .parquet_files() - .create(another_file_params_for_second_l0.clone()) - .await - .unwrap(); - // Must return 2 partitions - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // They must be in order partition_2 (more files), partition - assert_eq!(partitions[0].partition_id, partition_2.id); // 2 files - assert_eq!(partitions[1].partition_id, partition_1.id); // 1 file - // Across all shards - // Must return 2 partitions - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // They must be in order partition_2 (more files), partition - assert_eq!(partitions[0].partition_id, partition_2.id); // 2 files - assert_eq!(partitions[1].partition_id, partition_1.id); // 1 file - - // Make partition_3 that has one level-1 file, no level-0 - // The DB now has 3 cold partitions, two with non-deleted L0 files and one with only - // non-deleted L1 - let partition_3 = repos - .partitions() - .create_or_get( - "test_most_cold_files_partitions_three".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - // recent L1 but since no L0, this partition is still cold - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_3.id, - compaction_level: CompactionLevel::FileNonOverlapped, - created_at: time_five_hour_ago, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - - // Still return 2 partitions because the limit num_partitions is 2 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // and the first one should still be the one, partition_2, with the most files - assert_eq!(partitions[0].partition_id, partition_2.id); - // return 3 partitions becasue the limit num_partitions is now 5 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // and the first one should still be the one with the most files - assert_eq!(partitions[0].partition_id, partition_2.id); - // Across all shards - // Still return 2 partitions because the limit num_partitions is 2 - let partitions = repos - .parquet_files() - 
.most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // and the first one should still be the one, partition_2, with the most files - assert_eq!(partitions[0].partition_id, partition_2.id); - // return 3 partitions becasue the limit num_partitions is now 5 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // and the first one should still be the one with the most files - assert_eq!(partitions[0].partition_id, partition_2.id); - - // The compactor skipped compacting partition_2 - repos - .partitions() - .record_skipped_compaction( - partition_2.id, - "Not feeling up to it today", - 1, - 2, - 4, - 10, - 20, - ) - .await - .unwrap(); - - // partition_2 should no longer be selected for compaction - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - assert!( - partitions.iter().all(|p| p.partition_id != partition_2.id), - "Expected partitions not to include {}: {partitions:?}", - partition_2.id - ); - // Across all shards - // partition_2 should no longer be selected for compaction - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - assert!( - partitions.iter().all(|p| p.partition_id != partition_2.id), - "Expected partitions not to include {}: {partitions:?}", - partition_2.id - ); - - // Add another L1 files into partition_3 to make it have 2 L1 files for easier to check the - // output - // A non-recent L1 - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_3.id, - compaction_level: CompactionLevel::FileNonOverlapped, - created_at: time_38_hour_ago, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - - // Create partition_4 with 3 non-deleted L1 files - // The DB now has 4 cold partitions but partition_2 should still be skipped - let partition_4 = repos - .partitions() - .create_or_get( - "test_most_cold_files_partitions_four".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - for _ in 0..3 { - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_4.id, - compaction_level: CompactionLevel::FileNonOverlapped, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - } - // Still return 2 partitions with the limit num_partitions=2 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // the first one should now be the one with the most files: 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - // Across all shards - // Still return 2 partitions with the limit num_partitions=2 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // the first one should now be the one with the most files: 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one 
should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - - // Return 3 partitions with the limit num_partitions=4 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, 4) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // the first one should now be the one with the most files: 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - // third one should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[2].partition_id, partition_1.id); - // Across all shards - // Return 3 partitions with the limit num_partitions=4 - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, 4) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // the first one should now be the one with the most files: 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - // third one should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[2].partition_id, partition_1.id); - - // Partition_5 with a non-deleted L1 and a deleted L0 created recently - // The DB now still has 4 cold partitions but partition_2 should still be skipped - // partition_5 is hot because it has a recent L0 even though it is deleted - let partition_5 = repos - .partitions() - .create_or_get("five".into(), shard.id, table.id) - .await - .unwrap(); - // L1 created recently - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_5.id, - compaction_level: CompactionLevel::FileNonOverlapped, - created_at: time_five_hour_ago, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - // L0 created recently but deleted - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_5.id, - compaction_level: CompactionLevel::Initial, - created_at: time_five_hour_ago, - ..parquet_file_params.clone() - }; - let delete_l0_file = repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(delete_l0_file.id) - .await - .unwrap(); - - // Return 3 cold partitions, partition_1, partition_3, partition_4 becasue num_partitions=5 - // still skip partition_2 and partition_5 is considered hot becasue it has a (deleted) L0 - // created recently - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // the first one should now be the one with the most files 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - // third one should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[2].partition_id, partition_1.id); - // Across all shards - // Return 3 cold partitions, partition_1, partition_3, partition_4 becasue num_partitions=5 - // still skip partition_2 and partition_5 is considered hot becasue it has a (deleted) L0 - // created recently - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 3); - // the first 
one should now be the one with the most files 3 L1s - assert_eq!(partitions[0].partition_id, partition_4.id); - // second one should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[1].partition_id, partition_3.id); - // third one should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[2].partition_id, partition_1.id); - - // Create partition_6 with 4 L1s and one deleted but non-recent L0 - // The DB now has 5 cold partitions but partition_2 should still be skipped - let partition_6 = repos - .partitions() - .create_or_get("six".into(), shard.id, table.id) - .await - .unwrap(); - for _ in 0..4 { - // L1 created recently - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_6.id, - compaction_level: CompactionLevel::FileNonOverlapped, - created_at: time_five_hour_ago, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - } - // old and deleted L0 - let file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: partition_6.id, - compaction_level: CompactionLevel::Initial, - created_at: time_38_hour_ago, - ..parquet_file_params.clone() - }; - let delete_l0_file = repos - .parquet_files() - .create(file_params.clone()) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(delete_l0_file.id) - .await - .unwrap(); - - // Return 4 cold partitions, partition_1, partition_3, partition_4, partition_6 because - // num_partitions=5 - // still skip partition_2 and partition_5 is considered hot because it has a (deleted) L0 - // created recently - let partitions = repos - .parquet_files() - .most_cold_files_partitions(Some(shard.id), time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 4); - // the first one should now be the one with the most files: 4 L1s - assert_eq!(partitions[0].partition_id, partition_6.id); - // then should be the one with the most files: 3 L1s - assert_eq!(partitions[1].partition_id, partition_4.id); - // then should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[2].partition_id, partition_3.id); - // then should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[3].partition_id, partition_1.id); - // Across all shards - // Return 4 cold partitions, partition_1, partition_3, partition_4, partition_6 because - // num_partitions=5 - // still skip partition_2 and partition_5 is considered hot because it has a (deleted) L0 - // created recently - let partitions = repos - .parquet_files() - .most_cold_files_partitions(None, time_8_hours_ago, 5) - .await - .unwrap(); - assert_eq!(partitions.len(), 4); - // the first one should now be the one with the most files: 4 L1s - assert_eq!(partitions[0].partition_id, partition_6.id); - // then should be the one with the most files: 3 L1s - assert_eq!(partitions[1].partition_id, partition_4.id); - // then should be partition_3 with 2 files: 2 L1s - assert_eq!(partitions[2].partition_id, partition_3.id); - // then should be partition_1 witth 1 file: 1 L0 - assert_eq!(partitions[3].partition_id, partition_1.id); - - // drop the namespace to avoid the created data in this tests from affecting other tests - repos - .namespaces() - .soft_delete("test_most_level_0_files_partitions") - .await - .expect("delete namespace should succeed"); - - repos.abort().await.unwrap(); - } - async fn test_partitions_with_recent_created_files(catalog: Arc) { let max_num_partition = 100; let mut repos = catalog.repositories().await; @@ -4297,682 +2894,6 @@ 
pub(crate) mod test_helpers { .expect("delete namespace should succeed"); } - async fn test_recent_highest_throughput_partitions(catalog: Arc) { - let mut repos = catalog.repositories().await; - let topic = repos - .topics() - .create_or_get("highest_throughput") - .await - .unwrap(); - let pool = repos - .query_pools() - .create_or_get("highest_throughput") - .await - .unwrap(); - let namespace = repos - .namespaces() - .create( - "test_recent_highest_throughput_partitions", - None, - topic.id, - pool.id, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(100)) - .await - .unwrap(); - - // params for the tests - let num_minutes = 4 * 60; - let min_num_files = 2; - let num_partitions = 2; - - let time_at_num_minutes_ago = - Timestamp::from(catalog.time_provider().minutes_ago(num_minutes)); - - // Case 1 - // Db has no partition - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Case 2 - // The DB has 1 partition but it does not have any file - let partition = repos - .partitions() - .create_or_get("one".into(), shard.id, table.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Time for testing - let time_now = Timestamp::from(catalog.time_provider().now()); - let time_one_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(1)); - let time_two_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(2)); - let time_three_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(3)); - let time_five_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(5)); - let time_ten_hour_ago = Timestamp::from(catalog.time_provider().hours_ago(10)); - - // Case 3 - // The partition has one deleted file - let parquet_file_params = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: partition.table_id, - partition_id: partition.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(140), - min_time: Timestamp::new(1), - max_time: Timestamp::new(10), - file_size_bytes: 1337, - row_count: 0, - compaction_level: CompactionLevel::Initial, - created_at: time_now, - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - max_l0_created_at: time_now, - }; - let delete_l0_file = repos - .parquet_files() - .create(parquet_file_params.clone()) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(delete_l0_file.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - 
.unwrap(); - assert!(partitions.is_empty()); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Case 4 - // Partition has only 1 file created recently - let l0_one_hour_ago_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - created_at: time_one_hour_ago, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_one_hour_ago_file_params.clone()) - .await - .unwrap(); - // Case 4.1: min_num_files = 2 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - // nothing return because the partition has only one recent L0 file which is smaller than - // min_num_files = 2 - assert!(partitions.is_empty()); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - // nothing return because the partition has only one recent L0 file which is smaller than - // min_num_files = 2 - assert!(partitions.is_empty()); - - // Case 4.2: min_num_files = 1 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - 1, - num_partitions, - ) - .await - .unwrap(); - // and have one partition - assert_eq!(partitions.len(), 1); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions(None, time_at_num_minutes_ago, 1, num_partitions) - .await - .unwrap(); - // and have one partition - assert_eq!(partitions.len(), 1); - - // Case 5 - // Let us create another partition with 2 L0 recent files - let another_partition = repos - .partitions() - .create_or_get( - "test_recent_highest_throughput_partitions_two".into(), - shard.id, - table.id, - ) - .await - .unwrap(); - let l0_2_hours_ago_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - created_at: time_two_hour_ago, - partition_id: another_partition.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_2_hours_ago_file_params) - .await - .unwrap(); - let l0_3_hours_ago_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - created_at: time_three_hour_ago, - partition_id: another_partition.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_3_hours_ago_file_params) - .await - .unwrap(); - - // Case 5.1: min_num_files = 2 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - // must be the partition with 2 files - assert_eq!(partitions[0].partition_id, another_partition.id); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - // must be the partition with 2 files - assert_eq!(partitions[0].partition_id, another_partition.id); - - // Case 5.2: min_num_files = 1 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - 1, - 
num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // partition with 2 files must be first - assert_eq!(partitions[0].partition_id, another_partition.id); - assert_eq!(partitions[1].partition_id, partition.id); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions(None, time_at_num_minutes_ago, 1, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // partition with 2 files must be first - assert_eq!(partitions[0].partition_id, another_partition.id); - assert_eq!(partitions[1].partition_id, partition.id); - - // Case 6 - // Add 2 not-recent files to the first partition - let l0_5_hours_ago_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - created_at: time_five_hour_ago, - partition_id: partition.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_5_hours_ago_file_params) - .await - .unwrap(); - let l0_10_hours_ago_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - created_at: time_ten_hour_ago, - partition_id: partition.id, - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l0_10_hours_ago_file_params) - .await - .unwrap(); - - // Case 6.1: min_num_files = 2 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - // result still 1 partition because the old files do not contribute to recent throughput - assert_eq!(partitions.len(), 1); - // must be the partition with 2 files - assert_eq!(partitions[0].partition_id, another_partition.id); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - None, - time_at_num_minutes_ago, - min_num_files, - num_partitions, - ) - .await - .unwrap(); - // result still 1 partition because the old files do not contribute to recent throughput - assert_eq!(partitions.len(), 1); - // must be the partition with 2 files - assert_eq!(partitions[0].partition_id, another_partition.id); - - // Case 6.2: min_num_files = 1 - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - 1, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // partition with 2 files must be first - assert_eq!(partitions[0].partition_id, another_partition.id); - assert_eq!(partitions[1].partition_id, partition.id); - // Across all shards - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions(None, time_at_num_minutes_ago, 1, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // partition with 2 files must be first - assert_eq!(partitions[0].partition_id, another_partition.id); - assert_eq!(partitions[1].partition_id, partition.id); - - // The compactor skipped compacting another_partition - repos - .partitions() - .record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 4, 10, 20) - .await - .unwrap(); - - // another_partition should no longer be selected for compaction - let partitions = repos - .parquet_files() - .recent_highest_throughput_partitions( - Some(shard.id), - time_at_num_minutes_ago, - 1, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - assert_eq!(partitions[0].partition_id, partition.id); - // Across all shards - let partitions = repos - .parquet_files() - 
.recent_highest_throughput_partitions(None, time_at_num_minutes_ago, 1, num_partitions) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - assert_eq!(partitions[0].partition_id, partition.id); - - // Uncomment this out after https://github.com/influxdata/influxdb_iox/issues/6517 is fixed - // // remove namespace to avoid it from affecting later tests - // repos - // .namespaces() - // .delete("test_recent_highest_throughput_partitions") - // .await - // .expect("delete namespace should succeed"); - } - - async fn test_partitions_with_small_l1_file_count(catalog: Arc) { - let mut repos = catalog.repositories().await; - let topic = repos - .topics() - .create_or_get("small_l1_files") - .await - .unwrap(); - let pool = repos - .query_pools() - .create_or_get("small_l1_files") - .await - .unwrap(); - let namespace = repos - .namespaces() - .create( - "test_partitions_with_small_l1_file_count", - None, - topic.id, - pool.id, - ) - .await - .unwrap(); - let table = repos - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let shard = repos - .shards() - .create_or_get(&topic, ShardIndex::new(100)) - .await - .unwrap(); - - // params for the tests - let small_size_threshold_bytes = 2048; - let min_small_file_count = 2; - let num_partitions = 2; - - // Time for testing - let time_now = Timestamp::from(catalog.time_provider().now()); - - // Case 1 - // Db has no partition - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - min_small_file_count, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Case 2 - // The DB has 1 partition but it does not have any file - let partition = repos - .partitions() - .create_or_get("one".into(), shard.id, table.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - min_small_file_count, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Case 3 - // The partition has one deleted L1 file - let parquet_file_params = ParquetFileParams { - shard_id: shard.id, - namespace_id: namespace.id, - table_id: partition.table_id, - partition_id: partition.id, - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(140), - min_time: Timestamp::new(1), - max_time: Timestamp::new(10), - file_size_bytes: 1337, - row_count: 0, - compaction_level: CompactionLevel::FileNonOverlapped, - created_at: time_now, - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - max_l0_created_at: time_now, - }; - let delete_l1_file = repos - .parquet_files() - .create(parquet_file_params.clone()) - .await - .unwrap(); - repos - .parquet_files() - .flag_for_delete(delete_l1_file.id) - .await - .unwrap(); - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - min_small_file_count, - num_partitions, - ) - .await - .unwrap(); - assert!(partitions.is_empty()); - - // Case 4 - // Partition has only 1 file - let l1_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - ..parquet_file_params.clone() - }; - repos - .parquet_files() - .create(l1_file_params.clone()) - .await - .unwrap(); - // Case 4.1: min_num_files = 2 - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - min_small_file_count, - num_partitions, - ) - 
.await - .unwrap(); - // nothing return because the partition has only one L1 file which is smaller than - // min_small_file_count = 2 - assert!(partitions.is_empty()); - // Case 4.2: min_small_file_count = 1 - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - 1, - num_partitions, - ) - .await - .unwrap(); - // and have one partition - assert_eq!(partitions.len(), 1); - // Case 4.3: small_size_threshold_bytes = 500 - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - 500, // smaller than our file size of 1337 - 1, - num_partitions, - ) - .await - .unwrap(); - // nothing to return because there aren't any files small enough - assert!(partitions.is_empty()); - - // Case 5 - // Let us create another partition with 2 small L1 files - let another_partition = repos - .partitions() - .create_or_get("two".into(), shard.id, table.id) - .await - .unwrap(); - let l1_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: another_partition.id, - ..parquet_file_params.clone() - }; - repos.parquet_files().create(l1_file_params).await.unwrap(); - let l1_file_params = ParquetFileParams { - object_store_id: Uuid::new_v4(), - partition_id: another_partition.id, - ..parquet_file_params.clone() - }; - repos.parquet_files().create(l1_file_params).await.unwrap(); - // Case 5.1: min_num_files = 2 - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - min_small_file_count, - num_partitions, - ) - .await - .unwrap(); - // BUG: https://github.com/influxdata/influxdb_iox/issues/6517 - assert_eq!(partitions.len(), 1); - // must be the partition with 2 files - assert_eq!(partitions[0].partition_id, another_partition.id); - - // Case 5.2: min_num_files = 1 - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - 1, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 2); - // partition with 2 files must be first - assert_eq!(partitions[0].partition_id, another_partition.id); - assert_eq!(partitions[1].partition_id, partition.id); - - // The compactor skipped compacting another_partition - repos - .partitions() - .record_skipped_compaction(another_partition.id, "Secret reasons", 1, 2, 4, 10, 20) - .await - .unwrap(); - - // another_partition should no longer be selected for compaction - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - Some(shard.id), - small_size_threshold_bytes, - 1, - num_partitions, - ) - .await - .unwrap(); - assert_eq!(partitions.len(), 1); - assert_eq!(partitions[0].partition_id, partition.id); - - // remove namespace to avoid it from affecting later tests - repos - .namespaces() - .soft_delete("test_partitions_with_small_l1_file_count") - .await - .expect("delete namespace should succeed"); - } - async fn test_list_by_partiton_not_to_delete(catalog: Arc) { let mut repos = catalog.repositories().await; let topic = repos.topics().create_or_get("foo").await.unwrap(); @@ -5174,18 +3095,6 @@ pub(crate) mod test_helpers { // Create a ParquetFileId that doesn't actually exist in the catalog let nonexistent_parquet_file_id = ParquetFileId::new(level_0_file.id.get() + 1); - // Level 0 parquet files should contain both existing files at this point - let expected = vec![parquet_file.clone(), level_0_file.clone()]; - let level_0 = 
repos.parquet_files().level_0(shard.id).await.unwrap(); - let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect(); - level_0_ids.sort(); - let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect(); - expected_ids.sort(); - assert_eq!( - level_0_ids, expected_ids, - "\nlevel 0: {level_0:#?}\nexpected: {expected:#?}", - ); - // Make parquet_file compaction level 1, attempt to mark the nonexistent file; operation // should succeed let updated = repos @@ -5198,35 +3107,6 @@ pub(crate) mod test_helpers { .unwrap(); assert_eq!(updated, vec![parquet_file.id]); - // Level 0 parquet files should only contain level_0_file - let expected = vec![level_0_file]; - let level_0 = repos.parquet_files().level_0(shard.id).await.unwrap(); - let mut level_0_ids: Vec<_> = level_0.iter().map(|pf| pf.id).collect(); - level_0_ids.sort(); - let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect(); - expected_ids.sort(); - assert_eq!( - level_0_ids, expected_ids, - "\nlevel 0: {level_0:#?}\nexpected: {expected:#?}", - ); - - // Level 1 parquet files for a shard should only contain parquet_file - let expected = vec![parquet_file]; - let table_partition = TablePartition::new(shard.id, table.id, partition.id); - let level_1 = repos - .parquet_files() - .level_1(table_partition, query_min_time, query_max_time) - .await - .unwrap(); - let mut level_1_ids: Vec<_> = level_1.iter().map(|pf| pf.id).collect(); - level_1_ids.sort(); - let mut expected_ids: Vec<_> = expected.iter().map(|pf| pf.id).collect(); - expected_ids.sort(); - assert_eq!( - level_1_ids, expected_ids, - "\nlevel 1: {level_1:#?}\nexpected: {expected:#?}", - ); - // remove namespace to avoid it from affecting later tests repos .namespaces() diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index d8ce657321..768a20fe6c 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -12,10 +12,10 @@ use crate::{ }; use async_trait::async_trait; use data_types::{ - Column, ColumnId, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, - ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, - PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, - SkippedCompaction, Table, TableId, TablePartition, Timestamp, TopicId, TopicMetadata, + Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, + ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, + QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, + TableId, Timestamp, TopicId, TopicMetadata, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::warn; @@ -657,37 +657,6 @@ impl ColumnRepo for MemTxn { let stage = self.stage(); Ok(stage.columns.clone()) } - - async fn list_type_count_by_table_id( - &mut self, - table_id: TableId, - ) -> Result> { - let stage = self.stage(); - - let columns = stage - .columns - .iter() - .filter(|c| c.table_id == table_id) - .map(|c| c.column_type) - .collect::>(); - - let mut cols = HashMap::new(); - for c in columns { - cols.entry(c) - .and_modify(|counter| *counter += 1) - .or_insert(1); - } - - let column_type_counts = cols - .iter() - .map(|c| ColumnTypeCount { - col_type: *c.0, - count: *c.1, - }) - .collect::>(); - - Ok(column_type_counts) - } } #[async_trait] @@ -811,23 +780,6 @@ impl PartitionRepo for MemTxn { .cloned()) } - async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result> { - let 
stage = self.stage(); - - let table_ids: HashSet<_> = stage - .tables - .iter() - .filter_map(|table| (table.namespace_id == namespace_id).then_some(table.id)) - .collect(); - let partitions: Vec<_> = stage - .partitions - .iter() - .filter(|p| table_ids.contains(&p.table_id)) - .cloned() - .collect(); - Ok(partitions) - } - async fn list_by_table_id(&mut self, table_id: TableId) -> Result> { let stage = self.stage(); @@ -947,42 +899,11 @@ impl PartitionRepo for MemTxn { } } - async fn update_persisted_sequence_number( - &mut self, - partition_id: PartitionId, - sequence_number: SequenceNumber, - ) -> Result<()> { - let stage = self.stage(); - match stage.partitions.iter_mut().find(|p| p.id == partition_id) { - Some(p) => { - p.persisted_sequence_number = Some(sequence_number); - Ok(()) - } - None => Err(Error::PartitionNotFound { id: partition_id }), - } - } - async fn most_recent_n(&mut self, n: usize) -> Result> { let stage = self.stage(); Ok(stage.partitions.iter().rev().take(n).cloned().collect()) } - async fn most_recent_n_in_shards( - &mut self, - n: usize, - shards: &[ShardId], - ) -> Result> { - let stage = self.stage(); - Ok(stage - .partitions - .iter() - .rev() - .filter(|p| shards.contains(&p.shard_id)) - .take(n) - .cloned() - .collect()) - } - async fn partitions_with_recent_created_files( &mut self, time_in_the_past: Timestamp, @@ -1119,22 +1040,6 @@ impl ParquetFileRepo for MemTxn { .collect()) } - async fn list_by_shard_greater_than( - &mut self, - shard_id: ShardId, - sequence_number: SequenceNumber, - ) -> Result> { - let stage = self.stage(); - - let files: Vec<_> = stage - .parquet_files - .iter() - .filter(|f| f.shard_id == shard_id && f.max_sequence_number > sequence_number) - .cloned() - .collect(); - Ok(files) - } - async fn list_by_namespace_not_to_delete( &mut self, namespace_id: NamespaceId, @@ -1179,7 +1084,7 @@ impl ParquetFileRepo for MemTxn { Ok(parquet_files) } - async fn delete_old(&mut self, older_than: Timestamp) -> Result> { + async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result> { let stage = self.stage(); let (delete, keep): (Vec<_>, Vec<_>) = stage.parquet_files.iter().cloned().partition( @@ -1188,275 +1093,10 @@ impl ParquetFileRepo for MemTxn { stage.parquet_files = keep; + let delete = delete.into_iter().map(|f| f.id).collect(); Ok(delete) } - async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result> { - let delete = self - .delete_old(older_than) - .await - .unwrap() - .into_iter() - .map(|f| f.id) - .collect(); - Ok(delete) - } - - async fn level_0(&mut self, shard_id: ShardId) -> Result> { - let stage = self.stage(); - - Ok(stage - .parquet_files - .iter() - .filter(|f| { - f.shard_id == shard_id - && f.compaction_level == CompactionLevel::Initial - && f.to_delete.is_none() - }) - .cloned() - .collect()) - } - - async fn level_1( - &mut self, - table_partition: TablePartition, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result> { - let stage = self.stage(); - - Ok(stage - .parquet_files - .iter() - .filter(|f| { - f.shard_id == table_partition.shard_id - && f.table_id == table_partition.table_id - && f.partition_id == table_partition.partition_id - && f.compaction_level == CompactionLevel::FileNonOverlapped - && f.to_delete.is_none() - && ((f.min_time <= min_time && f.max_time >= min_time) - || (f.min_time > min_time && f.min_time <= max_time)) - }) - .cloned() - .collect()) - } - - async fn recent_highest_throughput_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - 
min_num_files: usize, - num_partitions: usize, - ) -> Result> { - let recent_time = time_in_the_past; - - let stage = self.stage(); - - // Get partition info of selected files - let partitions = stage - .parquet_files - .iter() - .filter(|f| { - let shard_matches_if_specified = if let Some(shard_id) = shard_id { - f.shard_id == shard_id - } else { - true - }; - - shard_matches_if_specified - && f.created_at > recent_time - && f.compaction_level == CompactionLevel::Initial - && f.to_delete.is_none() - }) - .map(|pf| PartitionParam { - partition_id: pf.partition_id, - shard_id: pf.shard_id, - namespace_id: pf.namespace_id, - table_id: pf.table_id, - }) - .collect::>(); - - // Count num of files per partition by simply count the number of partition duplicates - let mut partition_duplicate_count: HashMap = - HashMap::with_capacity(partitions.len()); - for p in partitions { - let count = partition_duplicate_count.entry(p).or_insert(0); - *count += 1; - } - - // Partitions with select file count >= min_num_files that haven't been skipped by the - // compactor - let skipped_partitions: Vec<_> = stage - .skipped_compactions - .iter() - .map(|s| s.partition_id) - .collect(); - let mut partitions = partition_duplicate_count - .iter() - .filter(|(_, v)| v >= &&min_num_files) - .filter(|(p, _)| !skipped_partitions.contains(&p.partition_id)) - .collect::>(); - - // Sort partitions by file count - partitions.sort_by(|a, b| b.1.cmp(a.1)); - - // only return top partitions - let partitions = partitions - .into_iter() - .map(|(k, _)| *k) - .take(num_partitions) - .collect::>(); - - Ok(partitions) - } - - async fn partitions_with_small_l1_file_count( - &mut self, - shard_id: Option, - small_size_threshold_bytes: i64, - min_small_file_count: usize, - num_partitions: usize, - ) -> Result> { - let stage = self.stage(); - let skipped_partitions: Vec<_> = stage - .skipped_compactions - .iter() - .map(|s| s.partition_id) - .collect(); - // get a list of files for the shard that are under the size threshold and don't belong to - // a partition that has been skipped by the compactor - let relevant_parquet_files = stage - .parquet_files - .iter() - .filter(|f| { - let shard_matches_if_specified = if let Some(shard_id) = shard_id { - f.shard_id == shard_id - } else { - true - }; - - shard_matches_if_specified - && f.compaction_level == CompactionLevel::FileNonOverlapped - && f.file_size_bytes < small_size_threshold_bytes - && !skipped_partitions.contains(&f.partition_id) - }) - .collect::>(); - // count the number of files per partition & use that to retain only a list of counts that - // are above our threshold. 
the keys then become our partition candidates - let mut partition_small_file_count: HashMap = - HashMap::with_capacity(relevant_parquet_files.len()); - for pf in relevant_parquet_files { - let key = PartitionParam { - partition_id: pf.partition_id, - shard_id: pf.shard_id, - namespace_id: pf.namespace_id, - table_id: pf.table_id, - }; - if pf.to_delete.is_none() { - let count = partition_small_file_count.entry(key).or_insert(0); - *count += 1; - } - } - partition_small_file_count.retain(|_key, c| *c >= min_small_file_count); - let mut partitions = partition_small_file_count.iter().collect::>(); - // sort and return top N - partitions.sort_by(|a, b| b.1.cmp(a.1)); - Ok(partitions - .into_iter() - .map(|(k, _)| *k) - .take(num_partitions) - .collect::>()) - } - - async fn most_cold_files_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - num_partitions: usize, - ) -> Result> { - let stage = self.stage(); - let relevant_parquet_files = stage - .parquet_files - .iter() - .filter(|f| { - let shard_matches_if_specified = if let Some(shard_id) = shard_id { - f.shard_id == shard_id - } else { - true - }; - - shard_matches_if_specified - && (f.compaction_level == CompactionLevel::Initial - || f.compaction_level == CompactionLevel::FileNonOverlapped) - }) - .collect::>(); - - // Count num of files per partition by simply count the number of partition duplicates - let mut partition_duplicate_count: HashMap = - HashMap::with_capacity(relevant_parquet_files.len()); - let mut partition_max_created_at = HashMap::with_capacity(relevant_parquet_files.len()); - for pf in relevant_parquet_files { - let key = PartitionParam { - partition_id: pf.partition_id, - shard_id: pf.shard_id, - namespace_id: pf.namespace_id, - table_id: pf.table_id, - }; - - if pf.to_delete.is_none() { - let count = partition_duplicate_count.entry(key).or_insert(0); - *count += 1; - } - - let created_at = if pf.compaction_level == CompactionLevel::Initial { - // the file is level-0, use its created_at time even if it is deleted - Some(pf.created_at) - } else if pf.to_delete.is_none() { - // non deleted level-1, make it `time_in_the_past - 1` to have this partition always the cold one - Some(time_in_the_past - 1) - } else { - // This is the case of deleted level-1 - None - }; - - if let Some(created_at) = created_at { - let max_created_at = partition_max_created_at.entry(key).or_insert(created_at); - *max_created_at = std::cmp::max(*max_created_at, created_at); - if created_at > *max_created_at { - *max_created_at = created_at; - } - } - } - - // Sort partitions whose max created at is older than the limit by their file count - let mut partitions = partition_duplicate_count - .iter() - .filter(|(k, _v)| partition_max_created_at.get(k).unwrap() < &time_in_the_past) - .collect::>(); - partitions.sort_by(|a, b| b.1.cmp(a.1)); - - // Return top partitions with most file counts that haven't been skipped by the compactor - let skipped_partitions: Vec<_> = stage - .skipped_compactions - .iter() - .map(|s| s.partition_id) - .collect(); - let partitions = partitions - .into_iter() - .map(|(k, _)| *k) - .filter(|pf| !skipped_partitions.contains(&pf.partition_id)) - .map(|pf| PartitionParam { - partition_id: pf.partition_id, - shard_id: pf.shard_id, - namespace_id: pf.namespace_id, - table_id: pf.table_id, - }) - .take(num_partitions) - .collect::>(); - - Ok(partitions) - } - async fn list_by_partition_not_to_delete( &mut self, partition_id: PartitionId, @@ -1509,58 +1149,6 @@ impl ParquetFileRepo for MemTxn { 
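(For reference, the removed MemTxn selectors above — recent_highest_throughput_partitions, partitions_with_small_l1_file_count, most_cold_files_partitions — all follow the same count-per-partition-then-take-top-N shape. The standalone sketch below shows that pattern in isolation; the function name and the plain (i64, bool) stand-ins for (partition id, deleted flag) are illustrative only and are not part of the codebase.)

use std::collections::HashMap;

/// Count non-deleted files per partition id and return the `num_partitions`
/// partitions with the most files, keeping only partitions with at least
/// `min_num_files` files.
fn top_partitions_by_file_count(
    files: &[(i64, bool)],
    min_num_files: usize,
    num_partitions: usize,
) -> Vec<i64> {
    let mut counts: HashMap<i64, usize> = HashMap::new();
    for (partition_id, deleted) in files {
        if !*deleted {
            *counts.entry(*partition_id).or_insert(0) += 1;
        }
    }

    let mut ranked: Vec<(i64, usize)> = counts
        .into_iter()
        .filter(|(_, count)| *count >= min_num_files)
        .collect();
    // Sort by descending file count, then keep the top N partitions.
    ranked.sort_by(|a, b| b.1.cmp(&a.1));
    ranked
        .into_iter()
        .take(num_partitions)
        .map(|(partition_id, _)| partition_id)
        .collect()
}

fn main() {
    // Partition 2 has two live files, partition 1 has one, partition 3 only a deleted one.
    let files = [(1, false), (2, false), (2, false), (3, true)];
    assert_eq!(top_partitions_by_file_count(&files, 1, 10), vec![2, 1]);
}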
Ok(count_i64.unwrap()) } - async fn count_by_overlaps_with_level_0( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - sequence_number: SequenceNumber, - ) -> Result { - let stage = self.stage(); - - let count = stage - .parquet_files - .iter() - .filter(|f| { - f.shard_id == shard_id - && f.table_id == table_id - && f.max_sequence_number < sequence_number - && f.to_delete.is_none() - && f.compaction_level == CompactionLevel::Initial - && ((f.min_time <= min_time && f.max_time >= min_time) - || (f.min_time > min_time && f.min_time <= max_time)) - }) - .count(); - - i64::try_from(count).map_err(|_| Error::InvalidValue { value: count }) - } - - async fn count_by_overlaps_with_level_1( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result { - let stage = self.stage(); - - let count = stage - .parquet_files - .iter() - .filter(|f| { - f.shard_id == shard_id - && f.table_id == table_id - && f.to_delete.is_none() - && f.compaction_level == CompactionLevel::FileNonOverlapped - && ((f.min_time <= min_time && f.max_time >= min_time) - || (f.min_time > min_time && f.min_time <= max_time)) - }) - .count(); - - i64::try_from(count).map_err(|_| Error::InvalidValue { value: count }) - } - async fn get_by_object_store_id( &mut self, object_store_id: Uuid, diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index f3a4381ad3..6f04dbbc39 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -7,10 +7,10 @@ use crate::interface::{ }; use async_trait::async_trait; use data_types::{ - Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, - QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, - TableId, TablePartition, Timestamp, TopicId, TopicMetadata, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId, + ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool, + QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, + Timestamp, TopicId, TopicMetadata, }; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; @@ -212,7 +212,6 @@ decorate!( "column_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result>; "column_create_or_get_many_unchecked" = create_or_get_many_unchecked(&mut self, table_id: TableId, columns: HashMap<&str, ColumnType>) -> Result>; "column_list" = list(&mut self) -> Result>; - "column_list_type_count_by_table_id" = list_type_count_by_table_id(&mut self, table_id: TableId) -> Result>; ] ); @@ -232,16 +231,13 @@ decorate!( methods = [ "partition_create_or_get" = create_or_get(&mut self, key: PartitionKey, shard_id: ShardId, table_id: TableId) -> Result; "partition_get_by_id" = get_by_id(&mut self, partition_id: PartitionId) -> Result>; - "partition_list_by_namespace" = list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result>; "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result>; "partition_list_ids" = list_ids(&mut self) -> Result>; "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option>, new_sort_key: &[&str]) -> Result>>; "partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: 
&str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>; "partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result>; "partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result>; - "partition_update_persisted_sequence_number" = update_persisted_sequence_number(&mut self, partition_id: PartitionId, sequence_number: SequenceNumber) -> Result<()>; "partition_most_recent_n" = most_recent_n(&mut self, n: usize) -> Result>; - "partition_most_recent_n_in_shards" = most_recent_n_in_shards(&mut self, n: usize, shards: &[ShardId]) -> Result>; "partitions_with_recent_created_files" = partitions_with_recent_created_files(&mut self, time_in_the_past: Timestamp, max_num_partitions: usize) -> Result>; "partitions_new_file_between" = partitions_new_file_between(&mut self, minimum_time: Timestamp, maximum_time: Option) -> Result>; "get_in_skipped_compaction" = get_in_skipped_compaction(&mut self, partition_id: PartitionId) -> Result>; @@ -254,23 +250,14 @@ decorate!( "parquet_create" = create( &mut self, parquet_file_params: ParquetFileParams) -> Result; "parquet_flag_for_delete" = flag_for_delete(&mut self, id: ParquetFileId) -> Result<()>; "parquet_flag_for_delete_by_retention" = flag_for_delete_by_retention(&mut self) -> Result>; - "parquet_list_by_shard_greater_than" = list_by_shard_greater_than(&mut self, shard_id: ShardId, sequence_number: SequenceNumber) -> Result>; "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result>; "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; "parquet_list_by_table" = list_by_table(&mut self, table_id: TableId) -> Result>; - "parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result>; "parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result>; "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result>; - "parquet_level_0" = level_0(&mut self, shard_id: ShardId) -> Result>; - "parquet_level_1" = level_1(&mut self, table_partition: TablePartition, min_time: Timestamp, max_time: Timestamp) -> Result>; "parquet_update_compaction_level" = update_compaction_level(&mut self, parquet_file_ids: &[ParquetFileId], compaction_level: CompactionLevel) -> Result>; "parquet_exist" = exist(&mut self, id: ParquetFileId) -> Result; "parquet_count" = count(&mut self) -> Result; - "parquet_count_by_overlaps_with_level_0" = count_by_overlaps_with_level_0(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp, sequence_number: SequenceNumber) -> Result; - "parquet_count_by_overlaps_with_level_1" = count_by_overlaps_with_level_1(&mut self, table_id: TableId, shard_id: ShardId, min_time: Timestamp, max_time: Timestamp) -> Result; "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result>; - "recent_highest_throughput_partitions" = recent_highest_throughput_partitions(&mut self, shard_id: Option, time_in_the_past: Timestamp, min_num_files: usize, num_partitions: usize) -> Result>; - "parquet_partitions_with_small_l1_file_count" = partitions_with_small_l1_file_count(&mut self, shard_id: Option, small_size_threshold_bytes: i64, min_small_file_count: usize, num_partitions: usize) -> Result>; - 
"most_cold_files_partitions" = most_cold_files_partitions(&mut self, shard_id: Option, time_in_the_past: Timestamp, num_partitions: usize) -> Result>; ] ); diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 54036f1297..dd186fad68 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -12,11 +12,10 @@ use crate::{ }; use async_trait::async_trait; use data_types::{ - Column, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, NamespaceId, ParquetFile, - ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, - QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, - TableId, TablePartition, Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, - TRANSITION_SHARD_INDEX, + Column, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, ParquetFileId, + ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, QueryPool, + QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, TableId, + Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use iox_time::{SystemProvider, TimeProvider}; use observability_deps::tracing::{debug, info, warn}; @@ -1090,21 +1089,6 @@ RETURNING *; Ok(out) } - - async fn list_type_count_by_table_id( - &mut self, - table_id: TableId, - ) -> Result> { - sqlx::query_as::<_, ColumnTypeCount>( - r#" -select column_type as col_type, count(1) from column_name where table_id = $1 group by 1; - "#, - ) - .bind(table_id) // $1 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } } #[async_trait] @@ -1265,21 +1249,6 @@ RETURNING *; Ok(Some(partition)) } - async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result> { - sqlx::query_as::<_, Partition>( - r#" -SELECT partition.* -FROM table_name -INNER JOIN partition on partition.table_id = table_name.id -WHERE table_name.namespace_id = $1; - "#, - ) - .bind(namespace_id) // $1 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - async fn list_by_table_id(&mut self, table_id: TableId) -> Result> { sqlx::query_as::<_, Partition>( r#" @@ -1459,27 +1428,6 @@ RETURNING * .context(interface::CouldNotDeleteSkippedCompactionsSnafu) } - async fn update_persisted_sequence_number( - &mut self, - partition_id: PartitionId, - sequence_number: SequenceNumber, - ) -> Result<()> { - let _ = sqlx::query( - r#" -UPDATE partition -SET persisted_sequence_number = $1 -WHERE id = $2; - "#, - ) - .bind(sequence_number.get()) // $1 - .bind(partition_id) // $2 - .execute(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(()) - } - async fn most_recent_n(&mut self, n: usize) -> Result> { sqlx::query_as(r#"SELECT * FROM partition ORDER BY id DESC LIMIT $1;"#) .bind(n as i64) // $1 @@ -1488,21 +1436,6 @@ WHERE id = $2; .map_err(|e| Error::SqlxError { source: e }) } - async fn most_recent_n_in_shards( - &mut self, - n: usize, - shards: &[ShardId], - ) -> Result> { - sqlx::query_as( - r#"SELECT * FROM partition WHERE shard_id IN (SELECT UNNEST($1)) ORDER BY id DESC LIMIT $2;"#, - ) - .bind(shards.iter().map(|v| v.get()).collect::>()) - .bind(n as i64) - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - async fn partitions_with_recent_created_files( &mut self, time_in_the_past: Timestamp, @@ -1646,31 +1579,6 @@ RETURNING *; Ok(flagged) } - async fn list_by_shard_greater_than( - &mut self, - 
shard_id: ShardId, - sequence_number: SequenceNumber, - ) -> Result> { - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! - sqlx::query_as::<_, ParquetFile>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE shard_id = $1 - AND max_sequence_number > $2 -ORDER BY id; - "#, - ) - .bind(shard_id) // $1 - .bind(sequence_number) // $2 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - async fn list_by_namespace_not_to_delete( &mut self, namespace_id: NamespaceId, @@ -1733,20 +1641,6 @@ WHERE table_id = $1; .map_err(|e| Error::SqlxError { source: e }) } - async fn delete_old(&mut self, older_than: Timestamp) -> Result> { - sqlx::query_as::<_, ParquetFile>( - r#" -DELETE FROM parquet_file -WHERE to_delete < $1 -RETURNING *; - "#, - ) - .bind(older_than) // $1 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result> { // see https://www.crunchydata.com/blog/simulating-update-or-delete-with-limit-in-postgres-ctes-to-the-rescue let deleted = sqlx::query( @@ -1772,239 +1666,6 @@ RETURNING id; Ok(deleted) } - async fn level_0(&mut self, shard_id: ShardId) -> Result> { - // this intentionally limits the returned files to 10,000 as it is used to make - // a decision on the highest priority partitions. If compaction has never been - // run this could end up returning millions of results and taking too long to run. - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! - sqlx::query_as::<_, ParquetFile>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE parquet_file.shard_id = $1 - AND parquet_file.compaction_level = $2 - AND parquet_file.to_delete IS NULL - LIMIT 1000; - "#, - ) - .bind(shard_id) // $1 - .bind(CompactionLevel::Initial) // $2 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - - async fn level_1( - &mut self, - table_partition: TablePartition, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result> { - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! 
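(The time-range condition bound below — and shared by the removed count_by_overlaps_with_level_0/1 queries — reads more easily as a plain predicate. This is a sketch of that condition only; the `overlaps` function and integer timestamps are hypothetical stand-ins, not repository code.)

/// Does a file spanning [file_min, file_max] overlap the query range [query_min, query_max]?
/// Mirrors the SQL:
///   (min_time <= $min AND max_time >= $min) OR (min_time > $min AND min_time <= $max)
fn overlaps(file_min: i64, file_max: i64, query_min: i64, query_max: i64) -> bool {
    (file_min <= query_min && file_max >= query_min)
        || (file_min > query_min && file_min <= query_max)
}

fn main() {
    // File straddles the start of the query range.
    assert!(overlaps(1, 10, 5, 20));
    // File starts inside the query range.
    assert!(overlaps(6, 30, 5, 20));
    // File ends before the query range starts.
    assert!(!overlaps(1, 3, 5, 20));
    // File starts after the query range ends.
    assert!(!overlaps(25, 30, 5, 20));
}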
- sqlx::query_as::<_, ParquetFile>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE parquet_file.shard_id = $1 - AND parquet_file.table_id = $2 - AND parquet_file.partition_id = $3 - AND parquet_file.compaction_level = $4 - AND parquet_file.to_delete IS NULL - AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5) - OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6)); - "#, - ) - .bind(table_partition.shard_id) // $1 - .bind(table_partition.table_id) // $2 - .bind(table_partition.partition_id) // $3 - .bind(CompactionLevel::FileNonOverlapped) // $4 - .bind(min_time) // $5 - .bind(max_time) // $6 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - - async fn recent_highest_throughput_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - min_num_files: usize, - num_partitions: usize, - ) -> Result> { - let min_num_files = min_num_files as i32; - let num_partitions = num_partitions as i32; - - match shard_id { - Some(shard_id) => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id, - parquet_file.namespace_id, count(parquet_file.id) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $5 -AND to_delete is null -AND shard_id = $1 -AND created_at > $2 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(id) >= $3 -ORDER BY 5 DESC -LIMIT $4; - "#, - ) - .bind(shard_id) // $1 - .bind(time_in_the_past) //$2 - .bind(min_num_files) // $3 - .bind(num_partitions) // $4 - .bind(CompactionLevel::Initial) // $5 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - None => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id, - parquet_file.namespace_id, count(parquet_file.id) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $4 -AND to_delete is null -AND created_at > $1 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(id) >= $2 -ORDER BY 5 DESC -LIMIT $3; - "#, - ) - .bind(time_in_the_past) //$1 - .bind(min_num_files) // $2 - .bind(num_partitions) // $3 - .bind(CompactionLevel::Initial) // $4 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - } - } - - async fn most_cold_files_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - num_partitions: usize, - ) -> Result> { - let num_partitions = num_partitions as i32; - - // This query returns partitions with most L0+L1 files and all L0 files (both deleted and - // non deleted) are either created before the given time ($2) or not available (removed by - // garbage collector) - match shard_id { - Some(shard_id) => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - count(case when to_delete is null then 1 end) total_count, - max(case when compaction_level= $4 then parquet_file.created_at end) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON 
parquet_file.partition_id = skipped_compactions.partition_id -WHERE (compaction_level = $4 OR compaction_level = $5) -AND shard_id = $1 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(case when to_delete is null then 1 end) > 0 - AND ( max(case when compaction_level= $4 then parquet_file.created_at end) < $2 OR - max(case when compaction_level= $4 then parquet_file.created_at end) is null) -ORDER BY total_count DESC -LIMIT $3; - "#, - ) - .bind(shard_id) // $1 - .bind(time_in_the_past) // $2 - .bind(num_partitions) // $3 - .bind(CompactionLevel::Initial) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - None => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - count(case when to_delete is null then 1 end) total_count, - max(case when compaction_level= $4 then parquet_file.created_at end) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE (compaction_level = $3 OR compaction_level = $4) -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(case when to_delete is null then 1 end) > 0 - AND ( max(case when compaction_level= $3 then parquet_file.created_at end) < $1 OR - max(case when compaction_level= $3 then parquet_file.created_at end) is null) -ORDER BY total_count DESC -LIMIT $2; - "#, - ) - .bind(time_in_the_past) // $1 - .bind(num_partitions) // $2 - .bind(CompactionLevel::Initial) // $3 - .bind(CompactionLevel::FileNonOverlapped) // $4 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - } - } - - async fn partitions_with_small_l1_file_count( - &mut self, - shard_id: Option, - small_size_threshold_bytes: i64, - min_small_file_count: usize, - num_partitions: usize, - ) -> Result> { - // This query returns partitions with at least `min_small_file_count` small L1 files, - // where "small" means no bigger than `small_size_threshold_bytes`, limited to the top `num_partitions`. 
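(The `LEFT OUTER JOIN skipped_compactions ... IS NULL` clause used by these removed candidate-selection queries is an anti-join; the MemTxn versions express the same idea by collecting the skipped partition ids and filtering against them. A rough in-memory sketch follows, using a HashSet lookup instead of the Vec scan in the MemTxn code; the type and function names are illustrative only.)

use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PartitionId(i64);

#[derive(Debug, Clone, Copy)]
struct L1File {
    partition_id: PartitionId,
    file_size_bytes: i64,
}

/// Keep only files under the size threshold whose partition has not been
/// skipped by the compactor -- the in-memory equivalent of the anti-join.
fn small_l1_candidates(
    files: &[L1File],
    skipped: &HashSet<PartitionId>,
    small_size_threshold_bytes: i64,
) -> Vec<L1File> {
    files
        .iter()
        .filter(|f| f.file_size_bytes < small_size_threshold_bytes)
        .filter(|f| !skipped.contains(&f.partition_id))
        .copied()
        .collect()
}

fn main() {
    let skipped: HashSet<_> = [PartitionId(2)].into_iter().collect();
    let files = vec![
        L1File { partition_id: PartitionId(1), file_size_bytes: 100 },
        L1File { partition_id: PartitionId(2), file_size_bytes: 100 },    // skipped partition
        L1File { partition_id: PartitionId(1), file_size_bytes: 10_000 }, // over the threshold
    ];
    let candidates = small_l1_candidates(&files, &skipped, 2048);
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0].partition_id, PartitionId(1));
}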
- sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - COUNT(1) AS l1_file_count -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $5 -AND to_delete IS NULL -AND shard_id = $1 -AND skipped_compactions.partition_id IS NULL -AND file_size_bytes < $3 -GROUP BY 1, 2, 3, 4 -HAVING COUNT(1) >= $2 -ORDER BY l1_file_count DESC -LIMIT $4; - "#, - ) - .bind(shard_id) // $1 - .bind(min_small_file_count as i32) // $2 - .bind(small_size_threshold_bytes) // $3 - .bind(num_partitions as i32) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - async fn list_by_partition_not_to_delete( &mut self, partition_id: PartitionId, @@ -2075,71 +1736,6 @@ RETURNING id; Ok(read_result.count) } - async fn count_by_overlaps_with_level_0( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - sequence_number: SequenceNumber, - ) -> Result { - let read_result = sqlx::query_as::<_, Count>( - r#" -SELECT count(1) as count -FROM parquet_file -WHERE table_id = $1 - AND shard_id = $2 - AND max_sequence_number < $3 - AND parquet_file.to_delete IS NULL - AND compaction_level = $6 - AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4) - OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5)); - "#, - ) - .bind(table_id) // $1 - .bind(shard_id) // $2 - .bind(sequence_number) // $3 - .bind(min_time) // $4 - .bind(max_time) // $5 - .bind(CompactionLevel::Initial) // $6 - .fetch_one(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(read_result.count) - } - - async fn count_by_overlaps_with_level_1( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result { - let read_result = sqlx::query_as::<_, Count>( - r#" -SELECT count(1) as count -FROM parquet_file -WHERE table_id = $1 - AND shard_id = $2 - AND parquet_file.to_delete IS NULL - AND compaction_level = $5 - AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3) - OR (parquet_file.min_time > $3 AND parquet_file.min_time <= $4)); - "#, - ) - .bind(table_id) // $1 - .bind(shard_id) // $2 - .bind(min_time) // $3 - .bind(max_time) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_one(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(read_result.count) - } - async fn get_by_object_store_id( &mut self, object_store_id: Uuid, @@ -2908,7 +2504,7 @@ mod tests { .repositories() .await .parquet_files() - .delete_old(now) + .delete_old_ids_only(now) .await .expect("parquet file deletion should succeed"); let total_file_size_bytes: i64 = diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 0adbca9ce9..b1e8236ec1 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -12,11 +12,10 @@ use crate::{ }; use async_trait::async_trait; use data_types::{ - Column, ColumnId, ColumnSet, ColumnType, ColumnTypeCount, CompactionLevel, Namespace, - NamespaceId, ParquetFile, ParquetFileId, ParquetFileParams, Partition, PartitionId, - PartitionKey, PartitionParam, QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, - ShardIndex, SkippedCompaction, Table, TableId, TablePartition, Timestamp, TopicId, - TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, + 
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId, ParquetFile, + ParquetFileId, ParquetFileParams, Partition, PartitionId, PartitionKey, PartitionParam, + QueryPool, QueryPoolId, SequenceNumber, Shard, ShardId, ShardIndex, SkippedCompaction, Table, + TableId, Timestamp, TopicId, TopicMetadata, TRANSITION_SHARD_ID, TRANSITION_SHARD_INDEX, }; use serde::{Deserialize, Serialize}; use std::ops::Deref; @@ -870,21 +869,6 @@ RETURNING *; Ok(out) } - - async fn list_type_count_by_table_id( - &mut self, - table_id: TableId, - ) -> Result> { - sqlx::query_as::<_, ColumnTypeCount>( - r#" -select column_type as col_type, count(1) AS count from column_name where table_id = $1 group by 1; - "#, - ) - .bind(table_id) // $1 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } } #[async_trait] @@ -1073,24 +1057,6 @@ RETURNING *; Ok(Some(partition.into())) } - async fn list_by_namespace(&mut self, namespace_id: NamespaceId) -> Result> { - Ok(sqlx::query_as::<_, PartitionPod>( - r#" -SELECT partition.* -FROM table_name -INNER JOIN partition on partition.table_id = table_name.id -WHERE table_name.namespace_id = $1; - "#, - ) - .bind(namespace_id) // $1 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - async fn list_by_table_id(&mut self, table_id: TableId) -> Result> { Ok(sqlx::query_as::<_, PartitionPod>( r#" @@ -1274,27 +1240,6 @@ RETURNING * .context(interface::CouldNotDeleteSkippedCompactionsSnafu) } - async fn update_persisted_sequence_number( - &mut self, - partition_id: PartitionId, - sequence_number: SequenceNumber, - ) -> Result<()> { - let _ = sqlx::query( - r#" -UPDATE partition -SET persisted_sequence_number = $1 -WHERE id = $2; - "#, - ) - .bind(sequence_number.get()) // $1 - .bind(partition_id) // $2 - .execute(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(()) - } - async fn most_recent_n(&mut self, n: usize) -> Result> { Ok(sqlx::query_as::<_, PartitionPod>( r#"SELECT * FROM partition ORDER BY id DESC LIMIT $1;"#, @@ -1308,24 +1253,6 @@ WHERE id = $2; .collect()) } - async fn most_recent_n_in_shards( - &mut self, - n: usize, - shards: &[ShardId], - ) -> Result> { - Ok(sqlx::query_as::<_, PartitionPod>( - r#"SELECT * FROM partition WHERE shard_id IN (SELECT value FROM json_each($1)) ORDER BY id DESC LIMIT $2;"#, - ) - .bind(&Json(shards.iter().map(|v| v.get()).collect::>())) - .bind(n as i64) - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - async fn partitions_with_recent_created_files( &mut self, time_in_the_past: Timestamp, @@ -1520,34 +1447,6 @@ RETURNING *; Ok(flagged) } - async fn list_by_shard_greater_than( - &mut self, - shard_id: ShardId, - sequence_number: SequenceNumber, - ) -> Result> { - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! 
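(The removed list_by_shard_greater_than queries — here and in the Postgres and MemTxn implementations above — select files for one shard whose max_sequence_number is strictly greater than a supplied value. A minimal in-memory sketch of that filter, with plain integers standing in for the ShardId and SequenceNumber newtypes and a hypothetical function name:)

/// Keep files for `shard_id` whose max_sequence_number exceeds `watermark`.
/// Each tuple is (shard_id, max_sequence_number).
fn files_after_watermark(
    files: &[(i64, i64)],
    shard_id: i64,
    watermark: i64,
) -> Vec<(i64, i64)> {
    files
        .iter()
        .filter(|(s, seq)| *s == shard_id && *seq > watermark)
        .copied()
        .collect()
}

fn main() {
    let files = [(100, 5), (100, 42), (200, 99)];
    // Only the shard-100 file with a sequence number above 10 is returned.
    assert_eq!(files_after_watermark(&files, 100, 10), vec![(100, 42)]);
}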
- Ok(sqlx::query_as::<_, ParquetFilePod>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE shard_id = $1 - AND max_sequence_number > $2 -ORDER BY id; - "#, - ) - .bind(shard_id) // $1 - .bind(sequence_number) // $2 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - async fn list_by_namespace_not_to_delete( &mut self, namespace_id: NamespaceId, @@ -1619,23 +1518,6 @@ WHERE table_id = $1; .collect()) } - async fn delete_old(&mut self, older_than: Timestamp) -> Result> { - Ok(sqlx::query_as::<_, ParquetFilePod>( - r#" -DELETE FROM parquet_file -WHERE to_delete < $1 -RETURNING *; - "#, - ) - .bind(older_than) // $1 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - async fn delete_old_ids_only(&mut self, older_than: Timestamp) -> Result> { // see https://www.crunchydata.com/blog/simulating-update-or-delete-with-limit-in-sqlite-ctes-to-the-rescue let deleted = sqlx::query( @@ -1661,245 +1543,6 @@ RETURNING id; Ok(deleted) } - async fn level_0(&mut self, shard_id: ShardId) -> Result> { - // this intentionally limits the returned files to 10,000 as it is used to make - // a decision on the highest priority partitions. If compaction has never been - // run this could end up returning millions of results and taking too long to run. - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! - Ok(sqlx::query_as::<_, ParquetFilePod>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE parquet_file.shard_id = $1 - AND parquet_file.compaction_level = $2 - AND parquet_file.to_delete IS NULL - LIMIT 1000; - "#, - ) - .bind(shard_id) // $1 - .bind(CompactionLevel::Initial) // $2 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - - async fn level_1( - &mut self, - table_partition: TablePartition, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result> { - // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large - // `parquet_metadata` column!! 
- Ok(sqlx::query_as::<_, ParquetFilePod>( - r#" -SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, - max_sequence_number, min_time, max_time, to_delete, file_size_bytes, - row_count, compaction_level, created_at, column_set, max_l0_created_at -FROM parquet_file -WHERE parquet_file.shard_id = $1 - AND parquet_file.table_id = $2 - AND parquet_file.partition_id = $3 - AND parquet_file.compaction_level = $4 - AND parquet_file.to_delete IS NULL - AND ((parquet_file.min_time <= $5 AND parquet_file.max_time >= $5) - OR (parquet_file.min_time > $5 AND parquet_file.min_time <= $6)); - "#, - ) - .bind(table_partition.shard_id) // $1 - .bind(table_partition.table_id) // $2 - .bind(table_partition.partition_id) // $3 - .bind(CompactionLevel::FileNonOverlapped) // $4 - .bind(min_time) // $5 - .bind(max_time) // $6 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) - } - - async fn recent_highest_throughput_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - min_num_files: usize, - num_partitions: usize, - ) -> Result> { - let min_num_files = min_num_files as i32; - let num_partitions = num_partitions as i32; - - match shard_id { - Some(shard_id) => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id, - parquet_file.namespace_id, count(parquet_file.id) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $5 -AND to_delete is null -AND shard_id = $1 -AND created_at > $2 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(id) >= $3 -ORDER BY 5 DESC -LIMIT $4; - "#, - ) - .bind(shard_id) // $1 - .bind(time_in_the_past) //$2 - .bind(min_num_files) // $3 - .bind(num_partitions) // $4 - .bind(CompactionLevel::Initial) // $5 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - None => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.table_id, parquet_file.shard_id, - parquet_file.namespace_id, count(parquet_file.id) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $4 -AND to_delete is null -AND created_at > $1 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(id) >= $2 -ORDER BY 5 DESC -LIMIT $3; - "#, - ) - .bind(time_in_the_past) //$1 - .bind(min_num_files) // $2 - .bind(num_partitions) // $3 - .bind(CompactionLevel::Initial) // $4 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - } - } - - async fn partitions_with_small_l1_file_count( - &mut self, - shard_id: Option, - small_size_threshold_bytes: i64, - min_small_file_count: usize, - num_partitions: usize, - ) -> Result> { - // This query returns partitions with at least `min_small_file_count` small L1 files, - // where "small" means no bigger than `small_size_threshold_bytes`, limited to the top `num_partitions`. 
- sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - COUNT(1) AS l1_file_count -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE compaction_level = $5 -AND to_delete IS NULL -AND shard_id = $1 -AND skipped_compactions.partition_id IS NULL -AND file_size_bytes < $3 -GROUP BY 1, 2, 3, 4 -HAVING COUNT(1) >= $2 -ORDER BY l1_file_count DESC -LIMIT $4; - "#, - ) - .bind(shard_id) // $1 - .bind(min_small_file_count as i32) // $2 - .bind(small_size_threshold_bytes) // $3 - .bind(num_partitions as i32) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - - async fn most_cold_files_partitions( - &mut self, - shard_id: Option, - time_in_the_past: Timestamp, - num_partitions: usize, - ) -> Result> { - let num_partitions = num_partitions as i32; - - // This query returns partitions with most L0+L1 files and all L0 files (both deleted and - // non deleted) are either created before the given time ($2) or not available (removed by - // garbage collector) - match shard_id { - Some(shard_id) => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - count(case when to_delete is null then 1 end) total_count, - max(case when compaction_level= $4 then parquet_file.created_at end) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE (compaction_level = $4 OR compaction_level = $5) -AND shard_id = $1 -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(case when to_delete is null then 1 end) > 0 - AND ( max(case when compaction_level= $4 then parquet_file.created_at end) < $2 OR - max(case when compaction_level= $4 then parquet_file.created_at end) is null) -ORDER BY total_count DESC -LIMIT $3; - "#, - ) - .bind(shard_id) // $1 - .bind(time_in_the_past) // $2 - .bind(num_partitions) // $3 - .bind(CompactionLevel::Initial) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - None => { - sqlx::query_as::<_, PartitionParam>( - r#" -SELECT parquet_file.partition_id, parquet_file.shard_id, parquet_file.namespace_id, - parquet_file.table_id, - count(case when to_delete is null then 1 end) total_count, - max(case when compaction_level= $4 then parquet_file.created_at end) -FROM parquet_file -LEFT OUTER JOIN skipped_compactions ON parquet_file.partition_id = skipped_compactions.partition_id -WHERE (compaction_level = $3 OR compaction_level = $4) -AND skipped_compactions.partition_id IS NULL -GROUP BY 1, 2, 3, 4 -HAVING count(case when to_delete is null then 1 end) > 0 - AND ( max(case when compaction_level= $3 then parquet_file.created_at end) < $1 OR - max(case when compaction_level= $3 then parquet_file.created_at end) is null) -ORDER BY total_count DESC -LIMIT $2; - "#, - ) - .bind(time_in_the_past) // $1 - .bind(num_partitions) // $2 - .bind(CompactionLevel::Initial) // $3 - .bind(CompactionLevel::FileNonOverlapped) // $4 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e }) - } - } - } - async fn list_by_partition_not_to_delete( &mut self, partition_id: PartitionId, @@ -1973,71 +1616,6 @@ RETURNING id; 
Ok(read_result.count) } - async fn count_by_overlaps_with_level_0( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - sequence_number: SequenceNumber, - ) -> Result { - let read_result = sqlx::query_as::<_, Count>( - r#" -SELECT count(1) as count -FROM parquet_file -WHERE table_id = $1 - AND shard_id = $2 - AND max_sequence_number < $3 - AND parquet_file.to_delete IS NULL - AND compaction_level = $6 - AND ((parquet_file.min_time <= $4 AND parquet_file.max_time >= $4) - OR (parquet_file.min_time > $4 AND parquet_file.min_time <= $5)); - "#, - ) - .bind(table_id) // $1 - .bind(shard_id) // $2 - .bind(sequence_number) // $3 - .bind(min_time) // $4 - .bind(max_time) // $5 - .bind(CompactionLevel::Initial) // $6 - .fetch_one(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(read_result.count) - } - - async fn count_by_overlaps_with_level_1( - &mut self, - table_id: TableId, - shard_id: ShardId, - min_time: Timestamp, - max_time: Timestamp, - ) -> Result { - let read_result = sqlx::query_as::<_, Count>( - r#" -SELECT count(1) as count -FROM parquet_file -WHERE table_id = $1 - AND shard_id = $2 - AND parquet_file.to_delete IS NULL - AND compaction_level = $5 - AND ((parquet_file.min_time <= $3 AND parquet_file.max_time >= $3) - OR (parquet_file.min_time > $3 AND parquet_file.min_time <= $4)); - "#, - ) - .bind(table_id) // $1 - .bind(shard_id) // $2 - .bind(min_time) // $3 - .bind(max_time) // $4 - .bind(CompactionLevel::FileNonOverlapped) // $5 - .fetch_one(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })?; - - Ok(read_result.count) - } - async fn get_by_object_store_id( &mut self, object_store_id: Uuid, @@ -2544,7 +2122,7 @@ mod tests { .repositories() .await .parquet_files() - .delete_old(now) + .delete_old_ids_only(now) .await .expect("parquet file deletion should succeed"); let total_file_size_bytes: i64 = diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 19598ca44e..577ed55a37 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -6,8 +6,8 @@ use arrow::{ }; use data_types::{ Column, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceSchema, ParquetFile, - ParquetFileParams, Partition, PartitionId, QueryPool, SequenceNumber, Shard, ShardId, - ShardIndex, Table, TableId, TablePartition, TableSchema, Timestamp, TopicMetadata, + ParquetFileParams, Partition, PartitionId, QueryPool, SequenceNumber, Shard, ShardIndex, Table, + TableId, TableSchema, Timestamp, TopicMetadata, }; use datafusion::physical_plan::metrics::Count; use datafusion_util::MemoryStream; @@ -185,48 +185,6 @@ impl TestCatalog { .await } - /// List level 0 files - pub async fn list_level_0_files(self: &Arc, shard_id: ShardId) -> Vec { - self.catalog - .repositories() - .await - .parquet_files() - .level_0(shard_id) - .await - .unwrap() - } - - /// Count level 0 files - pub async fn count_level_0_files(self: &Arc, shard_id: ShardId) -> usize { - let level_0 = self - .catalog - .repositories() - .await - .parquet_files() - .level_0(shard_id) - .await - .unwrap(); - level_0.len() - } - - /// Count level 1 files - pub async fn count_level_1_files( - self: &Arc, - table_partition: TablePartition, - min_time: Timestamp, - max_time: Timestamp, - ) -> usize { - let level_1 = self - .catalog - .repositories() - .await - .parquet_files() - .level_1(table_partition, min_time, max_time) - .await - .unwrap(); - level_1.len() - } - /// List all non-deleted files pub async fn 
list_by_table_not_to_delete( self: &Arc,