diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs
index 40554b5724..eeb41a7f21 100644
--- a/iox_catalog/src/interface.rs
+++ b/iox_catalog/src/interface.rs
@@ -5,8 +5,8 @@ use data_types::{
     partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
     Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
     NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
-    ParquetFileParams, Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
-    TableSchema, Timestamp,
+    ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
+    Table, TableId, TableSchema, Timestamp,
 };
 use iox_time::TimeProvider;
 use snafu::{OptionExt, Snafu};
@@ -372,6 +372,12 @@ pub trait PartitionRepo: Send + Sync {
     /// get partition by ID
     async fn get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>>;
 
+    /// get partition by deterministic hash ID
+    async fn get_by_hash_id(
+        &mut self,
+        partition_hash_id: &PartitionHashId,
+    ) -> Result<Option<Partition>>;
+
     /// return the partitions by table id
     async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
 
@@ -1490,12 +1496,30 @@ pub(crate) mod test_helpers {
                 .unwrap()
                 .unwrap()
         );
+        assert_eq!(
+            other_partition,
+            repos
+                .partitions()
+                .get_by_hash_id(other_partition.hash_id().unwrap())
+                .await
+                .unwrap()
+                .unwrap()
+        );
         assert!(repos
             .partitions()
             .get_by_id(PartitionId::new(i64::MAX))
             .await
             .unwrap()
             .is_none());
+        assert!(repos
+            .partitions()
+            .get_by_hash_id(&PartitionHashId::new(
+                TableId::new(i64::MAX),
+                &PartitionKey::from("arbitrary")
+            ))
+            .await
+            .unwrap()
+            .is_none());
 
         let listed = repos
             .partitions()
@@ -1554,6 +1578,16 @@
             updated_other_partition.sort_key,
             vec!["tag2", "tag1", "time"]
         );
+        let updated_other_partition = repos
+            .partitions()
+            .get_by_hash_id(other_partition.hash_id().unwrap())
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            updated_other_partition.sort_key,
+            vec!["tag2", "tag1", "time"]
+        );
 
         // test sort key CAS with no value
         let err = repos
@@ -1610,6 +1644,16 @@
             updated_other_partition.sort_key,
             vec!["tag2", "tag1", "tag3 , with comma", "time"]
         );
+        let updated_other_partition = repos
+            .partitions()
+            .get_by_hash_id(other_partition.hash_id().unwrap())
+            .await
+            .unwrap()
+            .unwrap();
+        assert_eq!(
+            updated_other_partition.sort_key,
+            vec!["tag2", "tag1", "tag3 , with comma", "time"]
+        );
 
         // The compactor can log why compaction was skipped
         let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap();
diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs
index 739937f17c..ebe8663a4d 100644
--- a/iox_catalog/src/mem.rs
+++ b/iox_catalog/src/mem.rs
@@ -18,7 +18,8 @@ use data_types::{
     },
     Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
     NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
-    Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
+    Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
+    Timestamp,
 };
 use iox_time::{SystemProvider, TimeProvider};
 use snafu::ensure;
@@ -585,6 +586,23 @@ impl PartitionRepo for MemTxn {
             .cloned())
     }
 
+    async fn get_by_hash_id(
+        &mut self,
+        partition_hash_id: &PartitionHashId,
+    ) -> Result<Option<Partition>> {
+        let stage = self.stage();
+
+        Ok(stage
+            .partitions
+            .iter()
+            .find(|p| {
+                p.hash_id()
+                    .map(|hash_id| hash_id == partition_hash_id)
+                    .unwrap_or_default()
+            })
+            .cloned())
+    }
+
     async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
         let stage = self.stage();
 
diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs
index e59ff22fd6..9d6afd52e4 100644
--- a/iox_catalog/src/metrics.rs
+++ b/iox_catalog/src/metrics.rs
@@ -9,7 +9,8 @@ use data_types::{
     partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
     Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
     NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
-    Partition, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp,
+    Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
+    Timestamp,
 };
 use iox_time::{SystemProvider, TimeProvider};
 use metric::{DurationHistogram, Metric};
@@ -170,6 +171,7 @@ decorate!(
     methods = [
         "partition_create_or_get" = create_or_get(&mut self, key: PartitionKey, table_id: TableId) -> Result<Partition>;
         "partition_get_by_id" = get_by_id(&mut self, partition_id: PartitionId) -> Result<Option<Partition>>;
+        "partition_get_by_hash_id" = get_by_hash_id(&mut self, partition_hash_id: &PartitionHashId) -> Result<Option<Partition>>;
         "partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
         "partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
         "partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs
index 64eb27813c..46ab41bea2 100644
--- a/iox_catalog/src/postgres.rs
+++ b/iox_catalog/src/postgres.rs
@@ -1095,6 +1095,30 @@ WHERE id = $1;
         Ok(Some(partition))
     }
 
+    async fn get_by_hash_id(
+        &mut self,
+        partition_hash_id: &PartitionHashId,
+    ) -> Result<Option<Partition>> {
+        let rec = sqlx::query_as::<_, Partition>(
+            r#"
+SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
+FROM partition
+WHERE hash_id = $1;
+        "#,
+        )
+        .bind(partition_hash_id) // $1
+        .fetch_one(&mut self.inner)
+        .await;
+
+        if let Err(sqlx::Error::RowNotFound) = rec {
+            return Ok(None);
+        }
+
+        let partition = rec.map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(Some(partition))
+    }
+
     async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
         sqlx::query_as::<_, Partition>(
             r#"
diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs
index 480e562d7c..e917a0bbe8 100644
--- a/iox_catalog/src/sqlite.rs
+++ b/iox_catalog/src/sqlite.rs
@@ -891,6 +891,30 @@ WHERE id = $1;
         Ok(Some(partition.into()))
     }
 
+    async fn get_by_hash_id(
+        &mut self,
+        partition_hash_id: &PartitionHashId,
+    ) -> Result<Option<Partition>> {
+        let rec = sqlx::query_as::<_, PartitionPod>(
+            r#"
+SELECT id, hash_id, table_id, partition_key, sort_key, new_file_at
+FROM partition
+WHERE hash_id = $1;
+        "#,
+        )
+        .bind(partition_hash_id) // $1
+        .fetch_one(self.inner.get_mut())
+        .await;
+
+        if let Err(sqlx::Error::RowNotFound) = rec {
+            return Ok(None);
+        }
+
+        let partition = rec.map_err(|e| Error::SqlxError { source: e })?;
+
+        Ok(Some(partition.into()))
+    }
+
     async fn list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>> {
         Ok(sqlx::query_as::<_, PartitionPod>(
             r#"