Merge pull request #8083 from influxdata/cn/query-catalog-with-either-partition-identifier

feat: Query the catalog with PartitionHashId if you have it, otherwise with PartitionId
kodiakhq[bot] 2023-07-12 14:56:29 +00:00 committed by GitHub
commit 5f63407f24
20 changed files with 248 additions and 124 deletions
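
For orientation, here is a minimal sketch of the transitional ID type this PR threads through the catalog APIs. The payload stand-ins are hypothetical; the real `TransitionPartitionId`, `PartitionId`, and `PartitionHashId` live in `data_types`, and only the two variant shapes are taken from the hunks below.

// Stand-ins for the real data_types types (hypothetical representations).
#[derive(Debug, Clone, PartialEq)]
struct PartitionId(i64); // database-assigned row ID
#[derive(Debug, Clone, PartialEq)]
struct PartitionHashId(String); // deterministic hash ID

// Variant shapes as used throughout the diff below.
#[derive(Debug, Clone, PartialEq)]
enum TransitionPartitionId {
    Deprecated(PartitionId),
    Deterministic(PartitionHashId),
}

fn main() {
    // Callers that only hold a row ID wrap it in the Deprecated variant,
    // exactly as the updated call sites below do:
    let by_row_id = TransitionPartitionId::Deprecated(PartitionId(42));
    // Callers that hold a Partition use its hash ID when present:
    let by_hash_id =
        TransitionPartitionId::Deterministic(PartitionHashId("<hash>".to_string()));
    println!("{by_row_id:?} {by_hash_id:?}");
}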

View File

@ -5,7 +5,7 @@ use std::{
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{ParquetFile, PartitionId};
use data_types::{ParquetFile, PartitionId, TransitionPartitionId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::warn;
@ -61,7 +61,7 @@ impl CatalogQuerier for Arc<dyn Catalog> {
self.repositories()
.await
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
.await
}
}

View File

@ -2,8 +2,8 @@ use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Partition, PartitionId};
use iox_catalog::interface::Catalog;
use data_types::{Partition, PartitionId, TransitionPartitionId};
use iox_catalog::{interface::Catalog, partition_lookup};
use super::PartitionSource;
@ -33,12 +33,9 @@ impl PartitionSource for CatalogPartitionSource {
async fn fetch_by_id(&self, partition_id: PartitionId) -> Option<Partition> {
Backoff::new(&self.backoff_config)
.retry_all_errors("partition_by_id", || async {
self.catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
let mut repos = self.catalog.repositories().await;
let id = TransitionPartitionId::Deprecated(partition_id);
partition_lookup(repos.as_mut(), &id).await
})
.await
.expect("retry forever")

View File

@ -542,7 +542,7 @@ mod tests {
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
) -> iox_catalog::interface::Result<Vec<ParquetFile>> {
self.inner
.list_by_partition_not_to_delete(partition_id)

View File

@ -526,7 +526,7 @@ impl RemoteImporter {
let res = repos
.partitions()
.cas_sort_key(
partition.id,
&partition.transition_partition_id(),
Some(partition.sort_key.clone()),
&new_sort_key,
)

View File

@ -928,27 +928,26 @@ mod tests {
// Populate the catalog with the namespace / table
let (_ns_id, table_id) = populate_catalog(&*catalog, "bananas", "platanos").await;
let partition_id = catalog
let partition = catalog
.repositories()
.await
.partitions()
.create_or_get("test".into(), table_id)
.await
.expect("should create")
.id;
.expect("should create");
catalog
.repositories()
.await
.partitions()
.cas_sort_key(partition_id, None, &["terrific"])
.cas_sort_key(&partition.transition_partition_id(), None, &["terrific"])
.await
.unwrap();
// Read the just-created sort key (None)
let fetcher = Arc::new(DeferredLoad::new(
Duration::from_nanos(1),
SortKeyResolver::new(partition_id, Arc::clone(&catalog), backoff_config.clone())
SortKeyResolver::new(partition.id, Arc::clone(&catalog), backoff_config.clone())
.fetch(),
&metrics,
));

View File

@ -100,7 +100,11 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use iox_catalog::test_helpers::{arbitrary_namespace, arbitrary_table};
use data_types::TransitionPartitionId;
use iox_catalog::{
partition_lookup,
test_helpers::{arbitrary_namespace, arbitrary_table},
};
use super::*;
use crate::buffer_tree::table::TableName;
@ -161,11 +165,9 @@ mod tests {
assert_matches!(got.lock().sort_key(), SortKeyState::Provided(None));
assert!(got.lock().partition_key.ptr_eq(&callers_partition_key));
let got = catalog
.repositories()
.await
.partitions()
.get_by_id(got.lock().partition_id)
let mut repos = catalog.repositories().await;
let id = TransitionPartitionId::Deprecated(got.lock().partition_id);
let got = partition_lookup(repos.as_mut(), &id)
.await
.unwrap()
.expect("partition not created");

View File

@ -3,8 +3,8 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use data_types::{PartitionId, TransitionPartitionId};
use iox_catalog::{interface::Catalog, partition_lookup};
use schema::sort::SortKey;
/// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`].
@ -33,12 +33,9 @@ impl SortKeyResolver {
pub(crate) async fn fetch(self) -> Option<SortKey> {
Backoff::new(&self.backoff_config)
.retry_all_errors("fetch partition sort key", || async {
let s = self
.catalog
.repositories()
.await
.partitions()
.get_by_id(self.partition_id)
let mut repos = self.catalog.repositories().await;
let id = TransitionPartitionId::Deprecated(self.partition_id);
let s = partition_lookup(repos.as_mut(), &id)
.await?
.unwrap_or_else(|| {
panic!(
@ -76,24 +73,27 @@ mod tests {
// Populate the catalog with the namespace / table
let (_ns_id, table_id) = populate_catalog(&*catalog, NAMESPACE_NAME, TABLE_NAME).await;
let partition_id = catalog
let partition = catalog
.repositories()
.await
.partitions()
.create_or_get(PARTITION_KEY.into(), table_id)
.await
.expect("should create")
.id;
.expect("should create");
let fetcher =
SortKeyResolver::new(partition_id, Arc::clone(&catalog), backoff_config.clone());
SortKeyResolver::new(partition.id, Arc::clone(&catalog), backoff_config.clone());
// Set the sort key
let catalog_state = catalog
.repositories()
.await
.partitions()
.cas_sort_key(partition_id, None, &["uno", "dos", "bananas"])
.cas_sort_key(
&partition.transition_partition_id(),
None,
&["uno", "dos", "bananas"],
)
.await
.expect("should update existing partition key");

View File

@ -16,7 +16,7 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::{CompactionLevel, ParquetFile};
use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId};
use futures::TryStreamExt;
use iox_catalog::{
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
@ -243,7 +243,7 @@ mod tests {
.repositories()
.await
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
.await
.expect("query for parquet files failed");
@ -344,7 +344,7 @@ mod tests {
.await
.partitions()
.cas_sort_key(
partition_id,
&transition_partition_id,
None,
&["bananas", "are", "good", "for", "you"],
)
@ -392,7 +392,7 @@ mod tests {
.repositories()
.await
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id))
.await
.expect("query for parquet files failed");

View File

@ -376,7 +376,11 @@ where
let mut repos = catalog.repositories().await;
match repos
.partitions()
.cas_sort_key(ctx.partition_id(), old_sort_key.clone(), &new_sort_key_str)
.cas_sort_key(
&ctx.transition_partition_id(),
old_sort_key.clone(),
&new_sort_key_str,
)
.await
{
Ok(_) => ControlFlow::Break(Ok(())),

View File

@ -0,0 +1,11 @@
-- By default, we often only have 5min to finish our statements. The `CREATE INDEX CONCURRENTLY`,
-- however, can take longer.
-- IOX_NO_TRANSACTION
SET statement_timeout TO '60min';
-- IOX_STEP_BOUNDARY
-- IOX_NO_TRANSACTION
CREATE INDEX CONCURRENTLY IF NOT EXISTS parquet_file_partition_hash_id_idx
ON parquet_file (partition_hash_id)
WHERE partition_hash_id IS NOT NULL;

View File

@ -0,0 +1,3 @@
CREATE INDEX IF NOT EXISTS parquet_file_partition_hash_id_idx
ON parquet_file (partition_hash_id)
WHERE partition_hash_id IS NOT NULL;
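
Taken together, these two migrations back the new hash-ID lookups added later in this diff. A sketch of the query shape the partial index serves follows; the literal hash value is hypothetical, and the predicate matches the new `list_by_partition_not_to_delete` queries below.

-- The partial index covers exactly this predicate; rows with a NULL
-- partition_hash_id (old-style partitions) are excluded from the index.
SELECT id, object_store_id
FROM parquet_file
WHERE parquet_file.partition_hash_id = '<hash-id>'  -- hypothetical value
  AND parquet_file.to_delete IS NULL;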

View File

@ -6,7 +6,7 @@ use data_types::{
Column, ColumnType, ColumnsByName, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceSchema, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, TableSchema, Timestamp,
Table, TableId, TableSchema, Timestamp, TransitionPartitionId,
};
use iox_time::TimeProvider;
use snafu::{OptionExt, Snafu};
@ -80,7 +80,7 @@ pub enum Error {
TableNotFound { id: TableId },
#[snafu(display("partition {} not found", id))]
PartitionNotFound { id: PartitionId },
PartitionNotFound { id: TransitionPartitionId },
#[snafu(display(
"couldn't create column {} in table {}; limit reached on namespace",
@ -397,7 +397,7 @@ pub trait PartitionRepo: Send + Sync {
/// concurrent writers.
async fn cas_sort_key(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>>;
@ -483,7 +483,7 @@ pub trait ParquetFileRepo: Send + Sync {
/// [`to_delete`](ParquetFile::to_delete).
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
) -> Result<Vec<ParquetFile>>;
/// Return the parquet file with the given object store id
@ -1549,7 +1549,11 @@ pub(crate) mod test_helpers {
// test update_sort_key from None to Some
repos
.partitions()
.cas_sort_key(other_partition.id, None, &["tag2", "tag1", "time"])
.cas_sort_key(
&other_partition.transition_partition_id(),
None,
&["tag2", "tag1", "time"],
)
.await
.unwrap();
@ -1557,7 +1561,7 @@ pub(crate) mod test_helpers {
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
&other_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
)
@ -1593,7 +1597,7 @@ pub(crate) mod test_helpers {
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
&other_partition.transition_partition_id(),
None,
&["tag2", "tag1", "tag3 , with comma", "time"],
)
@ -1607,7 +1611,7 @@ pub(crate) mod test_helpers {
let err = repos
.partitions()
.cas_sort_key(
other_partition.id,
&other_partition.transition_partition_id(),
Some(["bananas".to_string()].to_vec()),
&["tag2", "tag1", "tag3 , with comma", "time"],
)
@ -1621,7 +1625,7 @@ pub(crate) mod test_helpers {
repos
.partitions()
.cas_sort_key(
other_partition.id,
&other_partition.transition_partition_id(),
Some(
["tag2", "tag1", "time"]
.into_iter()
@ -2676,6 +2680,7 @@ pub(crate) mod test_helpers {
let other_partition_params = ParquetFileParams {
partition_id: partition2.id,
partition_hash_id: partition2.hash_id().cloned(),
object_store_id: Uuid::new_v4(),
..parquet_file_params.clone()
};
@ -2687,14 +2692,16 @@ pub(crate) mod test_helpers {
let files = repos
.parquet_files()
.list_by_partition_not_to_delete(partition.id)
.list_by_partition_not_to_delete(&partition.transition_partition_id())
.await
.unwrap();
// not asserting against a vector literal to guard against flakiness due to the
// uncertain ordering of the SQL query results in the postgres impl
assert_eq!(files.len(), 2);
assert_matches!(files.iter().find(|f| f.id == parquet_file.id), Some(_));
assert_matches!(files.iter().find(|f| f.id == level1_file.id), Some(_));
let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect();
file_ids.sort();
let mut expected_ids = vec![parquet_file.id, level1_file.id];
expected_ids.sort();
assert_eq!(file_ids, expected_ids);
// remove namespace to avoid it from affecting later tests
repos

View File

@ -22,7 +22,7 @@ use workspace_hack as _;
use crate::interface::{ColumnTypeMismatchSnafu, Error, RepoCollection, Result};
use data_types::{
partition_template::{NamespacePartitionTemplateOverride, TablePartitionTemplateOverride},
ColumnType, NamespaceId, NamespaceSchema, TableSchema,
ColumnType, NamespaceId, NamespaceSchema, Partition, TableSchema, TransitionPartitionId,
};
use mutable_batch::MutableBatch;
use std::{borrow::Cow, collections::HashMap};
@ -67,6 +67,27 @@ impl TableScopedError {
}
}
/// Look up a partition in the catalog by either database-assigned ID or deterministic hash ID.
///
/// The existence of this function should be temporary; it can be removed once all partition
/// lookups happen via the deterministic hash ID.
pub async fn partition_lookup<R>(
repos: &mut R,
id: &TransitionPartitionId,
) -> Result<Option<Partition>, Error>
where
R: RepoCollection + ?Sized,
{
match id {
TransitionPartitionId::Deprecated(partition_id) => {
repos.partitions().get_by_id(*partition_id).await
}
TransitionPartitionId::Deterministic(partition_hash_id) => {
repos.partitions().get_by_hash_id(partition_hash_id).await
}
}
}
/// Given an iterator of `(table_name, batch)` to validate, this function
/// ensures all the columns within `batch` match the existing schema for
/// `table_name` in `schema`. If the column does not already exist in `schema`,

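A sketch of the resulting call pattern, assembled from the updated call sites earlier in this diff; the surrounding function is hypothetical, but every call it makes (`repositories()`, `repos.as_mut()`, `partition_lookup`) appears in the hunks above.

use std::sync::Arc;

use data_types::{Partition, PartitionId, TransitionPartitionId};
use iox_catalog::{interface::Catalog, partition_lookup};

// Hypothetical caller; mirrors the compactor and ingester changes above.
async fn fetch_partition(
    catalog: Arc<dyn Catalog>,
    partition_id: PartitionId,
) -> iox_catalog::interface::Result<Option<Partition>> {
    let mut repos = catalog.repositories().await;
    // Until this caller has a PartitionHashId, wrap the row ID in the
    // Deprecated variant; partition_lookup dispatches on the variant.
    let id = TransitionPartitionId::Deprecated(partition_id);
    partition_lookup(repos.as_mut(), &id).await
}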
View File

@ -19,7 +19,7 @@ use data_types::{
Column, ColumnId, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use snafu::ensure;
@ -625,20 +625,26 @@ impl PartitionRepo for MemTxn {
async fn cas_sort_key(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>> {
let stage = self.stage();
let old_sort_key = old_sort_key.unwrap_or_default();
match stage.partitions.iter_mut().find(|p| p.id == partition_id) {
match stage.partitions.iter_mut().find(|p| match partition_id {
TransitionPartitionId::Deterministic(hash_id) => {
p.hash_id().map_or(false, |h| h == hash_id)
}
TransitionPartitionId::Deprecated(id) => p.id == *id,
}) {
Some(p) if p.sort_key == old_sort_key => {
p.sort_key = new_sort_key.iter().map(|s| s.to_string()).collect();
Ok(p.clone())
}
Some(p) => return Err(CasFailure::ValueMismatch(p.sort_key.clone())),
None => Err(CasFailure::QueryError(Error::PartitionNotFound {
id: partition_id,
id: partition_id.clone(),
})),
}
}
@ -844,14 +850,20 @@ impl ParquetFileRepo for MemTxn {
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
) -> Result<Vec<ParquetFile>> {
let stage = self.stage();
Ok(stage
.parquet_files
.iter()
.filter(|f| f.partition_id == partition_id && f.to_delete.is_none())
.filter(|f| match partition_id {
TransitionPartitionId::Deterministic(hash_id) => {
f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id)
}
TransitionPartitionId::Deprecated(id) => f.partition_id == *id,
})
.filter(|f| f.to_delete.is_none())
.cloned()
.collect())
}
@ -962,7 +974,9 @@ async fn create_parquet_file(
.partitions
.iter_mut()
.find(|p| p.id == partition_id)
.ok_or(Error::PartitionNotFound { id: partition_id })?;
.ok_or(Error::PartitionNotFound {
id: TransitionPartitionId::Deprecated(partition_id),
})?;
partition.new_file_at = Some(created_at);
}

View File

@ -10,7 +10,7 @@ use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -174,7 +174,7 @@ decorate!(
"partition_get_by_hash_id" = get_by_hash_id(&mut self, partition_hash_id: &PartitionHashId) -> Result<Option<Partition>>;
"partition_list_by_table_id" = list_by_table_id(&mut self, table_id: TableId) -> Result<Vec<Partition>>;
"partition_list_ids" = list_ids(&mut self) -> Result<Vec<PartitionId>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: PartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_update_sort_key" = cas_sort_key(&mut self, partition_id: &TransitionPartitionId, old_sort_key: Option<Vec<String>>, new_sort_key: &[&str]) -> Result<Partition, CasFailure<Vec<String>>>;
"partition_record_skipped_compaction" = record_skipped_compaction(&mut self, partition_id: PartitionId, reason: &str, num_files: usize, limit_num_files: usize, limit_num_files_first_in_partition: usize, estimated_bytes: u64, limit_bytes: u64) -> Result<()>;
"partition_list_skipped_compactions" = list_skipped_compactions(&mut self) -> Result<Vec<SkippedCompaction>>;
"partition_delete_skipped_compactions" = delete_skipped_compactions(&mut self, partition_id: PartitionId) -> Result<Option<SkippedCompaction>>;
@ -193,7 +193,7 @@ decorate!(
"parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result<Vec<ParquetFile>>;
"parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result<Vec<ParquetFileId>>;
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result<Vec<ParquetFile>>;
"parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: &TransitionPartitionId) -> Result<Vec<ParquetFile>>;
"parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result<Option<ParquetFile>>;
"parquet_exists_by_object_store_id_batch" = exists_by_object_store_id_batch(&mut self, object_store_ids: Vec<Uuid>) -> Result<Vec<Uuid>>;
"parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, delete: &[ParquetFileId], upgrade: &[ParquetFileId], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result<Vec<ParquetFileId>>;

View File

@ -23,7 +23,7 @@ use data_types::{
Column, ColumnType, CompactionLevel, Namespace, NamespaceId, NamespaceName,
NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId, ParquetFileParams,
Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId,
Timestamp,
Timestamp, TransitionPartitionId,
};
use iox_time::{SystemProvider, TimeProvider};
use observability_deps::tracing::{debug, info, warn};
@ -1153,24 +1153,38 @@ WHERE table_id = $1;
/// round trips to service a transaction in the happy path).
async fn cas_sort_key(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>> {
let old_sort_key = old_sort_key.unwrap_or_default();
let res = sqlx::query_as::<_, Partition>(
r#"
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(hash_id) // $2
.bind(&old_sort_key), // $3
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, Partition>(
r#"
UPDATE partition
SET sort_key = $1
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(new_sort_key) // $1
.bind(partition_id) // $2
.bind(&old_sort_key) // $3
.fetch_one(&mut self.inner)
.await;
)
.bind(new_sort_key) // $1
.bind(id) // $2
.bind(&old_sort_key), // $3
};
let res = query.fetch_one(&mut self.inner).await;
let partition = match res {
Ok(v) => v,
@ -1187,11 +1201,11 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
// NOTE: this is racy, but documented - this might return "Sort
// key differs! Old key: <old sort key you provided>"
return Err(CasFailure::ValueMismatch(
PartitionRepo::get_by_id(self, partition_id)
crate::partition_lookup(self, partition_id)
.await
.map_err(CasFailure::QueryError)?
.ok_or(CasFailure::QueryError(Error::PartitionNotFound {
id: partition_id,
id: partition_id.clone(),
}))?
.sort_key,
));
@ -1458,10 +1472,23 @@ RETURNING id;
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
) -> Result<Vec<ParquetFile>> {
sqlx::query_as::<_, ParquetFile>(
r#"
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_hash_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(hash_id), // $1
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
@ -1469,11 +1496,14 @@ FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(partition_id) // $1
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
)
.bind(id), // $1
};
query
.fetch_all(&mut self.inner)
.await
.map_err(|e| Error::SqlxError { source: e })
}
async fn get_by_object_store_id(

View File

@ -21,7 +21,7 @@ use data_types::{
Column, ColumnId, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceId,
NamespaceName, NamespaceServiceProtectionLimitsOverride, ParquetFile, ParquetFileId,
ParquetFileParams, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction,
Table, TableId, Timestamp,
Table, TableId, Timestamp, TransitionPartitionId,
};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
@ -952,24 +952,39 @@ WHERE table_id = $1;
/// round trips to service a transaction in the happy path).
async fn cas_sort_key(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
old_sort_key: Option<Vec<String>>,
new_sort_key: &[&str],
) -> Result<Partition, CasFailure<Vec<String>>> {
let old_sort_key = old_sort_key.unwrap_or_default();
let res = sqlx::query_as::<_, PartitionPod>(
r#"
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1
WHERE hash_id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(hash_id) // $2
.bind(Json(&old_sort_key)), // $3
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, PartitionPod>(
r#"
UPDATE partition
SET sort_key = $1
WHERE id = $2 AND sort_key = $3
RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
"#,
)
.bind(Json(new_sort_key)) // $1
.bind(partition_id) // $2
.bind(Json(&old_sort_key)) // $3
.fetch_one(self.inner.get_mut())
.await;
)
.bind(Json(new_sort_key)) // $1
.bind(id) // $2
.bind(Json(&old_sort_key)), // $3
};
let res = query.fetch_one(self.inner.get_mut()).await;
let partition = match res {
Ok(v) => v,
@ -986,11 +1001,11 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at;
// NOTE: this is racy, but documented - this might return "Sort
// key differs! Old key: <old sort key you provided>"
return Err(CasFailure::ValueMismatch(
PartitionRepo::get_by_id(self, partition_id)
crate::partition_lookup(self, partition_id)
.await
.map_err(CasFailure::QueryError)?
.ok_or(CasFailure::QueryError(Error::PartitionNotFound {
id: partition_id,
id: partition_id.clone(),
}))?
.sort_key,
));
@ -1323,10 +1338,23 @@ RETURNING id;
async fn list_by_partition_not_to_delete(
&mut self,
partition_id: PartitionId,
partition_id: &TransitionPartitionId,
) -> Result<Vec<ParquetFile>> {
Ok(sqlx::query_as::<_, ParquetFilePod>(
r#"
// This `match` will go away when all partitions have hash IDs in the database.
let query = match partition_id {
TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
FROM parquet_file
WHERE parquet_file.partition_hash_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(hash_id), // $1
TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>(
r#"
SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time,
max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set,
max_l0_created_at
@ -1334,14 +1362,17 @@ FROM parquet_file
WHERE parquet_file.partition_id = $1
AND parquet_file.to_delete IS NULL;
"#,
)
.bind(partition_id) // $1
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
)
.bind(id), // $1
};
Ok(query
.fetch_all(self.inner.get_mut())
.await
.map_err(|e| Error::SqlxError { source: e })?
.into_iter()
.map(Into::into)
.collect())
}
async fn get_by_object_store_id(

View File

@ -328,7 +328,11 @@ impl TestTable {
let partition = repos
.partitions()
.cas_sort_key(partition.id, None, sort_key)
.cas_sort_key(
&TransitionPartitionId::Deprecated(partition.id),
None,
sort_key,
)
.await
.unwrap();
@ -452,7 +456,7 @@ impl TestPartition {
.await
.partitions()
.cas_sort_key(
self.partition.id,
&TransitionPartitionId::Deprecated(self.partition.id),
Some(old_sort_key),
&sort_key.to_columns().collect::<Vec<_>>(),
)
@ -786,7 +790,7 @@ async fn update_catalog_sort_key_if_needed(
);
partitions_catalog
.cas_sort_key(
partition_id,
&TransitionPartitionId::Deprecated(partition_id),
Some(
catalog_sort_key
.to_columns()
@ -803,7 +807,11 @@ async fn update_catalog_sort_key_if_needed(
let new_columns = sort_key.to_columns().collect::<Vec<_>>();
debug!("Updating sort key from None to {:?}", &new_columns);
partitions_catalog
.cas_sort_key(partition_id, None, &new_columns)
.cas_sort_key(
&TransitionPartitionId::Deprecated(partition_id),
None,
&new_columns,
)
.await
.unwrap();
}

View File

@ -13,10 +13,10 @@ use cache_system::{
};
use data_types::{
partition_template::{build_column_values, ColumnValue},
ColumnId, Partition, PartitionId,
ColumnId, Partition, PartitionId, TransitionPartitionId,
};
use datafusion::scalar::ScalarValue;
use iox_catalog::interface::Catalog;
use iox_catalog::{interface::Catalog, partition_lookup};
use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
use iox_time::TimeProvider;
use observability_deps::tracing::debug;
@ -66,12 +66,9 @@ impl PartitionCache {
async move {
let partition = Backoff::new(&backoff_config)
.retry_all_errors("get partition_key", || async {
catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
let mut repos = catalog.repositories().await;
let id = TransitionPartitionId::Deprecated(partition_id);
partition_lookup(repos.as_mut(), &id).await
})
.await
.expect("retry forever")?;

View File

@ -18,7 +18,7 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use data_types::{PartitionId, TableId};
use data_types::{PartitionId, TableId, TransitionPartitionId};
use generated_types::influxdata::iox::catalog::v1::*;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
@ -47,11 +47,11 @@ impl catalog_service_server::CatalogService for CatalogService {
) -> Result<Response<GetParquetFilesByPartitionIdResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let partition_id = PartitionId::new(req.partition_id);
let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id));
let parquet_files = repos
.parquet_files()
.list_by_partition_not_to_delete(partition_id)
.list_by_partition_not_to_delete(&partition_id)
.await
.map_err(|e| {
warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition");