diff --git a/compactor/src/components/df_planner/query_chunk.rs b/compactor/src/components/df_planner/query_chunk.rs index af69fe471f..7c930a7e12 100644 --- a/compactor/src/components/df_planner/query_chunk.rs +++ b/compactor/src/components/df_planner/query_chunk.rs @@ -171,7 +171,7 @@ fn to_queryable_parquet_chunk( parquet_file_id = file.file.id.get(), parquet_file_namespace_id = file.file.namespace_id.get(), parquet_file_table_id = file.file.table_id.get(), - parquet_file_partition_id = file.file.partition_id.get(), + parquet_file_partition_id = %file.file.partition_id, parquet_file_object_store_id = uuid.to_string().as_str(), "built parquet chunk from metadata" ); diff --git a/compactor/src/components/parquet_file_sink/mock.rs b/compactor/src/components/parquet_file_sink/mock.rs index 977f8c810e..b67e9b9615 100644 --- a/compactor/src/components/parquet_file_sink/mock.rs +++ b/compactor/src/components/parquet_file_sink/mock.rs @@ -70,8 +70,7 @@ impl ParquetFileSink for MockParquetFileSink { let out = ((row_count > 0) || !self.filter_empty_files).then(|| ParquetFileParams { namespace_id: partition.namespace_id, table_id: partition.table.id, - partition_id: partition.partition_id, - partition_hash_id: partition.partition_hash_id.clone(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::from_u128(guard.len() as u128), min_time: Timestamp::new(0), max_time: Timestamp::new(0), @@ -95,7 +94,7 @@ impl ParquetFileSink for MockParquetFileSink { #[cfg(test)] mod tests { use arrow_util::assert_batches_eq; - use data_types::{NamespaceId, PartitionId, TableId}; + use data_types::{NamespaceId, TableId}; use datafusion::{ arrow::{array::new_null_array, datatypes::DataType}, physical_plan::stream::RecordBatchStreamAdapter, @@ -159,7 +158,7 @@ mod tests { Arc::clone(&schema), futures::stream::once(async move { Ok(record_batch_captured) }), )); - let partition_hash_id = partition.partition_hash_id.clone(); + let partition_id = partition.transition_partition_id(); assert_eq!( sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await @@ -167,8 +166,7 @@ mod tests { Some(ParquetFileParams { namespace_id: NamespaceId::new(2), table_id: TableId::new(3), - partition_id: PartitionId::new(1), - partition_hash_id, + partition_id, object_store_id: Uuid::from_u128(2), min_time: Timestamp::new(0), max_time: Timestamp::new(0), @@ -223,7 +221,7 @@ mod tests { Arc::clone(&schema), futures::stream::empty(), )); - let partition_hash_id = partition.partition_hash_id.clone(); + let partition_id = partition.transition_partition_id(); assert_eq!( sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await @@ -231,8 +229,7 @@ mod tests { Some(ParquetFileParams { namespace_id: NamespaceId::new(2), table_id: TableId::new(3), - partition_id: PartitionId::new(1), - partition_hash_id, + partition_id, object_store_id: Uuid::from_u128(0), min_time: Timestamp::new(0), max_time: Timestamp::new(0), diff --git a/compactor/src/components/partition_files_source/mock.rs b/compactor/src/components/partition_files_source/mock.rs index ee82915bcf..7393d21f36 100644 --- a/compactor/src/components/partition_files_source/mock.rs +++ b/compactor/src/components/partition_files_source/mock.rs @@ -1,19 +1,35 @@ use std::{collections::HashMap, fmt::Display}; -use async_trait::async_trait; -use data_types::{ParquetFile, PartitionId}; - use super::PartitionFilesSource; +use async_trait::async_trait; +use data_types::{ParquetFile, PartitionId, TransitionPartitionId}; #[derive(Debug)] pub struct 
MockPartitionFilesSource { - files: HashMap>, + // This complexity is because we're in the process of moving to partition hash IDs rather than + // partition catalog IDs, and Parquet files might only have the partition hash ID on their + // record, but the compactor deals with partition catalog IDs because we haven't transitioned + // it yet. This should become simpler when the transition is complete. + partition_lookup: HashMap, + file_lookup: HashMap>, } impl MockPartitionFilesSource { - #[allow(dead_code)] // not used anywhere - pub fn new(files: HashMap>) -> Self { - Self { files } + #[cfg(test)] + pub fn new( + partition_lookup: HashMap, + parquet_files: Vec, + ) -> Self { + let mut file_lookup: HashMap> = HashMap::new(); + for file in parquet_files { + let files = file_lookup.entry(file.partition_id.clone()).or_default(); + files.push(file); + } + + Self { + partition_lookup, + file_lookup, + } } } @@ -25,46 +41,60 @@ impl Display for MockPartitionFilesSource { #[async_trait] impl PartitionFilesSource for MockPartitionFilesSource { - async fn fetch(&self, partition: PartitionId) -> Vec { - self.files.get(&partition).cloned().unwrap_or_default() + async fn fetch(&self, partition_id: PartitionId) -> Vec { + self.partition_lookup + .get(&partition_id) + .and_then(|partition_hash_id| self.file_lookup.get(partition_hash_id).cloned()) + .unwrap_or_default() } } #[cfg(test)] mod tests { - use iox_tests::ParquetFileBuilder; - use super::*; + use iox_tests::{partition_identifier, ParquetFileBuilder}; #[test] fn test_display() { assert_eq!( - MockPartitionFilesSource::new(HashMap::default()).to_string(), + MockPartitionFilesSource::new(Default::default(), Default::default()).to_string(), "mock", ) } #[tokio::test] async fn test_fetch() { - let f_1_1 = ParquetFileBuilder::new(1).with_partition(1).build(); - let f_1_2 = ParquetFileBuilder::new(2).with_partition(1).build(); - let f_2_1 = ParquetFileBuilder::new(3).with_partition(2).build(); + let partition_id_1 = PartitionId::new(1); + let partition_id_2 = PartitionId::new(2); + let partition_identifier_1 = partition_identifier(1); + let partition_identifier_2 = partition_identifier(2); + let f_1_1 = ParquetFileBuilder::new(1) + .with_partition(partition_identifier_1.clone()) + .build(); + let f_1_2 = ParquetFileBuilder::new(2) + .with_partition(partition_identifier_1.clone()) + .build(); + let f_2_1 = ParquetFileBuilder::new(3) + .with_partition(partition_identifier_2.clone()) + .build(); - let files = HashMap::from([ - (PartitionId::new(1), vec![f_1_1.clone(), f_1_2.clone()]), - (PartitionId::new(2), vec![f_2_1.clone()]), + let partition_lookup = HashMap::from([ + (partition_id_1, partition_identifier_1.clone()), + (partition_id_2, partition_identifier_2.clone()), ]); - let source = MockPartitionFilesSource::new(files); + + let files = vec![f_1_1.clone(), f_1_2.clone(), f_2_1.clone()]; + let source = MockPartitionFilesSource::new(partition_lookup, files); // different partitions assert_eq!( - source.fetch(PartitionId::new(1)).await, + source.fetch(partition_id_1).await, vec![f_1_1.clone(), f_1_2.clone()], ); - assert_eq!(source.fetch(PartitionId::new(2)).await, vec![f_2_1],); + assert_eq!(source.fetch(partition_id_2).await, vec![f_2_1],); // fetching does not drain - assert_eq!(source.fetch(PartitionId::new(1)).await, vec![f_1_1, f_1_2],); + assert_eq!(source.fetch(partition_id_1).await, vec![f_1_1, f_1_2],); // unknown partition => empty result assert_eq!(source.fetch(PartitionId::new(3)).await, vec![],); diff --git 
a/compactor_scheduler/src/commit/logging.rs b/compactor_scheduler/src/commit/logging.rs index 439e928d45..00d8e732f3 100644 --- a/compactor_scheduler/src/commit/logging.rs +++ b/compactor_scheduler/src/commit/logging.rs @@ -78,14 +78,12 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use test_helpers::tracing::TracingCapture; - use super::*; use crate::commit::mock::{CommitHistoryEntry, MockCommit}; - use iox_tests::ParquetFileBuilder; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; + use std::sync::Arc; + use test_helpers::tracing::TracingCapture; #[test] fn test_display() { @@ -111,14 +109,21 @@ mod tests { .with_row_count(105) .build(); - let created_1 = ParquetFileBuilder::new(1000).with_partition(1).build(); - let created_2 = ParquetFileBuilder::new(1001).with_partition(1).build(); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + + let created_1 = ParquetFileBuilder::new(1000) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_2 = ParquetFileBuilder::new(1001) + .with_partition(transition_partition_id_1) + .build(); let capture = TracingCapture::new(); let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone()], &[], &[created_1.clone().into(), created_2.clone().into()], @@ -130,9 +135,11 @@ mod tests { Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] ); + let partition_id_2 = PartitionId::new(2); + let ids = commit .commit( - PartitionId::new(2), + partition_id_2, &[existing_2.clone(), existing_3.clone()], &[existing_1.clone()], &[], @@ -151,14 +158,14 @@ level = INFO; message = committed parquet file change; target_level = Final; par inner.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1.clone()], upgrade: vec![], created: vec![created_1, created_2], target_level: CompactionLevel::Final, }, CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_2, existing_3], upgrade: vec![existing_1], created: vec![], diff --git a/compactor_scheduler/src/commit/metrics.rs b/compactor_scheduler/src/commit/metrics.rs index 0f0b73d2cc..3890eb46c4 100644 --- a/compactor_scheduler/src/commit/metrics.rs +++ b/compactor_scheduler/src/commit/metrics.rs @@ -303,15 +303,12 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use metric::{assert_histogram, Attributes}; - - use crate::commit::mock::{CommitHistoryEntry, MockCommit}; - use iox_tests::ParquetFileBuilder; - use super::*; + use crate::commit::mock::{CommitHistoryEntry, MockCommit}; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; + use metric::{assert_histogram, Attributes}; + use std::sync::Arc; #[test] fn test_display() { @@ -326,6 +323,9 @@ mod tests { let inner = Arc::new(MockCommit::new()); let commit = MetricsCommitWrapper::new(Arc::clone(&inner), ®istry); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + let existing_1 = ParquetFileBuilder::new(1) .with_file_size_bytes(10_001) .with_row_count(1_001) @@ -350,7 +350,7 @@ mod tests { let created = ParquetFileBuilder::new(1000) .with_file_size_bytes(10_016) .with_row_count(1_016) - .with_partition(1) + .with_partition(transition_partition_id_1) 
.with_compaction_level(CompactionLevel::Initial) .build(); @@ -392,7 +392,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone()], &[existing_2a.clone()], &[created.clone().into()], @@ -401,9 +401,11 @@ mod tests { .await; assert_matches!(ids, Ok(res) if res == vec![ParquetFileId::new(1000)]); + let partition_id_2 = PartitionId::new(2); + let ids = commit .commit( - PartitionId::new(2), + partition_id_2, &[existing_2b.clone(), existing_3.clone()], &[existing_4.clone()], &[], @@ -449,14 +451,14 @@ mod tests { inner.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1], upgrade: vec![existing_2a.clone()], created: vec![created], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_2b, existing_3], upgrade: vec![existing_4], created: vec![], diff --git a/compactor_scheduler/src/commit/mock.rs b/compactor_scheduler/src/commit/mock.rs index a2e6e43631..8ae20df3d3 100644 --- a/compactor_scheduler/src/commit/mock.rs +++ b/compactor_scheduler/src/commit/mock.rs @@ -78,10 +78,9 @@ impl Commit for MockCommit { #[cfg(test)] mod tests { - use assert_matches::assert_matches; - use iox_tests::ParquetFileBuilder; - use super::*; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; #[test] fn test_display() { @@ -92,6 +91,11 @@ mod tests { async fn test_commit() { let commit = MockCommit::new(); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + let partition_id_2 = PartitionId::new(2); + let transition_partition_id_2 = partition_identifier(2); + let existing_1 = ParquetFileBuilder::new(1).build(); let existing_2 = ParquetFileBuilder::new(2).build(); let existing_3 = ParquetFileBuilder::new(3).build(); @@ -101,14 +105,22 @@ mod tests { let existing_7 = ParquetFileBuilder::new(7).build(); let existing_8 = ParquetFileBuilder::new(8).build(); - let created_1_1 = ParquetFileBuilder::new(1000).with_partition(1).build(); - let created_1_2 = ParquetFileBuilder::new(1001).with_partition(1).build(); - let created_1_3 = ParquetFileBuilder::new(1003).with_partition(1).build(); - let created_2_1 = ParquetFileBuilder::new(1002).with_partition(2).build(); + let created_1_1 = ParquetFileBuilder::new(1000) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_1_2 = ParquetFileBuilder::new(1001) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_1_3 = ParquetFileBuilder::new(1003) + .with_partition(transition_partition_id_1) + .build(); + let created_2_1 = ParquetFileBuilder::new(1002) + .with_partition(transition_partition_id_2) + .build(); let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone(), existing_2.clone()], &[existing_3.clone(), existing_4.clone()], &[created_1_1.clone().into(), created_1_2.clone().into()], @@ -122,7 +134,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(2), + partition_id_2, &[existing_3.clone()], &[], &[created_2_1.clone().into()], @@ -136,7 +148,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_5.clone(), existing_6.clone(), existing_7.clone()], &[], &[created_1_3.clone().into()], @@ -151,7 +163,7 @@ mod tests { // simulate fill implosion of the file (this may happen w/ delete predicates) let ids = commit .commit( - 
PartitionId::new(1), + partition_id_1, &[existing_8.clone()], &[], &[], @@ -167,28 +179,28 @@ mod tests { commit.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1, existing_2], upgrade: vec![existing_3.clone(), existing_4.clone()], created: vec![created_1_1, created_1_2], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_3], upgrade: vec![], created: vec![created_2_1], target_level: CompactionLevel::Final, }, CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_5, existing_6, existing_7,], upgrade: vec![], created: vec![created_1_3], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_8], upgrade: vec![], created: vec![], diff --git a/compactor_scheduler/tests/local_scheduler/mod.rs b/compactor_scheduler/tests/local_scheduler/mod.rs index a2638be1b5..279ff51702 100644 --- a/compactor_scheduler/tests/local_scheduler/mod.rs +++ b/compactor_scheduler/tests/local_scheduler/mod.rs @@ -4,7 +4,7 @@ use assert_matches::assert_matches; use compactor_scheduler::{ create_scheduler, CompactionJob, LocalSchedulerConfig, Scheduler, SchedulerConfig, }; -use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId}; +use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId, TransitionPartitionId}; use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFileBuilder, TestPartition}; mod end_job; @@ -65,7 +65,7 @@ impl TestLocalScheduler { pub async fn create_params_for_new_parquet_file(&self) -> ParquetFileParams { ParquetFileBuilder::new(42) - .with_partition(self.get_partition_id().get()) + .with_partition(self.get_transition_partition_id()) .build() .into() } @@ -81,4 +81,8 @@ impl TestLocalScheduler { pub fn get_partition_id(&self) -> PartitionId { self.test_partition.partition.id } + + pub fn get_transition_partition_id(&self) -> TransitionPartitionId { + self.test_partition.partition.transition_partition_id() + } } diff --git a/compactor_test_utils/src/simulator.rs b/compactor_test_utils/src/simulator.rs index 2f466fcdd4..cf731f5ed5 100644 --- a/compactor_test_utils/src/simulator.rs +++ b/compactor_test_utils/src/simulator.rs @@ -202,8 +202,7 @@ impl SimulatedFile { ParquetFileParams { namespace_id: partition_info.namespace_id, table_id: partition_info.table.id, - partition_id: partition_info.partition_id, - partition_hash_id: partition_info.partition_hash_id.clone(), + partition_id: partition_info.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time, max_time, diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 97815e1939..a77c1af8d0 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -527,10 +527,9 @@ pub struct ParquetFile { pub namespace_id: NamespaceId, /// the table pub table_id: TableId, - /// the partition - pub partition_id: PartitionId, - /// the partition hash ID, if generated - pub partition_hash_id: Option, + /// the partition identifier + #[sqlx(flatten)] + pub partition_id: TransitionPartitionId, /// the uuid used in the object store path for this file pub object_store_id: Uuid, /// the min timestamp of data in this file @@ -588,7 +587,6 @@ impl ParquetFile { namespace_id: params.namespace_id, table_id: params.table_id, partition_id: 
params.partition_id, - partition_hash_id: params.partition_hash_id, object_store_id: params.object_store_id, min_time: params.min_time, max_time: params.max_time, @@ -602,21 +600,9 @@ impl ParquetFile { } } - /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that. - /// Otherwise, use the database-assigned `PartitionId`. - pub fn transition_partition_id(&self) -> TransitionPartitionId { - TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) - } - /// Estimate the memory consumption of this object and its contents pub fn size(&self) -> usize { - std::mem::size_of_val(self) - + self - .partition_hash_id - .as_ref() - .map(|id| id.size() - std::mem::size_of_val(id)) - .unwrap_or_default() - + self.column_set.size() + std::mem::size_of_val(self) + self.partition_id.size() + self.column_set.size() - std::mem::size_of_val(&self.column_set) } @@ -638,10 +624,8 @@ pub struct ParquetFileParams { pub namespace_id: NamespaceId, /// the table pub table_id: TableId, - /// the partition - pub partition_id: PartitionId, - /// the partition hash ID, if generated - pub partition_hash_id: Option, + /// the partition identifier + pub partition_id: TransitionPartitionId, /// the uuid used in the object store path for this file pub object_store_id: Uuid, /// the min timestamp of data in this file @@ -662,21 +646,12 @@ pub struct ParquetFileParams { pub max_l0_created_at: Timestamp, } -impl ParquetFileParams { - /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that. - /// Otherwise, use the database-assigned `PartitionId`. - pub fn transition_partition_id(&self) -> TransitionPartitionId { - TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) - } -} - impl From for ParquetFileParams { fn from(value: ParquetFile) -> Self { Self { namespace_id: value.namespace_id, table_id: value.table_id, partition_id: value.partition_id, - partition_hash_id: value.partition_hash_id, object_store_id: value.object_store_id, min_time: value.min_time, max_time: value.max_time, diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs index b8bfde8f56..c370cb3cea 100644 --- a/data_types/src/partition.rs +++ b/data_types/src/partition.rs @@ -31,6 +31,34 @@ impl TransitionPartitionId { } } +impl<'a, R> sqlx::FromRow<'a, R> for TransitionPartitionId +where + R: sqlx::Row, + &'static str: sqlx::ColumnIndex, + PartitionId: sqlx::decode::Decode<'a, R::Database>, + PartitionId: sqlx::types::Type, + Option: sqlx::decode::Decode<'a, R::Database>, + Option: sqlx::types::Type, +{ + fn from_row(row: &'a R) -> sqlx::Result { + let partition_id: Option = row.try_get("partition_id")?; + let partition_hash_id: Option = row.try_get("partition_hash_id")?; + + let transition_partition_id = match (partition_id, partition_hash_id) { + (_, Some(hash_id)) => TransitionPartitionId::Deterministic(hash_id), + (Some(id), _) => TransitionPartitionId::Deprecated(id), + (None, None) => { + return Err(sqlx::Error::ColumnDecode { + index: "partition_id".into(), + source: "Both partition_id and partition_hash_id were NULL".into(), + }) + } + }; + + Ok(transition_partition_id) + } +} + impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId { fn from((partition_id, partition_hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self { partition_hash_id diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index a8e7a4bb24..0a91f0f6a6 100644 --- 
a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -267,8 +267,7 @@ mod tests { let parquet_file_params = ParquetFileParams { namespace_id: namespace.id, table_id: partition.table_id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), @@ -298,7 +297,7 @@ mod tests { let location = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); @@ -376,7 +375,7 @@ mod tests { let location = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); @@ -469,7 +468,7 @@ mod tests { let loc = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); diff --git a/generated_types/build.rs b/generated_types/build.rs index 370eb59e8f..409d0c5140 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -52,6 +52,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { let proto_files = vec![ authz_path.join("authz.proto"), catalog_path.join("parquet_file.proto"), + catalog_path.join("partition_identifier.proto"), catalog_path.join("service.proto"), compactor_path.join("service.proto"), delete_path.join("service.proto"), diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto index a7be9b84e8..e61c4430a8 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package influxdata.iox.catalog.v1; option go_package = "github.com/influxdata/iox/catalog/v1"; +import "influxdata/iox/catalog/v1/partition_identifier.proto"; + message ParquetFile { reserved 7; reserved "min_sequence_number"; @@ -11,6 +13,8 @@ message ParquetFile { reserved "shard_id"; reserved 8; reserved "max_sequence_number"; + reserved 5; + reserved "partition_id"; // the id of the file in the catalog int64 id = 1; @@ -18,8 +22,9 @@ message ParquetFile { int64 namespace_id = 3; // the table id int64 table_id = 4; - // the partition id - int64 partition_id = 5; + + PartitionIdentifier partition_identifier = 19; + // the object store uuid string object_store_id = 6; // the min timestamp of data in this file diff --git a/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto b/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto new file mode 100644 index 0000000000..eb11c388d2 --- /dev/null +++ b/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package influxdata.iox.catalog.v1; +option go_package = "github.com/influxdata/iox/catalog/v1"; + +message PartitionIdentifier { + // Either the catalog-assigned partition ID or the deterministic identifier created from the + // table ID and partition key. 
+ oneof id { + int64 catalog_id = 1; + bytes hash_id = 2; + } +} diff --git a/generated_types/protos/influxdata/iox/catalog/v1/service.proto b/generated_types/protos/influxdata/iox/catalog/v1/service.proto index bdecee8ec6..3e370874dc 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/service.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/service.proto @@ -3,6 +3,7 @@ package influxdata.iox.catalog.v1; option go_package = "github.com/influxdata/iox/catalog/v1"; import "influxdata/iox/catalog/v1/parquet_file.proto"; +import "influxdata/iox/catalog/v1/partition_identifier.proto"; service CatalogService { // Get the parquet_file catalog records in the given partition @@ -19,8 +20,11 @@ service CatalogService { } message GetParquetFilesByPartitionIdRequest { - // the partition id - int64 partition_id = 1; + // Was the catalog-assigned partition ID. + reserved 1; + reserved "partition_id"; + + PartitionIdentifier partition_identifier = 2; } message GetParquetFilesByPartitionIdResponse { @@ -35,15 +39,17 @@ message Partition { reserved "sequencer_id"; reserved 7; reserved "shard_id"; + reserved 1; + reserved "id"; - // the partition id - int64 id = 1; // the table id the partition is in int64 table_id = 3; // the partition key string key = 4; // the sort key for data in parquet files in the partition repeated string array_sort_key = 6; + + PartitionIdentifier identifier = 8; } message GetPartitionsByTableIdRequest { diff --git a/import_export/src/file/export.rs b/import_export/src/file/export.rs index 4d1a1c6418..a9be88f064 100644 --- a/import_export/src/file/export.rs +++ b/import_export/src/file/export.rs @@ -1,10 +1,13 @@ +use data_types::{PartitionHashId, PartitionId, TransitionPartitionId}; use futures_util::TryStreamExt; use influxdb_iox_client::{ - catalog::{self, generated_types::ParquetFile}, + catalog::{ + self, + generated_types::{partition_identifier, ParquetFile, PartitionIdentifier}, + }, connection::Connection, store, }; -use observability_deps::tracing::{debug, info}; use std::path::{Path, PathBuf}; use thiserror::Error; use tokio::{ @@ -35,10 +38,6 @@ type Result = std::result::Result; pub struct RemoteExporter { catalog_client: catalog::Client, store_client: store::Client, - - /// Optional partition filter. If `Some(partition_id)`, only these - /// files with that `partition_id` are downloaded. - partition_filter: Option, } impl RemoteExporter { @@ -46,19 +45,9 @@ impl RemoteExporter { Self { catalog_client: catalog::Client::new(connection.clone()), store_client: store::Client::new(connection), - partition_filter: None, } } - /// Specify that only files and metadata for the specific - /// partition id should be exported. - pub fn with_partition_filter(mut self, partition_id: i64) -> Self { - info!(partition_id, "Filtering by partition"); - - self.partition_filter = Some(partition_id); - self - } - /// Exports all data and metadata for `table_name` in /// `namespace` to local files. 
/// @@ -95,39 +84,14 @@ impl RemoteExporter { let indexed_parquet_file_metadata = parquet_files.into_iter().enumerate(); for (index, parquet_file) in indexed_parquet_file_metadata { - if self.should_export(parquet_file.partition_id) { - self.export_parquet_file( - &output_directory, - index, - num_parquet_files, - &parquet_file, - ) + self.export_parquet_file(&output_directory, index, num_parquet_files, &parquet_file) .await?; - } else { - debug!( - "skipping file {} of {num_parquet_files} ({} does not match request)", - index + 1, - parquet_file.partition_id - ); - } } println!("Done."); Ok(()) } - /// Return true if this partition should be exported - fn should_export(&self, partition_id: i64) -> bool { - self.partition_filter - .map(|partition_filter| { - // if a partition filter was specified, only export - // the file if the partition matches - partition_filter == partition_id - }) - // export files if there is no partition - .unwrap_or(true) - } - /// Exports table and partition information for the specified /// table. Overwrites existing files, if any, to ensure it has the /// latest catalog information. @@ -158,13 +122,11 @@ impl RemoteExporter { .await?; for partition in partitions { - let partition_id = partition.id; - if self.should_export(partition_id) { - let partition_json = serde_json::to_string_pretty(&partition)?; - let filename = format!("partition.{partition_id}.json"); - let file_path = output_directory.join(&filename); - write_string_to_file(&partition_json, &file_path).await?; - } + let partition_id = to_partition_id(partition.identifier.as_ref()); + let partition_json = serde_json::to_string_pretty(&partition)?; + let filename = format!("partition.{partition_id}.json"); + let file_path = output_directory.join(&filename); + write_string_to_file(&partition_json, &file_path).await?; } Ok(()) @@ -183,9 +145,10 @@ impl RemoteExporter { parquet_file: &ParquetFile, ) -> Result<()> { let uuid = &parquet_file.object_store_id; - let partition_id = parquet_file.partition_id; let file_size_bytes = parquet_file.file_size_bytes as u64; + let partition_id = to_partition_id(parquet_file.partition_identifier.as_ref()); + // copy out the metadata as pbjson encoded data always (to // ensure we have the most up to date version) { @@ -230,6 +193,21 @@ impl RemoteExporter { } } +fn to_partition_id(partition_identifier: Option<&PartitionIdentifier>) -> TransitionPartitionId { + match partition_identifier + .and_then(|pi| pi.id.as_ref()) + .expect("Catalog service should send the partition identifier") + { + partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic( + PartitionHashId::try_from(&bytes[..]) + .expect("Catalog service should send valid hash_id bytes"), + ), + partition_identifier::Id::CatalogId(id) => { + TransitionPartitionId::Deprecated(PartitionId::new(*id)) + } + } +} + /// writes the contents of a string to a file, overwriting the previous contents, if any async fn write_string_to_file(contents: &str, path: &Path) -> Result<()> { let mut file = OpenOptions::new() diff --git a/import_export/src/file/import.rs b/import_export/src/file/import.rs index 3056317498..c6530e824d 100644 --- a/import_export/src/file/import.rs +++ b/import_export/src/file/import.rs @@ -7,7 +7,7 @@ use data_types::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO, }, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError, - ParquetFileParams, Partition, PartitionHashId, Statistics, Table, TableId, 
Timestamp, + ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp, }; use generated_types::influxdata::iox::catalog::v1 as proto; // ParquetFile as ProtoParquetFile, Partition as ProtoPartition, @@ -567,9 +567,6 @@ impl RemoteImporter { // need to make columns in the target catalog let column_set = insert_columns(table.id, decoded_iox_parquet_metadata, repos).await?; - // Create the the partition_hash_id - let partition_hash_id = Some(PartitionHashId::new(table.id, &partition.partition_key)); - let params = if let Some(proto_parquet_file) = &parquet_metadata { let compaction_level = proto_parquet_file .compaction_level @@ -579,8 +576,7 @@ impl RemoteImporter { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_hash_id, - partition_id: partition.id, + partition_id: partition.transition_partition_id(), object_store_id, min_time: Timestamp::new(proto_parquet_file.min_time), max_time: Timestamp::new(proto_parquet_file.max_time), @@ -599,8 +595,7 @@ impl RemoteImporter { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_hash_id, - partition_id: partition.id, + partition_id: partition.transition_partition_id(), object_store_id, min_time, max_time, diff --git a/influxdb_iox/src/commands/remote/store.rs b/influxdb_iox/src/commands/remote/store.rs index eddae4c15b..8c3ff29b4b 100644 --- a/influxdb_iox/src/commands/remote/store.rs +++ b/influxdb_iox/src/commands/remote/store.rs @@ -55,10 +55,6 @@ struct GetTable { #[clap(action)] table: String, - /// If specified, only files from the specified partitions are downloaded - #[clap(action, short, long)] - partition_id: Option, - /// The output directory to use. If not specified, files will be placed in a directory named /// after the table in the current working directory. #[clap(action, short)] @@ -91,13 +87,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { Command::GetTable(GetTable { namespace, table, - partition_id, output_directory, }) => { let mut exporter = RemoteExporter::new(connection); - if let Some(partition_id) = partition_id { - exporter = exporter.with_partition_filter(partition_id); - } Ok(exporter .export_table(output_directory, namespace, table) .await?) 
diff --git a/influxdb_iox/tests/end_to_end_cases/compactor.rs b/influxdb_iox/tests/end_to_end_cases/compactor.rs index 1ec8ba38ba..cb856f5bcd 100644 --- a/influxdb_iox/tests/end_to_end_cases/compactor.rs +++ b/influxdb_iox/tests/end_to_end_cases/compactor.rs @@ -157,10 +157,12 @@ async fn sharded_compactor_0_always_compacts_partition_1() { .assert() .success() .stdout( - // Important parts are the expected partition ID - predicate::str::contains(r#""partitionId": "1","#) - // and compaction level - .and(predicate::str::contains(r#""compactionLevel": 1"#)), + // Important parts are the expected partition identifier + predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + ) + // and compaction level + .and(predicate::str::contains(r#""compactionLevel": 1"#)), ); } .boxed() @@ -240,10 +242,12 @@ async fn sharded_compactor_1_never_compacts_partition_1() { .assert() .success() .stdout( - // Important parts are the expected partition ID - predicate::str::contains(r#""partitionId": "1","#) - // and compaction level is 0 so it's not returned - .and(predicate::str::contains("compactionLevel").not()), + // Important parts are the expected partition identifier + predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + ) + // and compaction level is 0 so it's not returned + .and(predicate::str::contains("compactionLevel").not()), ); } .boxed() diff --git a/influxdb_iox/tests/end_to_end_cases/remote.rs b/influxdb_iox/tests/end_to_end_cases/remote.rs index 7a3339d52a..9a31d70e72 100644 --- a/influxdb_iox/tests/end_to_end_cases/remote.rs +++ b/influxdb_iox/tests/end_to_end_cases/remote.rs @@ -280,10 +280,9 @@ async fn remote_partition_and_get_from_store_and_pull() { .arg("1") .assert() .success() - .stdout( - predicate::str::contains(r#""id": "1""#) - .and(predicate::str::contains(r#""partitionId": "1","#)), - ) + .stdout(predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + )) .get_output() .stdout .clone(); diff --git a/influxdb_iox_client/src/client/catalog.rs b/influxdb_iox_client/src/client/catalog.rs index f65ea4f154..249b2dec50 100644 --- a/influxdb_iox_client/src/client/catalog.rs +++ b/influxdb_iox_client/src/client/catalog.rs @@ -29,9 +29,15 @@ impl Client { &mut self, partition_id: i64, ) -> Result, Error> { + let partition_identifier = PartitionIdentifier { + id: Some(partition_identifier::Id::CatalogId(partition_id)), + }; + let response = self .inner - .get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest { partition_id }) + .get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest { + partition_identifier: Some(partition_identifier), + }) .await?; Ok(response.into_inner().parquet_files) diff --git a/ingester/src/persist/completion_observer.rs b/ingester/src/persist/completion_observer.rs index a3dfae0c0b..4978398daf 100644 --- a/ingester/src/persist/completion_observer.rs +++ b/ingester/src/persist/completion_observer.rs @@ -2,7 +2,8 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use data_types::{ - sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, PartitionId, TableId, + sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, TableId, + TransitionPartitionId, }; use crate::wal::reference_tracker::WalReferenceHandle; @@ -54,9 +55,9 @@ impl CompletedPersist { self.meta.table_id } - /// Returns the [`PartitionId`] of the persisted data. 
- pub(crate) fn partition_id(&self) -> PartitionId { - self.meta.partition_id + /// Returns the [`TransitionPartitionId`] of the persisted data. + pub(crate) fn partition_id(&self) -> &TransitionPartitionId { + &self.meta.partition_id } /// Returns the [`SequenceNumberSet`] of the persisted data. @@ -166,15 +167,16 @@ pub(crate) mod mock { #[cfg(test)] mod tests { use super::*; - use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}; + use crate::test_util::{ + ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID, + }; use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp}; fn arbitrary_file_meta() -> ParquetFileParams { ParquetFileParams { namespace_id: ARBITRARY_NAMESPACE_ID, table_id: ARBITRARY_TABLE_ID, - partition_id: ARBITRARY_PARTITION_ID, - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), @@ -226,7 +228,7 @@ mod tests { assert_eq!(note.namespace_id(), meta.namespace_id); assert_eq!(note.table_id(), meta.table_id); - assert_eq!(note.partition_id(), meta.partition_id); + assert_eq!(note.partition_id(), &meta.partition_id); assert_eq!(note.column_count(), meta.column_set.len()); assert_eq!(note.row_count(), meta.row_count as usize); diff --git a/ingester/src/persist/file_metrics.rs b/ingester/src/persist/file_metrics.rs index 54f268f926..05353f9ccb 100644 --- a/ingester/src/persist/file_metrics.rs +++ b/ingester/src/persist/file_metrics.rs @@ -151,7 +151,9 @@ mod tests { use super::*; use crate::{ persist::completion_observer::mock::MockCompletionObserver, - test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}, + test_util::{ + ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID, + }, }; use data_types::{ sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp, @@ -169,8 +171,7 @@ mod tests { let meta = ParquetFileParams { namespace_id: ARBITRARY_NAMESPACE_ID, table_id: ARBITRARY_TABLE_ID, - partition_id: ARBITRARY_PARTITION_ID, - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _), max_time: Timestamp::new(Duration::from_secs(1_042).as_nanos() as _), // 42 seconds later diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index ca0f829fc9..a7e5d9d951 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -16,7 +16,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId}; + use data_types::{CompactionLevel, ParquetFile}; use futures::TryStreamExt; use iox_catalog::{ interface::{get_schema_by_id, Catalog, SoftDeletedRows}, @@ -190,7 +190,7 @@ mod tests { // Generate a partition with data let partition = partition_with_write(Arc::clone(&catalog)).await; let table_id = partition.lock().table_id(); - let partition_id = partition.lock().partition_id(); + let partition_id = partition.lock().transition_partition_id(); let namespace_id = partition.lock().namespace_id(); assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None)); @@ -221,7 +221,7 @@ mod tests { assert_matches!(&completion_observer.calls().as_slice(), &[n] => { assert_eq!(n.namespace_id(), namespace_id); assert_eq!(n.table_id(), table_id); - 
assert_eq!(n.partition_id(), partition_id); + assert_eq!(n.partition_id(), &partition_id); assert_eq!(n.sequence_numbers().len(), 1); }); @@ -243,12 +243,12 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) + .list_by_partition_not_to_delete(&partition_id) .await .expect("query for parquet files failed"); // Validate a single file was inserted with the expected properties. - let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile { + let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile { namespace_id: got_namespace_id, table_id: got_table_id, partition_id: got_partition_id, @@ -263,12 +263,12 @@ mod tests { { assert_eq!(created_at.get(), max_l0_created_at.get()); - assert_eq!(got_namespace_id, namespace_id); - assert_eq!(got_table_id, table_id); - assert_eq!(got_partition_id, partition_id); + assert_eq!(got_namespace_id, &namespace_id); + assert_eq!(got_table_id, &table_id); + assert_eq!(got_partition_id, &partition_id); - assert_eq!(row_count, 1); - assert_eq!(compaction_level, CompactionLevel::Initial); + assert_eq!(*row_count, 1); + assert_eq!(compaction_level, &CompactionLevel::Initial); (object_store_id, file_size_bytes) } @@ -292,7 +292,7 @@ mod tests { }] => { let want_path = format!("{object_store_id}.parquet"); assert!(location.as_ref().ends_with(&want_path)); - assert_eq!(size, file_size_bytes as usize); + assert_eq!(size, *file_size_bytes as usize); } ) } @@ -326,8 +326,7 @@ mod tests { // Generate a partition with data let partition = partition_with_write(Arc::clone(&catalog)).await; let table_id = partition.lock().table_id(); - let partition_id = partition.lock().partition_id(); - let transition_partition_id = partition.lock().transition_partition_id(); + let partition_id = partition.lock().transition_partition_id(); let namespace_id = partition.lock().namespace_id(); assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None)); @@ -344,7 +343,7 @@ mod tests { .await .partitions() .cas_sort_key( - &transition_partition_id, + &partition_id, None, &["bananas", "are", "good", "for", "you"], ) @@ -367,7 +366,7 @@ mod tests { assert_matches!(&completion_observer.calls().as_slice(), &[n] => { assert_eq!(n.namespace_id(), namespace_id); assert_eq!(n.table_id(), table_id); - assert_eq!(n.partition_id(), partition_id); + assert_eq!(n.partition_id(), &partition_id); assert_eq!(n.sequence_numbers().len(), 1); }); @@ -392,12 +391,12 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) + .list_by_partition_not_to_delete(&partition_id) .await .expect("query for parquet files failed"); // Validate a single file was inserted with the expected properties. 
- let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile { + let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile { namespace_id: got_namespace_id, table_id: got_table_id, partition_id: got_partition_id, @@ -412,12 +411,12 @@ mod tests { { assert_eq!(created_at.get(), max_l0_created_at.get()); - assert_eq!(got_namespace_id, namespace_id); - assert_eq!(got_table_id, table_id); - assert_eq!(got_partition_id, partition_id); + assert_eq!(got_namespace_id, &namespace_id); + assert_eq!(got_table_id, &table_id); + assert_eq!(got_partition_id, &partition_id); - assert_eq!(row_count, 1); - assert_eq!(compaction_level, CompactionLevel::Initial); + assert_eq!(*row_count, 1); + assert_eq!(compaction_level, &CompactionLevel::Initial); (object_store_id, file_size_bytes) } @@ -438,18 +437,14 @@ mod tests { assert_eq!(files.len(), 2, "expected two uploaded files"); // Ensure the catalog record points at a valid file in object storage. - let want_path = ParquetFilePath::new( - namespace_id, - table_id, - &transition_partition_id, - object_store_id, - ) - .object_store_path(); + let want_path = + ParquetFilePath::new(namespace_id, table_id, &partition_id, *object_store_id) + .object_store_path(); let file = files .into_iter() .find(|f| f.location == want_path) .expect("did not find final file in object storage"); - assert_eq!(file.size, file_size_bytes as usize); + assert_eq!(file.size, *file_size_bytes as usize); } } diff --git a/ingester/src/persist/queue.rs b/ingester/src/persist/queue.rs index dbb9714e5c..af03d08c86 100644 --- a/ingester/src/persist/queue.rs +++ b/ingester/src/persist/queue.rs @@ -55,7 +55,8 @@ pub(crate) mod mock { use std::{sync::Arc, time::Duration}; use data_types::{ - ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp, + ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionHashId, PartitionKey, + TableId, Timestamp, TransitionPartitionId, }; use test_helpers::timeout::FutureTimeout; use tokio::task::JoinHandle; @@ -155,13 +156,16 @@ pub(crate) mod mock { let wait_ms: u64 = rand::random::() % 100; tokio::time::sleep(Duration::from_millis(wait_ms)).await; let sequence_numbers = partition.lock().mark_persisted(data); + let table_id = TableId::new(2); + let partition_hash_id = + PartitionHashId::new(table_id, &PartitionKey::from("arbitrary")); + let partition_id = TransitionPartitionId::Deterministic(partition_hash_id); completion_observer .persist_complete(Arc::new(CompletedPersist::new( ParquetFileParams { namespace_id: NamespaceId::new(1), - table_id: TableId::new(2), - partition_id: PartitionId::new(3), - partition_hash_id: None, + table_id, + partition_id, object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 0ecef0bd9d..96ae85b59b 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -394,8 +394,7 @@ where ParquetFileParams { namespace_id: NamespaceId::new(1), table_id: TableId::new(2), - partition_id: PartitionId::new(3), - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), diff --git a/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql b/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql new file mode 100644 index 0000000000..b95b8dad83 --- /dev/null +++ 
b/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql @@ -0,0 +1,24 @@ +DROP TRIGGER IF EXISTS update_partition ON parquet_file; + +ALTER TABLE parquet_file +ALTER COLUMN partition_id +DROP NOT NULL; + +CREATE OR REPLACE FUNCTION update_partition_on_new_file_at() +RETURNS TRIGGER +LANGUAGE PLPGSQL +AS $$ +BEGIN + UPDATE partition + SET new_file_at = NEW.created_at + WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id) + AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id); + + RETURN NEW; +END; +$$; + +CREATE TRIGGER update_partition + AFTER INSERT ON parquet_file + FOR EACH ROW + EXECUTE PROCEDURE update_partition_on_new_file_at(); diff --git a/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql b/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql new file mode 100644 index 0000000000..1adcbe3371 --- /dev/null +++ b/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql @@ -0,0 +1,98 @@ +CREATE TABLE parquet_file_temp +AS SELECT * FROM parquet_file; + +DROP TABLE parquet_file; + +CREATE TABLE parquet_file +( + id INTEGER + constraint parquet_file_pkey + primary key autoincrement, + shard_id numeric not null + constraint parquet_file_sequencer_id_fkey + references shard, + table_id numeric not null + references table_name, + partition_id numeric + references partition, + partition_hash_id bytea + references partition (hash_id), + + object_store_id uuid not null + constraint parquet_location_unique + unique, + max_sequence_number numeric, + min_time numeric, + max_time numeric, + to_delete numeric, + row_count numeric default 0 not null, + file_size_bytes numeric default 0 not null, + compaction_level smallint default 0 not null, + created_at numeric, + namespace_id numeric not null + references namespace + on delete cascade, + column_set numeric[] not null, + max_l0_created_at numeric default 0 not null +); + +create index if not exists parquet_file_deleted_at_idx + on parquet_file (to_delete); + +create index if not exists parquet_file_partition_idx + on parquet_file (partition_id); + +create index if not exists parquet_file_table_idx + on parquet_file (table_id); + +create index if not exists parquet_file_shard_compaction_delete_idx + on parquet_file (shard_id, compaction_level, to_delete); + +create index if not exists parquet_file_shard_compaction_delete_created_idx + on parquet_file (shard_id, compaction_level, to_delete, created_at); + +create index if not exists parquet_file_partition_created_idx + on parquet_file (partition_id, created_at); + +CREATE INDEX IF NOT EXISTS parquet_file_partition_hash_id_idx +ON parquet_file (partition_hash_id) +WHERE partition_hash_id IS NOT NULL; + +create trigger if not exists update_partition + after insert + on parquet_file + for each row +begin + UPDATE partition + SET new_file_at = NEW.created_at + WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id) + AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id); +end; + +create trigger if not exists update_billing + after insert + on parquet_file + for each row +begin + INSERT INTO billing_summary (namespace_id, total_file_size_bytes) + VALUES (NEW.namespace_id, NEW.file_size_bytes) + ON CONFLICT (namespace_id) DO UPDATE + SET total_file_size_bytes = billing_summary.total_file_size_bytes + NEW.file_size_bytes + WHERE billing_summary.namespace_id = NEW.namespace_id; +end; + +create trigger if not exists decrement_summary 
+ after update + on parquet_file + for each row + when OLD.to_delete IS NULL AND NEW.to_delete IS NOT NULL +begin + UPDATE billing_summary + SET total_file_size_bytes = billing_summary.total_file_size_bytes - OLD.file_size_bytes + WHERE billing_summary.namespace_id = OLD.namespace_id; +end; + +INSERT INTO parquet_file +SELECT * FROM parquet_file_temp; + +DROP TABLE parquet_file_temp; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index a56bf9edda..cf7b85556e 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -1865,7 +1865,7 @@ pub(crate) mod test_helpers { let other_params = ParquetFileParams { table_id: other_partition.table_id, - partition_id: other_partition.id, + partition_id: other_partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(50), max_time: Timestamp::new(60), @@ -1978,7 +1978,7 @@ pub(crate) mod test_helpers { let f1_params = ParquetFileParams { table_id: partition2.table_id, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), @@ -2449,7 +2449,7 @@ pub(crate) mod test_helpers { let l0_five_hour_ago_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_five_hour_ago, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), ..parquet_file_params.clone() }; repos @@ -2492,7 +2492,7 @@ pub(crate) mod test_helpers { let l1_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_now, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), compaction_level: CompactionLevel::FileNonOverlapped, ..parquet_file_params.clone() }; @@ -2578,7 +2578,7 @@ pub(crate) mod test_helpers { let l2_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_now, - partition_id: partition3.id, + partition_id: partition3.transition_partition_id(), compaction_level: CompactionLevel::Final, ..parquet_file_params.clone() }; @@ -2619,7 +2619,7 @@ pub(crate) mod test_helpers { let l0_one_hour_ago_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_one_hour_ago, - partition_id: partition3.id, + partition_id: partition3.transition_partition_id(), ..parquet_file_params.clone() }; repos @@ -2720,8 +2720,7 @@ pub(crate) mod test_helpers { level1_file.compaction_level = CompactionLevel::FileNonOverlapped; let other_partition_params = ParquetFileParams { - partition_id: partition2.id, - partition_hash_id: partition2.hash_id().cloned(), + partition_id: partition2.transition_partition_id(), object_store_id: Uuid::new_v4(), ..parquet_file_params.clone() }; @@ -2744,12 +2743,20 @@ pub(crate) mod test_helpers { expected_ids.sort(); assert_eq!(file_ids, expected_ids); - // remove namespace to avoid it from affecting later tests - repos - .namespaces() - .soft_delete("namespace_parquet_file_test_list_by_partiton_not_to_delete") + // Using the catalog partition ID should return the same files, even if the Parquet file + // records don't have the partition ID on them (which is the default now) + let files = repos + .parquet_files() + .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition.id)) .await - .expect("delete namespace should succeed"); + .unwrap(); + assert_eq!(files.len(), 2); + + let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect(); + file_ids.sort(); + let mut expected_ids = 
vec![parquet_file.id, level1_file.id]; + expected_ids.sort(); + assert_eq!(file_ids, expected_ids); } async fn test_update_to_compaction_level_1(catalog: Arc) { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index e45f4f59a8..ec6102f363 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -396,8 +396,7 @@ pub mod test_helpers { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index da8ec69e18..dfca594e6d 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -887,14 +887,28 @@ impl ParquetFileRepo for MemTxn { ) -> Result> { let stage = self.stage(); + let partition = stage + .partitions + .iter() + .find(|p| match partition_id { + TransitionPartitionId::Deterministic(hash_id) => p + .hash_id() + .map(|p_hash_id| p_hash_id == hash_id) + .unwrap_or(false), + TransitionPartitionId::Deprecated(id) => id == &p.id, + }) + .unwrap() + .clone(); + Ok(stage .parquet_files .iter() - .filter(|f| match partition_id { - TransitionPartitionId::Deterministic(hash_id) => { - f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id) - } - TransitionPartitionId::Deprecated(id) => f.partition_id == *id, + .filter(|f| match &f.partition_id { + TransitionPartitionId::Deterministic(hash_id) => partition + .hash_id() + .map(|p_hash_id| p_hash_id == hash_id) + .unwrap_or(false), + TransitionPartitionId::Deprecated(id) => id == &partition.id, }) .filter(|f| f.to_delete.is_none()) .cloned() @@ -996,17 +1010,15 @@ async fn create_parquet_file( ParquetFileId::new(stage.parquet_files.len() as i64 + 1), ); let created_at = parquet_file.created_at; - let partition_id = parquet_file.partition_id; + let partition_id = parquet_file.partition_id.clone(); stage.parquet_files.push(parquet_file); // Update the new_file_at field its partition to the time of created_at let partition = stage .partitions .iter_mut() - .find(|p| p.id == partition_id) - .ok_or(Error::PartitionNotFound { - id: TransitionPartitionId::Deprecated(partition_id), - })?; + .find(|p| p.transition_partition_id() == partition_id) + .ok_or(Error::PartitionNotFound { id: partition_id })?; partition.new_file_at = Some(created_at); Ok(stage.parquet_files.last().unwrap().clone()) diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 9714a5496f..ea2f99c8f1 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1627,22 +1627,26 @@ RETURNING id; let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_hash_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.hash_id = $1 AND parquet_file.to_delete IS NULL; "#, ) .bind(hash_id), 
// $1 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.id = $1 AND parquet_file.to_delete IS NULL; "#, ) @@ -1754,7 +1758,6 @@ where namespace_id, table_id, partition_id, - partition_hash_id, object_store_id, min_time, max_time, @@ -1766,6 +1769,11 @@ where max_l0_created_at, } = parquet_file_params; + let (partition_id, partition_hash_id) = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)), + TransitionPartitionId::Deprecated(id) => (Some(id), None), + }; + let partition_hash_id_ref = &partition_hash_id.as_ref(); let query = sqlx::query_scalar::<_, ParquetFileId>( r#" @@ -2203,7 +2211,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; .create(parquet_file_params) .await .unwrap(); - assert!(parquet_file.partition_hash_id.is_none()); + assert_matches!( + parquet_file.partition_id, + TransitionPartitionId::Deprecated(_) + ); } #[test] diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 23ecd696f6..509fe5db10 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1221,8 +1221,8 @@ struct ParquetFilePod { id: ParquetFileId, namespace_id: NamespaceId, table_id: TableId, - partition_id: PartitionId, - partition_hash_id: Option, + #[sqlx(flatten)] + partition_id: TransitionPartitionId, object_store_id: Uuid, min_time: Timestamp, max_time: Timestamp, @@ -1242,7 +1242,6 @@ impl From for ParquetFile { namespace_id: value.namespace_id, table_id: value.table_id, partition_id: value.partition_id, - partition_hash_id: value.partition_hash_id, object_store_id: value.object_store_id, min_time: value.min_time, max_time: value.max_time, @@ -1395,22 +1394,26 @@ RETURNING id; let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_hash_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.hash_id = $1 AND parquet_file.to_delete IS NULL; "#, ) .bind(hash_id), // $1 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, 
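On the write path, the single `TransitionPartitionId` is split back into the two nullable `parquet_file` columns before binding, and on the read path the queries above now join through `partition` so that either identifier resolves the same rows. A sketch of the split (the real code binds the two halves as `$3` and `$4`), with the same simplified stand-in types:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PartitionId(i64);
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId(Vec<u8>); // stand-in; the real type is a fixed-width hash

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(PartitionId),
}

/// Exactly one of the returned options is `Some`, matching the nullable
/// `partition_id` / `partition_hash_id` columns on `parquet_file`.
fn split_for_insert(
    id: TransitionPartitionId,
) -> (Option<PartitionId>, Option<PartitionHashId>) {
    match id {
        TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)),
        TransitionPartitionId::Deprecated(id) => (Some(id), None),
    }
}

fn main() {
    let (id, hash) = split_for_insert(TransitionPartitionId::Deprecated(PartitionId(42)));
    assert_eq!(id, Some(PartitionId(42)));
    assert_eq!(hash, None);
}
```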
namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.id = $1 AND parquet_file.to_delete IS NULL; "#, ) @@ -1533,7 +1536,6 @@ where namespace_id, table_id, partition_id, - partition_hash_id, object_store_id, min_time, max_time, @@ -1545,7 +1547,10 @@ where max_l0_created_at, } = parquet_file_params; - let partition_hash_id_ref = &partition_hash_id.as_ref(); + let (partition_id, partition_hash_id) = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)), + TransitionPartitionId::Deprecated(id) => (Some(id), None), + }; let res = sqlx::query_as::<_, ParquetFilePod>( r#" INSERT INTO parquet_file ( @@ -1562,7 +1567,7 @@ RETURNING .bind(TRANSITION_SHARD_ID) // $1 .bind(table_id) // $2 .bind(partition_id) // $3 - .bind(partition_hash_id_ref) // $4 + .bind(partition_hash_id.as_ref()) // $4 .bind(object_store_id) // $5 .bind(min_time) // $6 .bind(max_time) // $7 @@ -1811,7 +1816,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; .create(parquet_file_params) .await .unwrap(); - assert!(parquet_file.partition_hash_id.is_none()); + assert_matches!( + parquet_file.partition_id, + TransitionPartitionId::Deprecated(_) + ); } macro_rules! test_column_create_or_get_many_unchecked { diff --git a/iox_tests/src/builders.rs b/iox_tests/src/builders.rs index db62514f89..fd147aa077 100644 --- a/iox_tests/src/builders.rs +++ b/iox_tests/src/builders.rs @@ -1,6 +1,7 @@ use data_types::{ ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, + TransitionPartitionId, }; use uuid::Uuid; @@ -20,8 +21,7 @@ impl ParquetFileBuilder { id: ParquetFileId::new(id), namespace_id: NamespaceId::new(0), table_id, - partition_id: PartitionId::new(0), - partition_hash_id: Some(PartitionHashId::new( + partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new( table_id, &PartitionKey::from("arbitrary"), )), @@ -39,11 +39,11 @@ impl ParquetFileBuilder { } } - /// Set the partition id - pub fn with_partition(self, id: i64) -> Self { + /// Set the partition identifier + pub fn with_partition(self, partition_id: TransitionPartitionId) -> Self { Self { file: ParquetFile { - partition_id: PartitionId::new(id), + partition_id, ..self.file }, } diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index c24c1f6b3e..e1be414f1f 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -602,8 +602,7 @@ impl TestPartition { let parquet_file_params = ParquetFileParams { namespace_id: self.namespace.namespace.id, table_id: self.table.table.id, - partition_id: self.partition.id, - partition_hash_id: self.partition.hash_id().cloned(), + partition_id: self.partition.transition_partition_id(), object_store_id: object_store_id.unwrap_or_else(Uuid::new_v4), min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), diff --git a/iox_tests/src/lib.rs b/iox_tests/src/lib.rs index 60cdf73bab..79be4553ba 100644 --- a/iox_tests/src/lib.rs +++ b/iox_tests/src/lib.rs @@ -17,6 +17,8 @@ // Workaround for "unused crate" lint false positives. 
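In `sqlite.rs`, `ParquetFilePod` now declares `#[sqlx(flatten)] partition_id: TransitionPartitionId`, which makes sqlx delegate decoding of that field to a `FromRow` impl on the identifier type itself, so the two columns collapse into one Rust value at the row boundary. That impl lives in `data_types` and is not part of this diff; the following is only a sketch of what it plausibly looks like, with stand-in types and sqlx 0.7-style signatures:

```rust
// Requires sqlx with the "sqlite" feature (plus a runtime feature) in Cargo.toml.
use sqlx::{sqlite::SqliteRow, FromRow, Row};

#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionId(i64);
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId(Vec<u8>);

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(PartitionId),
}

impl<'r> FromRow<'r, SqliteRow> for TransitionPartitionId {
    fn from_row(row: &'r SqliteRow) -> Result<Self, sqlx::Error> {
        // Assumption: prefer the deterministic hash column when it is non-NULL,
        // otherwise fall back to the deprecated integer id.
        let hash: Option<Vec<u8>> = row.try_get("partition_hash_id")?;
        Ok(match hash {
            Some(bytes) => Self::Deterministic(PartitionHashId(bytes)),
            None => Self::Deprecated(PartitionId(row.try_get("partition_id")?)),
        })
    }
}
```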
use workspace_hack as _; +use data_types::{PartitionHashId, PartitionKey, TableId, TransitionPartitionId}; + mod catalog; pub use catalog::{ TestCatalog, TestNamespace, TestParquetFile, TestParquetFileBuilder, TestPartition, TestTable, @@ -24,3 +26,14 @@ pub use catalog::{ mod builders; pub use builders::{ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder}; + +/// Create a partition identifier from an int (which gets used as the table ID) and a partition key +/// with the string "arbitrary". Most useful in cases where there isn't any actual catalog +/// interaction (that is, in mocks) and when the important property of the partition identifiers is +/// that they're either the same or different than other partition identifiers. +pub fn partition_identifier(table_id: i64) -> TransitionPartitionId { + TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(table_id), + &PartitionKey::from("arbitrary"), + )) +} diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index ff78087adf..9a852729d5 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -108,7 +108,7 @@ impl From<&ParquetFile> for ParquetFilePath { Self { namespace_id: f.namespace_id, table_id: f.table_id, - partition_id: f.transition_partition_id(), + partition_id: f.partition_id.clone(), object_store_id: f.object_store_id, } } @@ -119,7 +119,7 @@ impl From<&ParquetFileParams> for ParquetFilePath { Self { namespace_id: f.namespace_id, table_id: f.table_id, - partition_id: f.transition_partition_id(), + partition_id: f.partition_id.clone(), object_store_id: f.object_store_id, } } diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 389f0e45c3..7a9dcfa75b 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -91,7 +91,7 @@ use bytes::Bytes; use data_types::{ ColumnId, ColumnSet, ColumnSummary, CompactionLevel, InfluxDbType, NamespaceId, ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, StatValues, Statistics, TableId, - Timestamp, + Timestamp, TransitionPartitionId, }; use generated_types::influxdata::iox::ingester::v1 as proto; use iox_time::Time; @@ -443,6 +443,7 @@ impl IoxMetadata { where F: for<'a> Fn(&'a str) -> ColumnId, { + let partition_id = TransitionPartitionId::from((partition_id, partition_hash_id.as_ref())); let decoded = metadata.decode().expect("invalid IOx metadata"); trace!( ?partition_id, @@ -487,7 +488,6 @@ impl IoxMetadata { namespace_id: self.namespace_id, table_id: self.table_id, partition_id, - partition_hash_id, object_store_id: self.object_store_id, min_time, max_time, diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs index 0cca211320..a6e3bade51 100644 --- a/querier/src/cache/parquet_file.rs +++ b/querier/src/cache/parquet_file.rs @@ -361,8 +361,8 @@ mod tests { partition.create_parquet_file(builder).await; let table_id = table.table.id; - let single_file_size = 240; - let two_file_size = 448; + let single_file_size = 256; + let two_file_size = 480; assert!(single_file_size < two_file_size); let cache = make_cache(&catalog); diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 7226ecd4cb..ed90f25530 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -17,7 +17,7 @@ use cache_system::{ }; use data_types::{ partition_template::{build_column_values, ColumnValue}, - ColumnId, Partition, PartitionId, TransitionPartitionId, + ColumnId, Partition, TransitionPartitionId, }; use 
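`IoxMetadata::to_parquet_file_params` now builds the identifier up front with `TransitionPartitionId::from((partition_id, partition_hash_id.as_ref()))`, and the querier parquet-file cache test bumps its size constants from 240/448 to 256/480 bytes to track the new in-memory representation. A sketch of what that tuple conversion plausibly does, again with stand-in types:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PartitionId(i64);
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId(String);

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(PartitionId),
}

impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId {
    fn from((id, hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self {
        // Assumption: rows that already have a deterministic hash id use it;
        // everything else keeps the deprecated catalog id.
        match hash_id {
            Some(hash_id) => Self::Deterministic(hash_id.clone()),
            None => Self::Deprecated(id),
        }
    }
}

fn main() {
    let hash = PartitionHashId("t1|key".into());
    assert!(matches!(
        TransitionPartitionId::from((PartitionId(1), Some(&hash))),
        TransitionPartitionId::Deterministic(_)
    ));
    assert_eq!(
        TransitionPartitionId::from((PartitionId(1), None)),
        TransitionPartitionId::Deprecated(PartitionId(1))
    );
}
```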
datafusion::scalar::ScalarValue; use iox_catalog::{interface::Catalog, partition_lookup_batch}; @@ -38,7 +38,7 @@ const CACHE_ID: &str = "partition"; type CacheT = Box< dyn Cache< - K = PartitionId, + K = TransitionPartitionId, V = Option, GetExtra = (Arc, Option), PeekExtra = ((), Option), @@ -49,7 +49,7 @@ type CacheT = Box< #[derive(Debug)] pub struct PartitionCache { cache: CacheT, - remove_if_handle: RemoveIfHandle>, + remove_if_handle: RemoveIfHandle>, flusher: Arc, } @@ -64,7 +64,8 @@ impl PartitionCache { testing: bool, ) -> Self { let loader = FunctionLoader::new( - move |partition_ids: Vec, cached_tables: Vec>| { + move |partition_ids: Vec, + cached_tables: Vec>| { // sanity checks assert_eq!(partition_ids.len(), cached_tables.len()); @@ -75,23 +76,20 @@ impl PartitionCache { // prepare output buffer let mut out = (0..partition_ids.len()).map(|_| None).collect::>(); let mut out_map = - HashMap::::with_capacity(partition_ids.len()); + HashMap::::with_capacity(partition_ids.len()); for (idx, id) in partition_ids.iter().enumerate() { - match out_map.entry(*id) { - Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"), + match out_map.entry(id.clone()) { + Entry::Occupied(_) => unreachable!( + "cache system requested same partition from loader concurrently, \ + this should have been prevented by the CacheDriver" + ), Entry::Vacant(v) => { v.insert(idx); } } } - // build `&[&TransitionPartitionId]` for batch catalog request - let ids = partition_ids - .iter() - .copied() - .map(TransitionPartitionId::Deprecated) - .collect::>(); - let ids = ids.iter().collect::>(); + let ids: Vec<&TransitionPartitionId> = partition_ids.iter().collect(); // fetch catalog data let partitions = Backoff::new(&backoff_config) @@ -104,7 +102,7 @@ impl PartitionCache { // build output for p in partitions { - let idx = out_map[&p.id]; + let idx = out_map[&p.transition_partition_id()]; let cached_table = &cached_tables[idx]; let p = CachedPartition::new(p, cached_table); out[idx] = Some(p); @@ -180,7 +178,7 @@ impl PartitionCache { self.remove_if_handle.remove_if_and_get( &self.cache, - partition_id, + partition_id.clone(), move |cached_partition| { let invalidates = if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) @@ -195,7 +193,7 @@ impl PartitionCache { if invalidates { debug!( - partition_id = partition_id.get(), + partition_id = %partition_id, "invalidate partition cache", ); } @@ -217,13 +215,13 @@ impl PartitionCache { /// Request for [`PartitionCache::get`]. 
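The partition cache is now keyed by `TransitionPartitionId` instead of `PartitionId`, so the loader no longer wraps raw ids in `TransitionPartitionId::Deprecated` before the batch catalog call; it simply borrows the requested keys. Because the batch fetch can return partitions in any order, the loader first maps each requested key to its output slot. A generic sketch of that slot-index pattern, assuming only `Clone + Eq + Hash` keys:

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Place `fetched` values into an output vector aligned with `requested`,
/// leaving `None` for keys the fetch did not return (e.g. unknown partitions).
fn align_results<K, V>(requested: &[K], fetched: Vec<(K, V)>) -> Vec<Option<V>>
where
    K: Clone + Eq + Hash,
{
    // Map each requested key to its slot; duplicate requests are assumed to be
    // prevented upstream (the cache driver de-duplicates concurrent gets).
    let mut slots: HashMap<K, usize> = HashMap::with_capacity(requested.len());
    for (idx, key) in requested.iter().enumerate() {
        slots.insert(key.clone(), idx);
    }

    let mut out: Vec<Option<V>> = requested.iter().map(|_| None).collect();
    for (key, value) in fetched {
        if let Some(&idx) = slots.get(&key) {
            out[idx] = Some(value);
        }
    }
    out
}

fn main() {
    let requested = vec!["p1".to_string(), "p2".to_string(), "p3".to_string()];
    let fetched = vec![("p3".to_string(), 30), ("p1".to_string(), 10)];
    assert_eq!(align_results(&requested, fetched), vec![Some(10), None, Some(30)]);
}
```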
#[derive(Debug)] pub struct PartitionRequest { - pub partition_id: PartitionId, + pub partition_id: TransitionPartitionId, pub sort_key_should_cover: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct CachedPartition { - pub id: PartitionId, + pub id: TransitionPartitionId, pub sort_key: Option>, pub column_ranges: ColumnRanges, } @@ -299,7 +297,7 @@ impl CachedPartition { column_ranges.shrink_to_fit(); Self { - id: partition.id, + id: partition.transition_partition_id(), sort_key, column_ranges: Arc::new(column_ranges), } @@ -368,7 +366,10 @@ mod tests { ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count, }; use async_trait::async_trait; - use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType}; + use data_types::{ + partition_template::TablePartitionTemplateOverride, ColumnType, PartitionHashId, + PartitionId, PartitionKey, TableId, + }; use futures::StreamExt; use generated_types::influxdata::iox::partition_template::v1::{ template_part::Part, PartitionTemplate, TemplatePart, @@ -419,8 +420,11 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let sort_key1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -434,24 +438,24 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); let sort_key2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p2_id, &Vec::new(), None) .await .unwrap() .sort_key; assert_eq!(sort_key2, None); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); let sort_key1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -461,16 +465,37 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); // non-existing partition for _ in 0..2 { + // Non-existing partition identified by partition hash ID let res = cache .get_one( Arc::clone(&cached_table), - PartitionId::new(i64::MAX), + &TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), + &[], + None, + ) + .await; + assert_eq!(res, None); + assert_catalog_access_metric_count( + &catalog.metric_registry, + "partition_get_by_hash_id_batch", + 3, + ); + + // Non-existing partition identified by deprecated catalog IDs; this part can be + // removed when partition identification is fully transitioned to partition hash IDs + let res = cache + .get_one( + Arc::clone(&cached_table), + &TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), &Vec::new(), None, ) @@ -479,7 +504,7 @@ mod tests { assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", - 3, + 1, ); } } @@ -548,8 +573,14 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let p3_id = p3.transition_partition_id(); + let p4_id = p4.transition_partition_id(); + let p5_id = p5.transition_partition_id(); + let ranges1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + 
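`PartitionRequest` and `CachedPartition` now carry `TransitionPartitionId`, and the tests exercise cache misses for both variants (an unknown hash id and an unknown deprecated id). For the identifier to serve as a map key and to support the `sort_by(|a, b| a.id.cmp(&b.id))` comparisons later in these tests, the real type presumably derives `Hash`, `Eq`, and `Ord`; a small sketch of that usage with stand-ins:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct PartitionHashId(Vec<u8>);

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(i64),
}

fn main() {
    let p1 = TransitionPartitionId::Deterministic(PartitionHashId(vec![1, 2, 3]));
    let p2 = TransitionPartitionId::Deprecated(42);

    // Usable as a cache key...
    let mut cache: HashMap<TransitionPartitionId, &str> = HashMap::new();
    cache.insert(p1.clone(), "sort key for p1");
    assert!(cache.contains_key(&p1));
    // ...and a key that was never inserted misses, regardless of variant.
    assert!(!cache.contains_key(&p2));

    // ...and sortable, which the tests rely on to compare result sets.
    let mut ids = vec![p2.clone(), p1.clone()];
    ids.sort_by(|a, b| a.cmp(b));
    assert_eq!(ids, vec![p1, p2]);
}
```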
.get_one(Arc::clone(&cached_table), &p1_id, &[], None) .await .unwrap() .column_ranges; @@ -578,12 +609,12 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); let ranges2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &[], None) + .get_one(Arc::clone(&cached_table), &p2_id, &[], None) .await .unwrap() .column_ranges; @@ -599,12 +630,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); let ranges3 = cache - .get_one(Arc::clone(&cached_table), p3.id, &[], None) + .get_one(Arc::clone(&cached_table), &p3_id, &[], None) .await .unwrap() .column_ranges; @@ -629,12 +660,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); let ranges4 = cache - .get_one(Arc::clone(&cached_table), p4.id, &[], None) + .get_one(Arc::clone(&cached_table), &p4_id, &[], None) .await .unwrap() .column_ranges; @@ -659,12 +690,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 4, ); let ranges5 = cache - .get_one(Arc::clone(&cached_table), p5.id, &[], None) + .get_one(Arc::clone(&cached_table), &p5_id, &[], None) .await .unwrap() .column_ranges; @@ -680,28 +711,48 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 5, ); let ranges1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + .get_one(Arc::clone(&cached_table), &p1_id, &[], None) .await .unwrap() .column_ranges; assert!(Arc::ptr_eq(&ranges1a, &ranges1b)); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 5, ); - // non-existing partition for _ in 0..2 { + // Non-existing partition identified by partition hash ID let res = cache .get_one( Arc::clone(&cached_table), - PartitionId::new(i64::MAX), + &TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), + &[], + None, + ) + .await; + assert_eq!(res, None); + assert_catalog_access_metric_count( + &catalog.metric_registry, + "partition_get_by_hash_id_batch", + 6, + ); + + // Non-existing partition identified by deprecated catalog IDs; this part can be + // removed when partition identification is fully transitioned to partition hash IDs + let res = cache + .get_one( + Arc::clone(&cached_table), + &TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), &[], None, ) @@ -710,7 +761,7 @@ mod tests { assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", - 6, + 1, ); } } @@ -724,7 +775,7 @@ mod tests { let c1 = t.create_column("foo", ColumnType::Tag).await; let c2 = t.create_column("time", ColumnType::Time).await; let p = t.create_partition("k1").await; - let p_id = p.partition.id; + let p_id = p.partition.transition_partition_id(); let p_sort_key = p.partition.sort_key(); let cached_table = Arc::new(CachedTable { id: t.table.id, @@ -751,41 +802,41 @@ mod tests { ); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get_one(Arc::clone(&cached_table), &p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, 
- "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); // requesting nother will not expire assert!(p_sort_key.is_none()); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get_one(Arc::clone(&cached_table), &p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); // but requesting something will expire let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); @@ -801,7 +852,7 @@ mod tests { // expire & fetch let p_sort_key = p.partition.sort_key(); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None) .await .unwrap() .sort_key; @@ -815,7 +866,7 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); @@ -827,7 +878,7 @@ mod tests { vec![c1.column.id, c2.column.id], ] { let sort_key_2 = cache - .get_one(Arc::clone(&cached_table), p_id, &should_cover, None) + .get_one(Arc::clone(&cached_table), &p_id, &should_cover, None) .await .unwrap() .sort_key; @@ -837,7 +888,7 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); } @@ -847,7 +898,7 @@ mod tests { let sort_key_2 = cache .get_one( Arc::clone(&cached_table), - p_id, + &p_id, &[c1.column.id, c3.column.id], None, ) @@ -861,7 +912,7 @@ mod tests { assert_eq!(sort_key, sort_key_2); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 4, ); } @@ -892,34 +943,45 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let mut res = cache .get( Arc::clone(&cached_table), vec![ PartitionRequest { - partition_id: p1.id, + partition_id: p1_id.clone(), sort_key_should_cover: vec![], }, PartitionRequest { - partition_id: p2.id, + partition_id: p2_id.clone(), sort_key_should_cover: vec![], }, PartitionRequest { - partition_id: p1.id, + partition_id: p1_id.clone(), + sort_key_should_cover: vec![], + }, + // requesting non-existing partitions is fine, they just don't appear in + // the output + PartitionRequest { + partition_id: TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), sort_key_should_cover: vec![], }, PartitionRequest { - // requesting non-existing partitions is fine, they just don't appear in the output - partition_id: PartitionId::new(i64::MAX), + partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), sort_key_should_cover: vec![], }, ], None, ) .await; - res.sort_by_key(|p| p.id); - let ids = res.iter().map(|p| p.id).collect::>(); - assert_eq!(ids, vec![p1.id, p1.id, p2.id]); + res.sort_by(|a, b| a.id.cmp(&b.id)); + let ids = res.into_iter().map(|p| p.id).collect::>(); + assert_eq!(ids, vec![p1_id.clone(), p1_id, p2_id]); assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", @@ -1008,7 +1070,7 
@@ mod tests { c_id: ColumnId, /// Partitions within that table. - partitions: Vec, + partitions: Vec, } impl ConcurrencyTestState { @@ -1032,7 +1094,7 @@ mod tests { t.create_partition_with_sort_key(&format!("p{i}"), &["time"]) .await .partition - .id + .transition_partition_id() } }) .collect::>() @@ -1046,7 +1108,8 @@ mod tests { } } - /// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the result. + /// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the + /// result. async fn run(self, cache: Arc) { let Self { cached_table, @@ -1060,15 +1123,15 @@ mod tests { partitions .iter() .map(|p| PartitionRequest { - partition_id: *p, + partition_id: p.clone(), sort_key_should_cover: vec![], }) .collect(), None, ) .await; - results.sort_by_key(|p| p.id); - let partitions_res = results.iter().map(|p| p.id).collect::>(); + results.sort_by(|a, b| a.id.cmp(&b.id)); + let partitions_res = results.iter().map(|p| p.id.clone()).collect::>(); assert_eq!(partitions, partitions_res); assert!(results .iter() @@ -1086,7 +1149,7 @@ mod tests { async fn get_one( &self, cached_table: Arc, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, sort_key_should_cover: &[ColumnId], span: Option, ) -> Option; @@ -1097,14 +1160,14 @@ mod tests { async fn get_one( &self, cached_table: Arc, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, sort_key_should_cover: &[ColumnId], span: Option, ) -> Option { self.get( cached_table, vec![PartitionRequest { - partition_id, + partition_id: partition_id.clone(), sort_key_should_cover: sort_key_should_cover.to_vec(), }], span, diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 01c95f3dbf..7f7bb69037 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -859,10 +859,6 @@ impl IngesterPartition { } } - pub(crate) fn partition_id(&self) -> PartitionId { - self.partition_id - } - pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId { TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) } diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index f76a98ed68..3e4a93d0d4 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId, TransitionPartitionId}; +use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, TransitionPartitionId}; use futures::StreamExt; use hashbrown::HashSet; use iox_catalog::interface::Catalog; @@ -56,7 +56,7 @@ impl ChunkAdapter { &self, cached_table: Arc, files: Arc<[Arc]>, - cached_partitions: &HashMap, + cached_partitions: &HashMap, span: Option, ) -> Vec { let span_recorder = SpanRecorder::new(span); @@ -170,18 +170,13 @@ impl ChunkAdapter { let order = ChunkOrder::new(parquet_file.file.max_l0_created_at.get()); - let partition_id = parquet_file.file.partition_id; - let transition_partition_id = TransitionPartitionId::from(( - partition_id, - parquet_file.file.partition_hash_id.as_ref(), - )); + let partition_id = parquet_file.file.partition_id.clone(); let meta = Arc::new(QuerierParquetChunkMeta { chunk_id, order, sort_key: Some(sort_key), partition_id, - transition_partition_id, }); let parquet_chunk = Arc::new(ParquetChunk::new( diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index c3794a82dc..cf2100fcd0 100644 --- a/querier/src/parquet/mod.rs +++ 
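A pattern that recurs through these hunks: `PartitionId` is `Copy`, but `TransitionPartitionId` owns its hash bytes and is only `Clone`, which is why call sites that used to move or copy the id (`partition_id: *p`, `sort_by_key(|p| p.id)`) now clone it or compare by reference. A trivial illustration with a stand-in type:

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(Vec<u8>), // owns heap data, so the enum cannot be `Copy`
    Deprecated(i64),
}

fn lookup(id: TransitionPartitionId) -> bool {
    matches!(id, TransitionPartitionId::Deterministic(_))
}

fn main() {
    let id = TransitionPartitionId::Deterministic(vec![0xab; 32]);
    // With the old Copy `PartitionId`, this value could be passed by value repeatedly;
    // now each by-value use needs an explicit clone (or the API takes a reference).
    assert!(lookup(id.clone()));
    assert!(lookup(id));
}
```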
b/querier/src/parquet/mod.rs @@ -1,6 +1,6 @@ //! Querier Chunks -use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId}; +use data_types::{ChunkId, ChunkOrder, TransitionPartitionId}; use datafusion::physical_plan::Statistics; use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges}; use parquet_file::chunk::ParquetChunk; @@ -25,10 +25,7 @@ pub struct QuerierParquetChunkMeta { sort_key: Option, /// Partition ID. - partition_id: PartitionId, - - /// Transition partition ID. - transition_partition_id: TransitionPartitionId, + partition_id: TransitionPartitionId, } impl QuerierParquetChunkMeta { @@ -43,13 +40,8 @@ impl QuerierParquetChunkMeta { } /// Partition ID. - pub fn partition_id(&self) -> PartitionId { - self.partition_id - } - - /// Partition ID. - pub fn transition_partition_id(&self) -> &TransitionPartitionId { - &self.transition_partition_id + pub fn partition_id(&self) -> &TransitionPartitionId { + &self.partition_id } } @@ -251,7 +243,7 @@ pub mod tests { .get( Arc::clone(&self.cached_table), vec![PartitionRequest { - partition_id: self.parquet_file.partition_id, + partition_id: self.parquet_file.partition_id.clone(), sort_key_should_cover: vec![], }], None, @@ -261,7 +253,7 @@ pub mod tests { .next() .unwrap(); let cached_partitions = - HashMap::from([(self.parquet_file.partition_id, cached_partition)]); + HashMap::from([(self.parquet_file.partition_id.clone(), cached_partition)]); self.adapter .new_chunks( Arc::clone(&self.cached_table), diff --git a/querier/src/parquet/query_access.rs b/querier/src/parquet/query_access.rs index 1e9cf44743..fe87f4c9ce 100644 --- a/querier/src/parquet/query_access.rs +++ b/querier/src/parquet/query_access.rs @@ -15,11 +15,11 @@ impl QueryChunk for QuerierParquetChunk { } fn partition_id(&self) -> PartitionId { - self.meta().partition_id() + unimplemented!() } fn transition_partition_id(&self) -> &TransitionPartitionId { - self.meta().transition_partition_id() + self.meta().partition_id() } fn sort_key(&self) -> Option<&SortKey> { diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 52750ec47e..26268ae49c 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -8,7 +8,7 @@ use crate::{ parquet::ChunkAdapter, IngesterConnection, }; -use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId}; +use data_types::{ColumnId, NamespaceId, ParquetFile, TableId, TransitionPartitionId}; use datafusion::error::DataFusionError; use futures::join; use iox_query::{provider, provider::ChunkPruner, QueryChunk}; @@ -282,7 +282,7 @@ impl QuerierTable { let chunks = partitions .into_iter() .filter_map(|mut c| { - let cached_partition = cached_partitions.get(&c.partition_id())?; + let cached_partition = cached_partitions.get(&c.transition_partition_id())?; c.set_partition_column_ranges(&cached_partition.column_ranges); Some(c) }) @@ -322,16 +322,16 @@ impl QuerierTable { ingester_partitions: &[IngesterPartition], parquet_files: &[Arc], span: Option, - ) -> HashMap { + ) -> HashMap { let span_recorder = SpanRecorder::new(span); - let mut should_cover: HashMap> = + let mut should_cover: HashMap> = HashMap::with_capacity(ingester_partitions.len()); // For ingester partitions we only need the column ranges -- which are static -- not the sort key. So it is // sufficient to collect the partition IDs. 
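`QuerierParquetChunkMeta` previously stored both a `PartitionId` and a `TransitionPartitionId`; it now stores only the latter, and the `QueryChunk::partition_id` accessor that still returns the old type is stubbed out with `unimplemented!()`, with callers expected to go through `transition_partition_id`. A sketch of that intermediate migration shape (the trait and types are simplified stand-ins, not the real `iox_query` trait):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PartitionId(i64);

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(Vec<u8>),
    Deprecated(PartitionId),
}

trait QueryChunk {
    /// Deprecated accessor, kept only until every caller has migrated.
    fn partition_id(&self) -> PartitionId;
    fn transition_partition_id(&self) -> &TransitionPartitionId;
}

struct QuerierParquetChunk {
    partition_id: TransitionPartitionId,
}

impl QueryChunk for QuerierParquetChunk {
    fn partition_id(&self) -> PartitionId {
        // This chunk type no longer tracks the deprecated id at all.
        unimplemented!("use transition_partition_id() instead")
    }

    fn transition_partition_id(&self) -> &TransitionPartitionId {
        &self.partition_id
    }
}

fn main() {
    let chunk = QuerierParquetChunk {
        partition_id: TransitionPartitionId::Deprecated(PartitionId(7)),
    };
    assert_eq!(
        chunk.transition_partition_id(),
        &TransitionPartitionId::Deprecated(PartitionId(7))
    );
}
```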
for p in ingester_partitions { - should_cover.entry(p.partition_id()).or_default(); + should_cover.entry(p.transition_partition_id()).or_default(); } // For parquet files we must ensure that the -- potentially evolving -- sort key coveres the primary key. @@ -342,7 +342,7 @@ impl QuerierTable { .collect::>(); for f in parquet_files { should_cover - .entry(f.partition_id) + .entry(f.partition_id.clone()) .or_default() .extend(f.column_set.iter().copied().filter(|id| pk.contains(id))); } @@ -366,7 +366,7 @@ impl QuerierTable { ) .await; - partitions.into_iter().map(|p| (p.id, p)).collect() + partitions.into_iter().map(|p| (p.id.clone(), p)).collect() } /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks) @@ -889,7 +889,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2); @@ -899,7 +899,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4); @@ -912,7 +912,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); @@ -922,7 +922,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6); @@ -936,7 +936,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8); diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 356539060d..c5787ac07e 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -18,7 +18,7 @@ // Workaround for "unused crate" lint false positives. 
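`QuerierTable` now keys both the cached-partition map and the "columns the sort key should cover" map by `TransitionPartitionId`, accumulating the relevant primary-key columns per partition with `entry().or_default().extend(...)`. A sketch of that accumulation pattern, with identifiers and column ids reduced to plain strings and integers:

```rust
use std::collections::{HashMap, HashSet};

type ColumnId = i64;

/// For each partition, collect the primary-key columns present in its files,
/// mirroring how `QuerierTable` decides which columns the sort key must cover.
fn should_cover(
    files: &[(String, Vec<ColumnId>)], // (partition identifier, column set) per file
    primary_key: &HashSet<ColumnId>,
) -> HashMap<String, HashSet<ColumnId>> {
    let mut out: HashMap<String, HashSet<ColumnId>> = HashMap::new();
    for (partition_id, column_set) in files {
        out.entry(partition_id.clone())
            .or_default()
            .extend(column_set.iter().copied().filter(|id| primary_key.contains(id)));
    }
    out
}

fn main() {
    let primary_key: HashSet<ColumnId> = [1, 2].into_iter().collect();
    let files = vec![
        ("p-hash-a".to_string(), vec![1, 5]),
        ("p-hash-a".to_string(), vec![2]),
        ("p-hash-b".to_string(), vec![3]),
    ];
    let covered = should_cover(&files, &primary_key);

    let expected: HashSet<ColumnId> = [1, 2].into_iter().collect();
    assert_eq!(covered["p-hash-a"], expected);
    assert!(covered["p-hash-b"].is_empty());
}
```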
use workspace_hack as _; -use data_types::{PartitionId, TableId, TransitionPartitionId}; +use data_types::{PartitionHashId, PartitionId, TableId, TransitionPartitionId}; use generated_types::influxdata::iox::catalog::v1::*; use iox_catalog::interface::{Catalog, SoftDeletedRows}; use observability_deps::tracing::*; @@ -47,14 +47,14 @@ impl catalog_service_server::CatalogService for CatalogService { ) -> Result, Status> { let mut repos = self.catalog.repositories().await; let req = request.into_inner(); - let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id)); + let partition_id = to_partition_id(req.partition_identifier)?; let parquet_files = repos .parquet_files() .list_by_partition_not_to_delete(&partition_id) .await .map_err(|e| { - warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition"); + warn!(error=%e, %partition_id, "failed to get parquet_files for partition"); Status::not_found(e.to_string()) })?; @@ -169,13 +169,52 @@ impl catalog_service_server::CatalogService for CatalogService { } } +fn to_partition_identifier(partition_id: &TransitionPartitionId) -> PartitionIdentifier { + match partition_id { + TransitionPartitionId::Deterministic(hash_id) => PartitionIdentifier { + id: Some(partition_identifier::Id::HashId( + hash_id.as_bytes().to_owned(), + )), + }, + TransitionPartitionId::Deprecated(id) => PartitionIdentifier { + id: Some(partition_identifier::Id::CatalogId(id.get())), + }, + } +} + +fn to_partition_id( + partition_identifier: Option, +) -> Result { + let partition_id = + match partition_identifier + .and_then(|pi| pi.id) + .ok_or(Status::invalid_argument( + "No partition identifier specified", + ))? { + partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic( + PartitionHashId::try_from(&bytes[..]).map_err(|e| { + Status::invalid_argument(format!( + "Could not parse bytes as a `PartitionHashId`: {e}" + )) + })?, + ), + partition_identifier::Id::CatalogId(id) => { + TransitionPartitionId::Deprecated(PartitionId::new(id)) + } + }; + + Ok(partition_id) +} + // converts the catalog ParquetFile to protobuf fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile { + let partition_identifier = to_partition_identifier(&p.partition_id); + ParquetFile { id: p.id.get(), namespace_id: p.namespace_id.get(), table_id: p.table_id.get(), - partition_id: p.partition_id.get(), + partition_identifier: Some(partition_identifier), object_store_id: p.object_store_id.to_string(), min_time: p.min_time.get(), max_time: p.max_time.get(), @@ -191,8 +230,10 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile { // converts the catalog Partition to protobuf fn to_partition(p: data_types::Partition) -> Partition { + let identifier = to_partition_identifier(&p.transition_partition_id()); + Partition { - id: p.id.get(), + identifier: Some(identifier), key: p.partition_key.to_string(), table_id: p.table_id.get(), array_sort_key: p.sort_key, @@ -230,8 +271,7 @@ mod tests { let p1params = ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(5), @@ -248,13 +288,15 @@ mod tests { }; p1 = repos.parquet_files().create(p1params).await.unwrap(); p2 = repos.parquet_files().create(p2params).await.unwrap(); - partition_id = partition.id; + partition_id = 
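The gRPC catalog service now round-trips the identifier through a `PartitionIdentifier` oneof (hash-id bytes or the deprecated catalog id), as implemented by `to_partition_identifier` / `to_partition_id` above. A self-contained sketch of that mapping, with hand-rolled stand-ins for the prost-generated types and `String` in place of `tonic::Status`:

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId(Vec<u8>);

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(PartitionHashId),
    Deprecated(i64),
}

// Stand-ins for the generated protobuf types.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionIdentifier {
    id: Option<Id>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum Id {
    HashId(Vec<u8>),
    CatalogId(i64),
}

fn to_partition_identifier(partition_id: &TransitionPartitionId) -> PartitionIdentifier {
    match partition_id {
        TransitionPartitionId::Deterministic(hash_id) => PartitionIdentifier {
            id: Some(Id::HashId(hash_id.0.clone())),
        },
        TransitionPartitionId::Deprecated(id) => PartitionIdentifier {
            id: Some(Id::CatalogId(*id)),
        },
    }
}

fn to_partition_id(identifier: Option<PartitionIdentifier>) -> Result<TransitionPartitionId, String> {
    match identifier
        .and_then(|pi| pi.id)
        .ok_or_else(|| "No partition identifier specified".to_string())?
    {
        // The real code validates the bytes via `PartitionHashId::try_from` and
        // returns `Status::invalid_argument` on failure; validation is skipped here.
        Id::HashId(bytes) => Ok(TransitionPartitionId::Deterministic(PartitionHashId(bytes))),
        Id::CatalogId(id) => Ok(TransitionPartitionId::Deprecated(id)),
    }
}

fn main() {
    let id = TransitionPartitionId::Deterministic(PartitionHashId(vec![0xab; 32]));
    let wire = to_partition_identifier(&id);
    assert_eq!(to_partition_id(Some(wire)).unwrap(), id);
    assert!(to_partition_id(None).is_err());
}
```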
partition.transition_partition_id(); Arc::clone(&catalog) }; + let partition_identifier = to_partition_identifier(&partition_id); + let grpc = super::CatalogService::new(catalog); let request = GetParquetFilesByPartitionIdRequest { - partition_id: partition_id.get(), + partition_identifier: Some(partition_identifier), }; let tonic_response = grpc diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index 70059c3b2f..8ab7b7adcb 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -75,7 +75,7 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService { let path = ParquetFilePath::new( parquet_file.namespace_id, parquet_file.table_id, - &parquet_file.transition_partition_id(), + &parquet_file.partition_id.clone(), parquet_file.object_store_id, ); let path = path.object_store_path(); @@ -128,8 +128,7 @@ mod tests { let p1params = ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(5), @@ -150,7 +149,7 @@ mod tests { let path = ParquetFilePath::new( p1.namespace_id, p1.table_id, - &p1.transition_partition_id(), + &p1.partition_id.clone(), p1.object_store_id, ); let path = path.object_store_path();
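One nit in the object-store service hunks: `ParquetFilePath::new(..., &parquet_file.partition_id.clone(), ...)` clones the identifier only to immediately borrow the temporary; since the constructor takes a shared reference, borrowing `&parquet_file.partition_id` directly would presumably suffice. A minimal stand-in sketch of the two equivalent call sites (the constructor signature is reduced to the relevant parameter):

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    Deterministic(Vec<u8>),
    Deprecated(i64),
}

struct ParquetFile {
    partition_id: TransitionPartitionId,
}

struct ParquetFilePath {
    partition_id: TransitionPartitionId,
}

impl ParquetFilePath {
    /// Takes a reference and clones internally, as the real constructor appears to.
    fn new(partition_id: &TransitionPartitionId) -> Self {
        Self { partition_id: partition_id.clone() }
    }
}

fn main() {
    let file = ParquetFile { partition_id: TransitionPartitionId::Deprecated(7) };

    // As written in the diff: clone, then borrow the temporary.
    let a = ParquetFilePath::new(&file.partition_id.clone());
    // Equivalent, without the extra clone.
    let b = ParquetFilePath::new(&file.partition_id);

    assert_eq!(a.partition_id, b.partition_id);
}
```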