feat: Abstract over which partition ID type we're using to get a partition from the catalog
parent
5521310005
commit
eec31b7f00
|
@ -2,8 +2,8 @@ use std::{fmt::Display, sync::Arc};
|
|||
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types::{Partition, PartitionId};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use data_types::{Partition, PartitionId, TransitionPartitionId};
|
||||
use iox_catalog::{interface::Catalog, partition_lookup};
|
||||
|
||||
use super::PartitionSource;
|
||||
|
||||
|
@ -33,12 +33,9 @@ impl PartitionSource for CatalogPartitionSource {
|
|||
async fn fetch_by_id(&self, partition_id: PartitionId) -> Option<Partition> {
|
||||
Backoff::new(&self.backoff_config)
|
||||
.retry_all_errors("partition_by_id", || async {
|
||||
self.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.get_by_id(partition_id)
|
||||
.await
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
let id = TransitionPartitionId::Deprecated(partition_id);
|
||||
partition_lookup(repos.as_mut(), &id).await
|
||||
})
|
||||
.await
|
||||
.expect("retry forever")
|
||||
|
|
|
@ -100,7 +100,11 @@ mod tests {
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table};
|
||||
use data_types::TransitionPartitionId;
|
||||
use iox_catalog::{
|
||||
partition_lookup,
|
||||
test_helpers::{arbitrary_namespace, arbitrary_table},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
use crate::buffer_tree::table::TableName;
|
||||
|
@ -161,11 +165,9 @@ mod tests {
|
|||
assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None));
|
||||
assert!(got.lock().partition_key.ptr_eq(&callers_partition_key));
|
||||
|
||||
let got = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.get_by_id(got.lock().partition_id)
|
||||
let mut repos = catalog.repositories().await;
|
||||
let id = TransitionPartitionId::Deprecated(got.lock().partition_id);
|
||||
let got = partition_lookup(repos.as_mut(), &id)
|
||||
.await
|
||||
.unwrap()
|
||||
.expect("partition not created");
|
||||
|
|
|
@ -3,8 +3,8 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types::PartitionId;
|
||||
use iox_catalog::interface::Catalog;
|
||||
use data_types::{PartitionId, TransitionPartitionId};
|
||||
use iox_catalog::{interface::Catalog, partition_lookup};
|
||||
use schema::sort::SortKey;
|
||||
|
||||
/// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`].
|
||||
|
@ -33,12 +33,9 @@ impl SortKeyResolver {
|
|||
pub(crate) async fn fetch(self) -> Option<SortKey> {
|
||||
Backoff::new(&self.backoff_config)
|
||||
.retry_all_errors("fetch partition sort key", || async {
|
||||
let s = self
|
||||
.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.get_by_id(self.partition_id)
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
let id = TransitionPartitionId::Deprecated(self.partition_id);
|
||||
let s = partition_lookup(repos.as_mut(), &id)
|
||||
.await?
|
||||
.unwrap_or_else(|| {
|
||||
panic!(
|
||||
|
|
|
@ -22,7 +22,7 @@ use workspace_hack as _;
|
|||
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
|
||||
use data_types::{
|
||||
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
|
||||
ColumnType, NamespaceId, NamespaceSchema, TableSchema,
|
||||
ColumnType, NamespaceId, NamespaceSchema, Partition, TableSchema, TransitionPartitionId,
|
||||
};
|
||||
use mutable_batch::MutableBatch;
|
||||
use std::{borrow::Cow, collections::HashMap};
|
||||
|
@ -67,6 +67,27 @@ impl TableScopedError {
|
|||
}
|
||||
}
|
||||
|
||||
/// Look up a partition in the catalog by either database-assigned ID or deterministic hash ID.
|
||||
///
|
||||
/// The existence of this function should be temporary; it can be removed once all partition lookup
|
||||
/// is happening with only the deterministic hash ID.
|
||||
pub async fn partition_lookup<R>(
|
||||
repos: &mut R,
|
||||
id: &TransitionPartitionId,
|
||||
) -> Result<Option<Partition>, Error>
|
||||
where
|
||||
R: RepoCollection + ?Sized,
|
||||
{
|
||||
match id {
|
||||
TransitionPartitionId::Deprecated(partition_id) => {
|
||||
repos.partitions().get_by_id(*partition_id).await
|
||||
}
|
||||
TransitionPartitionId::Deterministic(partition_hash_id) => {
|
||||
repos.partitions().get_by_hash_id(partition_hash_id).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Given an iterator of `(table_name, batch)` to validate, this function
|
||||
/// ensures all the columns within `batch` match the existing schema for
|
||||
/// `table_name` in `schema`. If the column does not already exist in `schema`,
|
||||
|
|
|
@ -13,10 +13,10 @@ use cache_system::{
|
|||
};
|
||||
use data_types::{
|
||||
partition_template::{build_column_values, ColumnValue},
|
||||
ColumnId, Partition, PartitionId,
|
||||
ColumnId, Partition, PartitionId, TransitionPartitionId,
|
||||
};
|
||||
use datafusion::scalar::ScalarValue;
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_catalog::{interface::Catalog, partition_lookup};
|
||||
use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
|
||||
use iox_time::TimeProvider;
|
||||
use observability_deps::tracing::debug;
|
||||
|
@ -66,12 +66,9 @@ impl PartitionCache {
|
|||
async move {
|
||||
let partition = Backoff::new(&backoff_config)
|
||||
.retry_all_errors("get partition_key", || async {
|
||||
catalog
|
||||
.repositories()
|
||||
.await
|
||||
.partitions()
|
||||
.get_by_id(partition_id)
|
||||
.await
|
||||
let mut repos = catalog.repositories().await;
|
||||
let id = TransitionPartitionId::Deprecated(partition_id);
|
||||
partition_lookup(repos.as_mut(), &id).await
|
||||
})
|
||||
.await
|
||||
.expect("retry forever")?;
|
||||
|
|
Loading…
Reference in New Issue