diff --git a/compactor/src/components/partition_source/catalog.rs b/compactor/src/components/partition_source/catalog.rs index bde8351eef..b29ca839cd 100644 --- a/compactor/src/components/partition_source/catalog.rs +++ b/compactor/src/components/partition_source/catalog.rs @@ -2,8 +2,8 @@ use std::{fmt::Display, sync::Arc}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{Partition, PartitionId}; -use iox_catalog::interface::Catalog; +use data_types::{Partition, PartitionId, TransitionPartitionId}; +use iox_catalog::{interface::Catalog, partition_lookup}; use super::PartitionSource; @@ -33,12 +33,9 @@ impl PartitionSource for CatalogPartitionSource { async fn fetch_by_id(&self, partition_id: PartitionId) -> Option { Backoff::new(&self.backoff_config) .retry_all_errors("partition_by_id", || async { - self.catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await + let mut repos = self.catalog.repositories().await; + let id = TransitionPartitionId::Deprecated(partition_id); + partition_lookup(repos.as_mut(), &id).await }) .await .expect("retry forever") diff --git a/ingester/src/buffer_tree/partition/resolver/catalog.rs b/ingester/src/buffer_tree/partition/resolver/catalog.rs index 9b80f8dac8..770f3a052f 100644 --- a/ingester/src/buffer_tree/partition/resolver/catalog.rs +++ b/ingester/src/buffer_tree/partition/resolver/catalog.rs @@ -100,7 +100,11 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table}; + use data_types::TransitionPartitionId; + use iox_catalog::{ + partition_lookup, + test_helpers::{arbitrary_namespace, arbitrary_table}, + }; use super::*; use crate::buffer_tree::table::TableName; @@ -161,11 +165,9 @@ mod tests { assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None)); assert!(got.lock().partition_key.ptr_eq(&callers_partition_key)); - let got = catalog - .repositories() - .await - .partitions() - .get_by_id(got.lock().partition_id) + let mut repos = catalog.repositories().await; + let id = TransitionPartitionId::Deprecated(got.lock().partition_id); + let got = partition_lookup(repos.as_mut(), &id) .await .unwrap() .expect("partition not created"); diff --git a/ingester/src/buffer_tree/partition/resolver/sort_key.rs b/ingester/src/buffer_tree/partition/resolver/sort_key.rs index 71e898f140..113b047b5d 100644 --- a/ingester/src/buffer_tree/partition/resolver/sort_key.rs +++ b/ingester/src/buffer_tree/partition/resolver/sort_key.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use backoff::{Backoff, BackoffConfig}; -use data_types::PartitionId; -use iox_catalog::interface::Catalog; +use data_types::{PartitionId, TransitionPartitionId}; +use iox_catalog::{interface::Catalog, partition_lookup}; use schema::sort::SortKey; /// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`]. @@ -33,12 +33,9 @@ impl SortKeyResolver { pub(crate) async fn fetch(self) -> Option { Backoff::new(&self.backoff_config) .retry_all_errors("fetch partition sort key", || async { - let s = self - .catalog - .repositories() - .await - .partitions() - .get_by_id(self.partition_id) + let mut repos = self.catalog.repositories().await; + let id = TransitionPartitionId::Deprecated(self.partition_id); + let s = partition_lookup(repos.as_mut(), &id) .await? .unwrap_or_else(|| { panic!( diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index 7d1cb04995..d7142a6268 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -22,7 +22,7 @@ use workspace_hack as _; use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result}; use data_types::{ partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride}, - ColumnType, NamespaceId, NamespaceSchema, TableSchema, + ColumnType, NamespaceId, NamespaceSchema, Partition, TableSchema, TransitionPartitionId, }; use mutable_batch::MutableBatch; use std::{borrow::Cow, collections::HashMap}; @@ -67,6 +67,27 @@ impl TableScopedError { } } +/// Look up a partition in the catalog by either database-assigned ID or deterministic hash ID. +/// +/// The existence of this function should be temporary; it can be removed once all partition lookup +/// is happening with only the deterministic hash ID. +pub async fn partition_lookup( + repos: &mut R, + id: &TransitionPartitionId, +) -> Result, Error> +where + R: RepoCollection + ?Sized, +{ + match id { + TransitionPartitionId::Deprecated(partition_id) => { + repos.partitions().get_by_id(*partition_id).await + } + TransitionPartitionId::Deterministic(partition_hash_id) => { + repos.partitions().get_by_hash_id(partition_hash_id).await + } + } +} + /// Given an iterator of `(table_name, batch)` to validate, this function /// ensures all the columns within `batch` match the existing schema for /// `table_name` in `schema`. If the column does not already exist in `schema`, diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index fba27593e7..fd5fa52cd3 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -13,10 +13,10 @@ use cache_system::{ }; use data_types::{ partition_template::{build_column_values, ColumnValue}, - ColumnId, Partition, PartitionId, + ColumnId, Partition, PartitionId, TransitionPartitionId, }; use datafusion::scalar::ScalarValue; -use iox_catalog::interface::Catalog; +use iox_catalog::{interface::Catalog, partition_lookup}; use iox_query::chunk_statistics::{ColumnRange, ColumnRanges}; use iox_time::TimeProvider; use observability_deps::tracing::debug; @@ -66,12 +66,9 @@ impl PartitionCache { async move { let partition = Backoff::new(&backoff_config) .retry_all_errors("get partition_key", || async { - catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await + let mut repos = catalog.repositories().await; + let id = TransitionPartitionId::Deprecated(partition_id); + partition_lookup(repos.as_mut(), &id).await }) .await .expect("retry forever")?;