diff --git a/compactor/src/components/partition_files_source/catalog.rs b/compactor/src/components/partition_files_source/catalog.rs index 4280643f0c..33918c1213 100644 --- a/compactor/src/components/partition_files_source/catalog.rs +++ b/compactor/src/components/partition_files_source/catalog.rs @@ -5,7 +5,7 @@ use std::{ use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{ParquetFile, PartitionId}; +use data_types::{ParquetFile, PartitionId, TransitionPartitionId}; use iox_catalog::interface::Catalog; use observability_deps::tracing::warn; @@ -61,7 +61,7 @@ impl CatalogQuerier for Arc { self.repositories() .await .parquet_files() - .list_by_partition_not_to_delete(partition_id) + .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) .await } } diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index 7d8fe1ceaf..a8e7a4bb24 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -542,7 +542,7 @@ mod tests { async fn list_by_partition_not_to_delete( &mut self, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, ) -> iox_catalog::interface::Result> { self.inner .list_by_partition_not_to_delete(partition_id) diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index 291ed0c60b..ca0f829fc9 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -16,7 +16,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{CompactionLevel, ParquetFile}; + use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId}; use futures::TryStreamExt; use iox_catalog::{ interface::{get_schema_by_id, Catalog, SoftDeletedRows}, @@ -243,7 +243,7 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(partition_id) + .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) .await .expect("query for parquet files failed"); @@ -392,7 +392,7 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(partition_id) + .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) .await .expect("query for parquet files failed"); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 0320c274a3..2d819f7f71 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -483,7 +483,7 @@ pub trait ParquetFileRepo: Send + Sync { /// [`to_delete`](ParquetFile::to_delete). async fn list_by_partition_not_to_delete( &mut self, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, ) -> Result>; /// Return the parquet file with the given object store id @@ -2680,6 +2680,7 @@ pub(crate) mod test_helpers { let other_partition_params = ParquetFileParams { partition_id: partition2.id, + partition_hash_id: partition2.hash_id().cloned(), object_store_id: Uuid::new_v4(), ..parquet_file_params.clone() }; @@ -2691,14 +2692,16 @@ pub(crate) mod test_helpers { let files = repos .parquet_files() - .list_by_partition_not_to_delete(partition.id) + .list_by_partition_not_to_delete(&partition.transition_partition_id()) .await .unwrap(); - // not asserting against a vector literal to guard against flakiness due to uncertain - // ordering of SQL query in postgres impl assert_eq!(files.len(), 2); - assert_matches!(files.iter().find(|f| f.id == parquet_file.id), Some(_)); - assert_matches!(files.iter().find(|f| f.id == level1_file.id), Some(_)); + + let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect(); + file_ids.sort(); + let mut expected_ids = vec![parquet_file.id, level1_file.id]; + expected_ids.sort(); + assert_eq!(file_ids, expected_ids); // remove namespace to avoid it from affecting later tests repos diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 35e9b242bc..ab5937f31a 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -850,14 +850,20 @@ impl ParquetFileRepo for MemTxn { async fn list_by_partition_not_to_delete( &mut self, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, ) -> Result> { let stage = self.stage(); Ok(stage .parquet_files .iter() - .filter(|f| f.partition_id == partition_id && f.to_delete.is_none()) + .filter(|f| match partition_id { + TransitionPartitionId::Deterministic(hash_id) => { + f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id) + } + TransitionPartitionId::Deprecated(id) => f.partition_id == *id, + }) + .filter(|f| f.to_delete.is_none()) .cloned() .collect()) } diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index bc8a3850ba..039db71e59 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -193,7 +193,7 @@ decorate!( "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result>; "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; "parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result>; - "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result>; + "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: &TransitionPartitionId) -> Result>; "parquet_get_by_object_store_id" = get_by_object_store_id(&mut self, object_store_id: Uuid) -> Result>; "parquet_exists_by_object_store_id_batch" = exists_by_object_store_id_batch(&mut self, object_store_ids: Vec) -> Result>; "parquet_create_upgrade_delete" = create_upgrade_delete(&mut self, delete: &[ParquetFileId], upgrade: &[ParquetFileId], create: &[ParquetFileParams], target_level: CompactionLevel) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 3409963b10..846aaf7975 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1472,10 +1472,23 @@ RETURNING id; async fn list_by_partition_not_to_delete( &mut self, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, ) -> Result> { - sqlx::query_as::<_, ParquetFile>( - r#" + // This `match` will go away when all partitions have hash IDs in the database. + let query = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>( + r#" +SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, + max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, + max_l0_created_at +FROM parquet_file +WHERE parquet_file.partition_hash_id = $1 + AND parquet_file.to_delete IS NULL; + "#, + ) + .bind(hash_id), // $1 + TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>( + r#" SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, max_l0_created_at @@ -1483,11 +1496,14 @@ FROM parquet_file WHERE parquet_file.partition_id = $1 AND parquet_file.to_delete IS NULL; "#, - ) - .bind(partition_id) // $1 - .fetch_all(&mut self.inner) - .await - .map_err(|e| Error::SqlxError { source: e }) + ) + .bind(id), // $1 + }; + + query + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e }) } async fn get_by_object_store_id( diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index e83ad6bd6d..c35a9ef4b2 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1338,10 +1338,23 @@ RETURNING id; async fn list_by_partition_not_to_delete( &mut self, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, ) -> Result> { - Ok(sqlx::query_as::<_, ParquetFilePod>( - r#" + // This `match` will go away when all partitions have hash IDs in the database. + let query = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>( + r#" +SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, + max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, + max_l0_created_at +FROM parquet_file +WHERE parquet_file.partition_hash_id = $1 + AND parquet_file.to_delete IS NULL; + "#, + ) + .bind(hash_id), // $1 + TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>( + r#" SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, max_l0_created_at @@ -1349,14 +1362,17 @@ FROM parquet_file WHERE parquet_file.partition_id = $1 AND parquet_file.to_delete IS NULL; "#, - ) - .bind(partition_id) // $1 - .fetch_all(self.inner.get_mut()) - .await - .map_err(|e| Error::SqlxError { source: e })? - .into_iter() - .map(Into::into) - .collect()) + ) + .bind(id), // $1 + }; + + Ok(query + .fetch_all(self.inner.get_mut()) + .await + .map_err(|e| Error::SqlxError { source: e })? + .into_iter() + .map(Into::into) + .collect()) } async fn get_by_object_store_id( diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 60c8cd9d06..356539060d 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -18,7 +18,7 @@ // Workaround for "unused crate" lint false positives. use workspace_hack as _; -use data_types::{PartitionId, TableId}; +use data_types::{PartitionId, TableId, TransitionPartitionId}; use generated_types::influxdata::iox::catalog::v1::*; use iox_catalog::interface::{Catalog, SoftDeletedRows}; use observability_deps::tracing::*; @@ -47,11 +47,11 @@ impl catalog_service_server::CatalogService for CatalogService { ) -> Result, Status> { let mut repos = self.catalog.repositories().await; let req = request.into_inner(); - let partition_id = PartitionId::new(req.partition_id); + let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id)); let parquet_files = repos .parquet_files() - .list_by_partition_not_to_delete(partition_id) + .list_by_partition_not_to_delete(&partition_id) .await .map_err(|e| { warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition");