From 4a9e76b8b777ddc41819aa12f6f776f4fb399bb4 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)" <193874+carols10cents@users.noreply.github.com>
Date: Mon, 31 Jul 2023 08:40:56 -0400
Subject: [PATCH] feat: Make parquet_file.partition_id optional in the catalog
 (#8339)

* feat: Make parquet_file.partition_id optional in the catalog

This will acquire a short lock on the table in postgres, per:

This allows us to persist data for new partitions and associate the
Parquet file catalog records with the partition records using only the
partition hash ID, rather than both IDs as is done now.

* fix: Support transition partition ID in the catalog service

* fix: Use transition partition ID in import/export

This commit also removes support for the `--partition-id` flag of the
`influxdb_iox remote store get-table` command, which Andrew approved.

The `--partition-id` filter took the results of the catalog gRPC
service's query for Parquet files of a table and then kept only the
files whose partition IDs matched. The gRPC query no longer returns the
partition ID from the Parquet file table, and this command should
instead use `GetParquetFilesByPartitionId` to request only what it
needs rather than filtering.

* feat: Support looking up Parquet files by either kind of Partition id

Regardless of which is actually stored on the Parquet file record.

That is, say there's a Partition in the catalog with:

Partition {
    id: 3,
    hash_id: abcdefg,
}

and a Parquet file that has:

ParquetFile {
    partition_hash_id: abcdefg,
}

calling `list_by_partition_not_to_delete(PartitionId(3))` should still
return this Parquet file because it is associated with the partition
that has ID 3.

This is important for the compactor, which currently deals only in
PartitionIds, and I'd like to keep it that way for now to avoid having
to change even more in this PR.
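To make that lookup rule concrete, here is a minimal, self-contained sketch.
It uses simplified stand-in types (an i64 catalog ID and a String hash ID)
and a hypothetical `list_by_partition_id` helper rather than the real
`data_types` definitions and catalog repositories; it only illustrates the
behaviour described above, namely that a query by catalog `PartitionId` must
also match files whose rows store just the partition hash ID.

// Sketch only: simplified stand-ins for the catalog types, not the real
// `data_types` structs or the actual catalog implementation.
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum TransitionPartitionId {
    Deprecated(i64),       // catalog-assigned partition ID
    Deterministic(String), // deterministic hash of table ID + partition key
}

#[derive(Clone, Debug)]
struct ParquetFileRecord {
    id: i64,
    partition_id: TransitionPartitionId, // whichever identifier the row stores
}

// Return the files for the partition with catalog ID `partition_id`,
// whether a file's row references the catalog ID or the partition's hash ID.
fn list_by_partition_id(
    partition_id: i64,
    partition_hash_ids: &HashMap<i64, String>, // catalog ID -> hash ID, if one exists
    files: &[ParquetFileRecord],
) -> Vec<ParquetFileRecord> {
    let by_catalog_id = TransitionPartitionId::Deprecated(partition_id);
    let by_hash_id = partition_hash_ids
        .get(&partition_id)
        .cloned()
        .map(TransitionPartitionId::Deterministic);

    files
        .iter()
        .filter(|f| {
            f.partition_id == by_catalog_id || Some(&f.partition_id) == by_hash_id.as_ref()
        })
        .cloned()
        .collect()
}

fn main() {
    // Partition 3 has hash ID "abcdefg"; its only file stores just the hash ID.
    let partition_hash_ids = HashMap::from([(3_i64, "abcdefg".to_string())]);
    let files = [ParquetFileRecord {
        id: 1000,
        partition_id: TransitionPartitionId::Deterministic("abcdefg".to_string()),
    }];

    // Looking the partition up by its catalog ID still finds the file.
    let found = list_by_partition_id(3, &partition_hash_ids, &files);
    assert_eq!(found.len(), 1);
    assert_eq!(found[0].id, 1000);
}

The sketch resolves the partition's hash ID first and then matches files on
either identifier, which is the same shape as the in-memory catalog change
in this patch; the real code works with `PartitionHashId` bytes and catalog
repositories instead of the toy types used here.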
* fix: Use and set new partition ID fields everywhere they want to be --------- Co-authored-by: Dom --- .../src/components/df_planner/query_chunk.rs | 2 +- .../src/components/parquet_file_sink/mock.rs | 15 +- .../components/partition_files_source/mock.rs | 74 ++++-- compactor_scheduler/src/commit/logging.rs | 31 ++- compactor_scheduler/src/commit/metrics.rs | 28 +-- compactor_scheduler/src/commit/mock.rs | 42 ++-- .../tests/local_scheduler/mod.rs | 8 +- compactor_test_utils/src/simulator.rs | 3 +- data_types/src/lib.rs | 37 +-- data_types/src/partition.rs | 28 +++ garbage_collector/src/objectstore/checker.rs | 9 +- generated_types/build.rs | 1 + .../iox/catalog/v1/parquet_file.proto | 9 +- .../iox/catalog/v1/partition_identifier.proto | 12 + .../influxdata/iox/catalog/v1/service.proto | 14 +- import_export/src/file/export.rs | 78 +++---- import_export/src/file/import.rs | 11 +- influxdb_iox/src/commands/remote/store.rs | 8 - .../tests/end_to_end_cases/compactor.rs | 20 +- influxdb_iox/tests/end_to_end_cases/remote.rs | 7 +- influxdb_iox_client/src/client/catalog.rs | 8 +- ingester/src/persist/completion_observer.rs | 18 +- ingester/src/persist/file_metrics.rs | 7 +- ingester/src/persist/mod.rs | 55 +++-- ingester/src/persist/queue.rs | 12 +- ingester/src/test_util.rs | 3 +- ...ake_parquet_file_partition_id_optional.sql | 24 ++ ...ake_parquet_file_partition_id_optional.sql | 98 ++++++++ iox_catalog/src/interface.rs | 33 +-- iox_catalog/src/lib.rs | 3 +- iox_catalog/src/mem.rs | 32 ++- iox_catalog/src/postgres.rs | 31 ++- iox_catalog/src/sqlite.rs | 38 ++-- iox_tests/src/builders.rs | 10 +- iox_tests/src/catalog.rs | 3 +- iox_tests/src/lib.rs | 13 ++ parquet_file/src/lib.rs | 4 +- parquet_file/src/metadata.rs | 4 +- querier/src/cache/parquet_file.rs | 4 +- querier/src/cache/partition.rs | 211 ++++++++++++------ querier/src/ingester/mod.rs | 4 - querier/src/parquet/creation.rs | 11 +- querier/src/parquet/mod.rs | 20 +- querier/src/parquet/query_access.rs | 4 +- querier/src/table/mod.rs | 24 +- service_grpc_catalog/src/lib.rs | 60 ++++- service_grpc_object_store/src/lib.rs | 7 +- 47 files changed, 744 insertions(+), 434 deletions(-) create mode 100644 generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto create mode 100644 iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql create mode 100644 iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql diff --git a/compactor/src/components/df_planner/query_chunk.rs b/compactor/src/components/df_planner/query_chunk.rs index af69fe471f..7c930a7e12 100644 --- a/compactor/src/components/df_planner/query_chunk.rs +++ b/compactor/src/components/df_planner/query_chunk.rs @@ -171,7 +171,7 @@ fn to_queryable_parquet_chunk( parquet_file_id = file.file.id.get(), parquet_file_namespace_id = file.file.namespace_id.get(), parquet_file_table_id = file.file.table_id.get(), - parquet_file_partition_id = file.file.partition_id.get(), + parquet_file_partition_id = %file.file.partition_id, parquet_file_object_store_id = uuid.to_string().as_str(), "built parquet chunk from metadata" ); diff --git a/compactor/src/components/parquet_file_sink/mock.rs b/compactor/src/components/parquet_file_sink/mock.rs index 977f8c810e..b67e9b9615 100644 --- a/compactor/src/components/parquet_file_sink/mock.rs +++ b/compactor/src/components/parquet_file_sink/mock.rs @@ -70,8 +70,7 @@ impl ParquetFileSink for MockParquetFileSink { let out = ((row_count > 0) || !self.filter_empty_files).then(|| 
ParquetFileParams { namespace_id: partition.namespace_id, table_id: partition.table.id, - partition_id: partition.partition_id, - partition_hash_id: partition.partition_hash_id.clone(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::from_u128(guard.len() as u128), min_time: Timestamp::new(0), max_time: Timestamp::new(0), @@ -95,7 +94,7 @@ impl ParquetFileSink for MockParquetFileSink { #[cfg(test)] mod tests { use arrow_util::assert_batches_eq; - use data_types::{NamespaceId, PartitionId, TableId}; + use data_types::{NamespaceId, TableId}; use datafusion::{ arrow::{array::new_null_array, datatypes::DataType}, physical_plan::stream::RecordBatchStreamAdapter, @@ -159,7 +158,7 @@ mod tests { Arc::clone(&schema), futures::stream::once(async move { Ok(record_batch_captured) }), )); - let partition_hash_id = partition.partition_hash_id.clone(); + let partition_id = partition.transition_partition_id(); assert_eq!( sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await @@ -167,8 +166,7 @@ mod tests { Some(ParquetFileParams { namespace_id: NamespaceId::new(2), table_id: TableId::new(3), - partition_id: PartitionId::new(1), - partition_hash_id, + partition_id, object_store_id: Uuid::from_u128(2), min_time: Timestamp::new(0), max_time: Timestamp::new(0), @@ -223,7 +221,7 @@ mod tests { Arc::clone(&schema), futures::stream::empty(), )); - let partition_hash_id = partition.partition_hash_id.clone(); + let partition_id = partition.transition_partition_id(); assert_eq!( sink.store(stream, Arc::clone(&partition), level, max_l0_created_at) .await @@ -231,8 +229,7 @@ mod tests { Some(ParquetFileParams { namespace_id: NamespaceId::new(2), table_id: TableId::new(3), - partition_id: PartitionId::new(1), - partition_hash_id, + partition_id, object_store_id: Uuid::from_u128(0), min_time: Timestamp::new(0), max_time: Timestamp::new(0), diff --git a/compactor/src/components/partition_files_source/mock.rs b/compactor/src/components/partition_files_source/mock.rs index ee82915bcf..7393d21f36 100644 --- a/compactor/src/components/partition_files_source/mock.rs +++ b/compactor/src/components/partition_files_source/mock.rs @@ -1,19 +1,35 @@ use std::{collections::HashMap, fmt::Display}; -use async_trait::async_trait; -use data_types::{ParquetFile, PartitionId}; - use super::PartitionFilesSource; +use async_trait::async_trait; +use data_types::{ParquetFile, PartitionId, TransitionPartitionId}; #[derive(Debug)] pub struct MockPartitionFilesSource { - files: HashMap>, + // This complexity is because we're in the process of moving to partition hash IDs rather than + // partition catalog IDs, and Parquet files might only have the partition hash ID on their + // record, but the compactor deals with partition catalog IDs because we haven't transitioned + // it yet. This should become simpler when the transition is complete. 
+ partition_lookup: HashMap, + file_lookup: HashMap>, } impl MockPartitionFilesSource { - #[allow(dead_code)] // not used anywhere - pub fn new(files: HashMap>) -> Self { - Self { files } + #[cfg(test)] + pub fn new( + partition_lookup: HashMap, + parquet_files: Vec, + ) -> Self { + let mut file_lookup: HashMap> = HashMap::new(); + for file in parquet_files { + let files = file_lookup.entry(file.partition_id.clone()).or_default(); + files.push(file); + } + + Self { + partition_lookup, + file_lookup, + } } } @@ -25,46 +41,60 @@ impl Display for MockPartitionFilesSource { #[async_trait] impl PartitionFilesSource for MockPartitionFilesSource { - async fn fetch(&self, partition: PartitionId) -> Vec { - self.files.get(&partition).cloned().unwrap_or_default() + async fn fetch(&self, partition_id: PartitionId) -> Vec { + self.partition_lookup + .get(&partition_id) + .and_then(|partition_hash_id| self.file_lookup.get(partition_hash_id).cloned()) + .unwrap_or_default() } } #[cfg(test)] mod tests { - use iox_tests::ParquetFileBuilder; - use super::*; + use iox_tests::{partition_identifier, ParquetFileBuilder}; #[test] fn test_display() { assert_eq!( - MockPartitionFilesSource::new(HashMap::default()).to_string(), + MockPartitionFilesSource::new(Default::default(), Default::default()).to_string(), "mock", ) } #[tokio::test] async fn test_fetch() { - let f_1_1 = ParquetFileBuilder::new(1).with_partition(1).build(); - let f_1_2 = ParquetFileBuilder::new(2).with_partition(1).build(); - let f_2_1 = ParquetFileBuilder::new(3).with_partition(2).build(); + let partition_id_1 = PartitionId::new(1); + let partition_id_2 = PartitionId::new(2); + let partition_identifier_1 = partition_identifier(1); + let partition_identifier_2 = partition_identifier(2); + let f_1_1 = ParquetFileBuilder::new(1) + .with_partition(partition_identifier_1.clone()) + .build(); + let f_1_2 = ParquetFileBuilder::new(2) + .with_partition(partition_identifier_1.clone()) + .build(); + let f_2_1 = ParquetFileBuilder::new(3) + .with_partition(partition_identifier_2.clone()) + .build(); - let files = HashMap::from([ - (PartitionId::new(1), vec![f_1_1.clone(), f_1_2.clone()]), - (PartitionId::new(2), vec![f_2_1.clone()]), + let partition_lookup = HashMap::from([ + (partition_id_1, partition_identifier_1.clone()), + (partition_id_2, partition_identifier_2.clone()), ]); - let source = MockPartitionFilesSource::new(files); + + let files = vec![f_1_1.clone(), f_1_2.clone(), f_2_1.clone()]; + let source = MockPartitionFilesSource::new(partition_lookup, files); // different partitions assert_eq!( - source.fetch(PartitionId::new(1)).await, + source.fetch(partition_id_1).await, vec![f_1_1.clone(), f_1_2.clone()], ); - assert_eq!(source.fetch(PartitionId::new(2)).await, vec![f_2_1],); + assert_eq!(source.fetch(partition_id_2).await, vec![f_2_1],); // fetching does not drain - assert_eq!(source.fetch(PartitionId::new(1)).await, vec![f_1_1, f_1_2],); + assert_eq!(source.fetch(partition_id_1).await, vec![f_1_1, f_1_2],); // unknown partition => empty result assert_eq!(source.fetch(PartitionId::new(3)).await, vec![],); diff --git a/compactor_scheduler/src/commit/logging.rs b/compactor_scheduler/src/commit/logging.rs index 439e928d45..00d8e732f3 100644 --- a/compactor_scheduler/src/commit/logging.rs +++ b/compactor_scheduler/src/commit/logging.rs @@ -78,14 +78,12 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use test_helpers::tracing::TracingCapture; - use super::*; use 
crate::commit::mock::{CommitHistoryEntry, MockCommit}; - use iox_tests::ParquetFileBuilder; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; + use std::sync::Arc; + use test_helpers::tracing::TracingCapture; #[test] fn test_display() { @@ -111,14 +109,21 @@ mod tests { .with_row_count(105) .build(); - let created_1 = ParquetFileBuilder::new(1000).with_partition(1).build(); - let created_2 = ParquetFileBuilder::new(1001).with_partition(1).build(); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + + let created_1 = ParquetFileBuilder::new(1000) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_2 = ParquetFileBuilder::new(1001) + .with_partition(transition_partition_id_1) + .build(); let capture = TracingCapture::new(); let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone()], &[], &[created_1.clone().into(), created_2.clone().into()], @@ -130,9 +135,11 @@ mod tests { Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] ); + let partition_id_2 = PartitionId::new(2); + let ids = commit .commit( - PartitionId::new(2), + partition_id_2, &[existing_2.clone(), existing_3.clone()], &[existing_1.clone()], &[], @@ -151,14 +158,14 @@ level = INFO; message = committed parquet file change; target_level = Final; par inner.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1.clone()], upgrade: vec![], created: vec![created_1, created_2], target_level: CompactionLevel::Final, }, CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_2, existing_3], upgrade: vec![existing_1], created: vec![], diff --git a/compactor_scheduler/src/commit/metrics.rs b/compactor_scheduler/src/commit/metrics.rs index 0f0b73d2cc..3890eb46c4 100644 --- a/compactor_scheduler/src/commit/metrics.rs +++ b/compactor_scheduler/src/commit/metrics.rs @@ -303,15 +303,12 @@ where #[cfg(test)] mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use metric::{assert_histogram, Attributes}; - - use crate::commit::mock::{CommitHistoryEntry, MockCommit}; - use iox_tests::ParquetFileBuilder; - use super::*; + use crate::commit::mock::{CommitHistoryEntry, MockCommit}; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; + use metric::{assert_histogram, Attributes}; + use std::sync::Arc; #[test] fn test_display() { @@ -326,6 +323,9 @@ mod tests { let inner = Arc::new(MockCommit::new()); let commit = MetricsCommitWrapper::new(Arc::clone(&inner), ®istry); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + let existing_1 = ParquetFileBuilder::new(1) .with_file_size_bytes(10_001) .with_row_count(1_001) @@ -350,7 +350,7 @@ mod tests { let created = ParquetFileBuilder::new(1000) .with_file_size_bytes(10_016) .with_row_count(1_016) - .with_partition(1) + .with_partition(transition_partition_id_1) .with_compaction_level(CompactionLevel::Initial) .build(); @@ -392,7 +392,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone()], &[existing_2a.clone()], &[created.clone().into()], @@ -401,9 +401,11 @@ mod tests { .await; assert_matches!(ids, Ok(res) if res == vec![ParquetFileId::new(1000)]); + let partition_id_2 = PartitionId::new(2); + let ids = commit .commit( - 
PartitionId::new(2), + partition_id_2, &[existing_2b.clone(), existing_3.clone()], &[existing_4.clone()], &[], @@ -449,14 +451,14 @@ mod tests { inner.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1], upgrade: vec![existing_2a.clone()], created: vec![created], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_2b, existing_3], upgrade: vec![existing_4], created: vec![], diff --git a/compactor_scheduler/src/commit/mock.rs b/compactor_scheduler/src/commit/mock.rs index a2e6e43631..8ae20df3d3 100644 --- a/compactor_scheduler/src/commit/mock.rs +++ b/compactor_scheduler/src/commit/mock.rs @@ -78,10 +78,9 @@ impl Commit for MockCommit { #[cfg(test)] mod tests { - use assert_matches::assert_matches; - use iox_tests::ParquetFileBuilder; - use super::*; + use assert_matches::assert_matches; + use iox_tests::{partition_identifier, ParquetFileBuilder}; #[test] fn test_display() { @@ -92,6 +91,11 @@ mod tests { async fn test_commit() { let commit = MockCommit::new(); + let partition_id_1 = PartitionId::new(1); + let transition_partition_id_1 = partition_identifier(1); + let partition_id_2 = PartitionId::new(2); + let transition_partition_id_2 = partition_identifier(2); + let existing_1 = ParquetFileBuilder::new(1).build(); let existing_2 = ParquetFileBuilder::new(2).build(); let existing_3 = ParquetFileBuilder::new(3).build(); @@ -101,14 +105,22 @@ mod tests { let existing_7 = ParquetFileBuilder::new(7).build(); let existing_8 = ParquetFileBuilder::new(8).build(); - let created_1_1 = ParquetFileBuilder::new(1000).with_partition(1).build(); - let created_1_2 = ParquetFileBuilder::new(1001).with_partition(1).build(); - let created_1_3 = ParquetFileBuilder::new(1003).with_partition(1).build(); - let created_2_1 = ParquetFileBuilder::new(1002).with_partition(2).build(); + let created_1_1 = ParquetFileBuilder::new(1000) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_1_2 = ParquetFileBuilder::new(1001) + .with_partition(transition_partition_id_1.clone()) + .build(); + let created_1_3 = ParquetFileBuilder::new(1003) + .with_partition(transition_partition_id_1) + .build(); + let created_2_1 = ParquetFileBuilder::new(1002) + .with_partition(transition_partition_id_2) + .build(); let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_1.clone(), existing_2.clone()], &[existing_3.clone(), existing_4.clone()], &[created_1_1.clone().into(), created_1_2.clone().into()], @@ -122,7 +134,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(2), + partition_id_2, &[existing_3.clone()], &[], &[created_2_1.clone().into()], @@ -136,7 +148,7 @@ mod tests { let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_5.clone(), existing_6.clone(), existing_7.clone()], &[], &[created_1_3.clone().into()], @@ -151,7 +163,7 @@ mod tests { // simulate fill implosion of the file (this may happen w/ delete predicates) let ids = commit .commit( - PartitionId::new(1), + partition_id_1, &[existing_8.clone()], &[], &[], @@ -167,28 +179,28 @@ mod tests { commit.history(), vec![ CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_1, existing_2], upgrade: vec![existing_3.clone(), existing_4.clone()], created: vec![created_1_1, created_1_2], target_level: CompactionLevel::FileNonOverlapped, }, 
CommitHistoryEntry { - partition_id: PartitionId::new(2), + partition_id: partition_id_2, delete: vec![existing_3], upgrade: vec![], created: vec![created_2_1], target_level: CompactionLevel::Final, }, CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_5, existing_6, existing_7,], upgrade: vec![], created: vec![created_1_3], target_level: CompactionLevel::FileNonOverlapped, }, CommitHistoryEntry { - partition_id: PartitionId::new(1), + partition_id: partition_id_1, delete: vec![existing_8], upgrade: vec![], created: vec![], diff --git a/compactor_scheduler/tests/local_scheduler/mod.rs b/compactor_scheduler/tests/local_scheduler/mod.rs index a2638be1b5..279ff51702 100644 --- a/compactor_scheduler/tests/local_scheduler/mod.rs +++ b/compactor_scheduler/tests/local_scheduler/mod.rs @@ -4,7 +4,7 @@ use assert_matches::assert_matches; use compactor_scheduler::{ create_scheduler, CompactionJob, LocalSchedulerConfig, Scheduler, SchedulerConfig, }; -use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId}; +use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId, TransitionPartitionId}; use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFileBuilder, TestPartition}; mod end_job; @@ -65,7 +65,7 @@ impl TestLocalScheduler { pub async fn create_params_for_new_parquet_file(&self) -> ParquetFileParams { ParquetFileBuilder::new(42) - .with_partition(self.get_partition_id().get()) + .with_partition(self.get_transition_partition_id()) .build() .into() } @@ -81,4 +81,8 @@ impl TestLocalScheduler { pub fn get_partition_id(&self) -> PartitionId { self.test_partition.partition.id } + + pub fn get_transition_partition_id(&self) -> TransitionPartitionId { + self.test_partition.partition.transition_partition_id() + } } diff --git a/compactor_test_utils/src/simulator.rs b/compactor_test_utils/src/simulator.rs index 2f466fcdd4..cf731f5ed5 100644 --- a/compactor_test_utils/src/simulator.rs +++ b/compactor_test_utils/src/simulator.rs @@ -202,8 +202,7 @@ impl SimulatedFile { ParquetFileParams { namespace_id: partition_info.namespace_id, table_id: partition_info.table.id, - partition_id: partition_info.partition_id, - partition_hash_id: partition_info.partition_hash_id.clone(), + partition_id: partition_info.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time, max_time, diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index 97815e1939..a77c1af8d0 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -527,10 +527,9 @@ pub struct ParquetFile { pub namespace_id: NamespaceId, /// the table pub table_id: TableId, - /// the partition - pub partition_id: PartitionId, - /// the partition hash ID, if generated - pub partition_hash_id: Option, + /// the partition identifier + #[sqlx(flatten)] + pub partition_id: TransitionPartitionId, /// the uuid used in the object store path for this file pub object_store_id: Uuid, /// the min timestamp of data in this file @@ -588,7 +587,6 @@ impl ParquetFile { namespace_id: params.namespace_id, table_id: params.table_id, partition_id: params.partition_id, - partition_hash_id: params.partition_hash_id, object_store_id: params.object_store_id, min_time: params.min_time, max_time: params.max_time, @@ -602,21 +600,9 @@ impl ParquetFile { } } - /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that. - /// Otherwise, use the database-assigned `PartitionId`. 
- pub fn transition_partition_id(&self) -> TransitionPartitionId { - TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) - } - /// Estimate the memory consumption of this object and its contents pub fn size(&self) -> usize { - std::mem::size_of_val(self) - + self - .partition_hash_id - .as_ref() - .map(|id| id.size() - std::mem::size_of_val(id)) - .unwrap_or_default() - + self.column_set.size() + std::mem::size_of_val(self) + self.partition_id.size() + self.column_set.size() - std::mem::size_of_val(&self.column_set) } @@ -638,10 +624,8 @@ pub struct ParquetFileParams { pub namespace_id: NamespaceId, /// the table pub table_id: TableId, - /// the partition - pub partition_id: PartitionId, - /// the partition hash ID, if generated - pub partition_hash_id: Option, + /// the partition identifier + pub partition_id: TransitionPartitionId, /// the uuid used in the object store path for this file pub object_store_id: Uuid, /// the min timestamp of data in this file @@ -662,21 +646,12 @@ pub struct ParquetFileParams { pub max_l0_created_at: Timestamp, } -impl ParquetFileParams { - /// If this parquet file params will be storing a `PartitionHashId` in the catalog, use that. - /// Otherwise, use the database-assigned `PartitionId`. - pub fn transition_partition_id(&self) -> TransitionPartitionId { - TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) - } -} - impl From for ParquetFileParams { fn from(value: ParquetFile) -> Self { Self { namespace_id: value.namespace_id, table_id: value.table_id, partition_id: value.partition_id, - partition_hash_id: value.partition_hash_id, object_store_id: value.object_store_id, min_time: value.min_time, max_time: value.max_time, diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs index b8bfde8f56..c370cb3cea 100644 --- a/data_types/src/partition.rs +++ b/data_types/src/partition.rs @@ -31,6 +31,34 @@ impl TransitionPartitionId { } } +impl<'a, R> sqlx::FromRow<'a, R> for TransitionPartitionId +where + R: sqlx::Row, + &'static str: sqlx::ColumnIndex, + PartitionId: sqlx::decode::Decode<'a, R::Database>, + PartitionId: sqlx::types::Type, + Option: sqlx::decode::Decode<'a, R::Database>, + Option: sqlx::types::Type, +{ + fn from_row(row: &'a R) -> sqlx::Result { + let partition_id: Option = row.try_get("partition_id")?; + let partition_hash_id: Option = row.try_get("partition_hash_id")?; + + let transition_partition_id = match (partition_id, partition_hash_id) { + (_, Some(hash_id)) => TransitionPartitionId::Deterministic(hash_id), + (Some(id), _) => TransitionPartitionId::Deprecated(id), + (None, None) => { + return Err(sqlx::Error::ColumnDecode { + index: "partition_id".into(), + source: "Both partition_id and partition_hash_id were NULL".into(), + }) + } + }; + + Ok(transition_partition_id) + } +} + impl From<(PartitionId, Option<&PartitionHashId>)> for TransitionPartitionId { fn from((partition_id, partition_hash_id): (PartitionId, Option<&PartitionHashId>)) -> Self { partition_hash_id diff --git a/garbage_collector/src/objectstore/checker.rs b/garbage_collector/src/objectstore/checker.rs index a8e7a4bb24..0a91f0f6a6 100644 --- a/garbage_collector/src/objectstore/checker.rs +++ b/garbage_collector/src/objectstore/checker.rs @@ -267,8 +267,7 @@ mod tests { let parquet_file_params = ParquetFileParams { namespace_id: namespace.id, table_id: partition.table_id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: 
partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), @@ -298,7 +297,7 @@ mod tests { let location = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); @@ -376,7 +375,7 @@ mod tests { let location = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); @@ -469,7 +468,7 @@ mod tests { let loc = ParquetFilePath::new( file_in_catalog.namespace_id, file_in_catalog.table_id, - &file_in_catalog.transition_partition_id(), + &file_in_catalog.partition_id.clone(), file_in_catalog.object_store_id, ) .object_store_path(); diff --git a/generated_types/build.rs b/generated_types/build.rs index 370eb59e8f..409d0c5140 100644 --- a/generated_types/build.rs +++ b/generated_types/build.rs @@ -52,6 +52,7 @@ fn generate_grpc_types(root: &Path) -> Result<()> { let proto_files = vec![ authz_path.join("authz.proto"), catalog_path.join("parquet_file.proto"), + catalog_path.join("partition_identifier.proto"), catalog_path.join("service.proto"), compactor_path.join("service.proto"), delete_path.join("service.proto"), diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto index a7be9b84e8..e61c4430a8 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_file.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package influxdata.iox.catalog.v1; option go_package = "github.com/influxdata/iox/catalog/v1"; +import "influxdata/iox/catalog/v1/partition_identifier.proto"; + message ParquetFile { reserved 7; reserved "min_sequence_number"; @@ -11,6 +13,8 @@ message ParquetFile { reserved "shard_id"; reserved 8; reserved "max_sequence_number"; + reserved 5; + reserved "partition_id"; // the id of the file in the catalog int64 id = 1; @@ -18,8 +22,9 @@ message ParquetFile { int64 namespace_id = 3; // the table id int64 table_id = 4; - // the partition id - int64 partition_id = 5; + + PartitionIdentifier partition_identifier = 19; + // the object store uuid string object_store_id = 6; // the min timestamp of data in this file diff --git a/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto b/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto new file mode 100644 index 0000000000..eb11c388d2 --- /dev/null +++ b/generated_types/protos/influxdata/iox/catalog/v1/partition_identifier.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package influxdata.iox.catalog.v1; +option go_package = "github.com/influxdata/iox/catalog/v1"; + +message PartitionIdentifier { + // Either the catalog-assigned partition ID or the deterministic identifier created from the + // table ID and partition key. 
+ oneof id { + int64 catalog_id = 1; + bytes hash_id = 2; + } +} diff --git a/generated_types/protos/influxdata/iox/catalog/v1/service.proto b/generated_types/protos/influxdata/iox/catalog/v1/service.proto index bdecee8ec6..3e370874dc 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/service.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/service.proto @@ -3,6 +3,7 @@ package influxdata.iox.catalog.v1; option go_package = "github.com/influxdata/iox/catalog/v1"; import "influxdata/iox/catalog/v1/parquet_file.proto"; +import "influxdata/iox/catalog/v1/partition_identifier.proto"; service CatalogService { // Get the parquet_file catalog records in the given partition @@ -19,8 +20,11 @@ service CatalogService { } message GetParquetFilesByPartitionIdRequest { - // the partition id - int64 partition_id = 1; + // Was the catalog-assigned partition ID. + reserved 1; + reserved "partition_id"; + + PartitionIdentifier partition_identifier = 2; } message GetParquetFilesByPartitionIdResponse { @@ -35,15 +39,17 @@ message Partition { reserved "sequencer_id"; reserved 7; reserved "shard_id"; + reserved 1; + reserved "id"; - // the partition id - int64 id = 1; // the table id the partition is in int64 table_id = 3; // the partition key string key = 4; // the sort key for data in parquet files in the partition repeated string array_sort_key = 6; + + PartitionIdentifier identifier = 8; } message GetPartitionsByTableIdRequest { diff --git a/import_export/src/file/export.rs b/import_export/src/file/export.rs index 4d1a1c6418..a9be88f064 100644 --- a/import_export/src/file/export.rs +++ b/import_export/src/file/export.rs @@ -1,10 +1,13 @@ +use data_types::{PartitionHashId, PartitionId, TransitionPartitionId}; use futures_util::TryStreamExt; use influxdb_iox_client::{ - catalog::{self, generated_types::ParquetFile}, + catalog::{ + self, + generated_types::{partition_identifier, ParquetFile, PartitionIdentifier}, + }, connection::Connection, store, }; -use observability_deps::tracing::{debug, info}; use std::path::{Path, PathBuf}; use thiserror::Error; use tokio::{ @@ -35,10 +38,6 @@ type Result = std::result::Result; pub struct RemoteExporter { catalog_client: catalog::Client, store_client: store::Client, - - /// Optional partition filter. If `Some(partition_id)`, only these - /// files with that `partition_id` are downloaded. - partition_filter: Option, } impl RemoteExporter { @@ -46,19 +45,9 @@ impl RemoteExporter { Self { catalog_client: catalog::Client::new(connection.clone()), store_client: store::Client::new(connection), - partition_filter: None, } } - /// Specify that only files and metadata for the specific - /// partition id should be exported. - pub fn with_partition_filter(mut self, partition_id: i64) -> Self { - info!(partition_id, "Filtering by partition"); - - self.partition_filter = Some(partition_id); - self - } - /// Exports all data and metadata for `table_name` in /// `namespace` to local files. 
/// @@ -95,39 +84,14 @@ impl RemoteExporter { let indexed_parquet_file_metadata = parquet_files.into_iter().enumerate(); for (index, parquet_file) in indexed_parquet_file_metadata { - if self.should_export(parquet_file.partition_id) { - self.export_parquet_file( - &output_directory, - index, - num_parquet_files, - &parquet_file, - ) + self.export_parquet_file(&output_directory, index, num_parquet_files, &parquet_file) .await?; - } else { - debug!( - "skipping file {} of {num_parquet_files} ({} does not match request)", - index + 1, - parquet_file.partition_id - ); - } } println!("Done."); Ok(()) } - /// Return true if this partition should be exported - fn should_export(&self, partition_id: i64) -> bool { - self.partition_filter - .map(|partition_filter| { - // if a partition filter was specified, only export - // the file if the partition matches - partition_filter == partition_id - }) - // export files if there is no partition - .unwrap_or(true) - } - /// Exports table and partition information for the specified /// table. Overwrites existing files, if any, to ensure it has the /// latest catalog information. @@ -158,13 +122,11 @@ impl RemoteExporter { .await?; for partition in partitions { - let partition_id = partition.id; - if self.should_export(partition_id) { - let partition_json = serde_json::to_string_pretty(&partition)?; - let filename = format!("partition.{partition_id}.json"); - let file_path = output_directory.join(&filename); - write_string_to_file(&partition_json, &file_path).await?; - } + let partition_id = to_partition_id(partition.identifier.as_ref()); + let partition_json = serde_json::to_string_pretty(&partition)?; + let filename = format!("partition.{partition_id}.json"); + let file_path = output_directory.join(&filename); + write_string_to_file(&partition_json, &file_path).await?; } Ok(()) @@ -183,9 +145,10 @@ impl RemoteExporter { parquet_file: &ParquetFile, ) -> Result<()> { let uuid = &parquet_file.object_store_id; - let partition_id = parquet_file.partition_id; let file_size_bytes = parquet_file.file_size_bytes as u64; + let partition_id = to_partition_id(parquet_file.partition_identifier.as_ref()); + // copy out the metadata as pbjson encoded data always (to // ensure we have the most up to date version) { @@ -230,6 +193,21 @@ impl RemoteExporter { } } +fn to_partition_id(partition_identifier: Option<&PartitionIdentifier>) -> TransitionPartitionId { + match partition_identifier + .and_then(|pi| pi.id.as_ref()) + .expect("Catalog service should send the partition identifier") + { + partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic( + PartitionHashId::try_from(&bytes[..]) + .expect("Catalog service should send valid hash_id bytes"), + ), + partition_identifier::Id::CatalogId(id) => { + TransitionPartitionId::Deprecated(PartitionId::new(*id)) + } + } +} + /// writes the contents of a string to a file, overwriting the previous contents, if any async fn write_string_to_file(contents: &str, path: &Path) -> Result<()> { let mut file = OpenOptions::new() diff --git a/import_export/src/file/import.rs b/import_export/src/file/import.rs index 3056317498..c6530e824d 100644 --- a/import_export/src/file/import.rs +++ b/import_export/src/file/import.rs @@ -7,7 +7,7 @@ use data_types::{ NamespacePartitionTemplateOverride, TablePartitionTemplateOverride, PARTITION_BY_DAY_PROTO, }, ColumnSet, ColumnType, CompactionLevel, Namespace, NamespaceName, NamespaceNameError, - ParquetFileParams, Partition, PartitionHashId, Statistics, Table, TableId, 
Timestamp, + ParquetFileParams, Partition, Statistics, Table, TableId, Timestamp, }; use generated_types::influxdata::iox::catalog::v1 as proto; // ParquetFile as ProtoParquetFile, Partition as ProtoPartition, @@ -567,9 +567,6 @@ impl RemoteImporter { // need to make columns in the target catalog let column_set = insert_columns(table.id, decoded_iox_parquet_metadata, repos).await?; - // Create the the partition_hash_id - let partition_hash_id = Some(PartitionHashId::new(table.id, &partition.partition_key)); - let params = if let Some(proto_parquet_file) = &parquet_metadata { let compaction_level = proto_parquet_file .compaction_level @@ -579,8 +576,7 @@ impl RemoteImporter { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_hash_id, - partition_id: partition.id, + partition_id: partition.transition_partition_id(), object_store_id, min_time: Timestamp::new(proto_parquet_file.min_time), max_time: Timestamp::new(proto_parquet_file.max_time), @@ -599,8 +595,7 @@ impl RemoteImporter { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_hash_id, - partition_id: partition.id, + partition_id: partition.transition_partition_id(), object_store_id, min_time, max_time, diff --git a/influxdb_iox/src/commands/remote/store.rs b/influxdb_iox/src/commands/remote/store.rs index eddae4c15b..8c3ff29b4b 100644 --- a/influxdb_iox/src/commands/remote/store.rs +++ b/influxdb_iox/src/commands/remote/store.rs @@ -55,10 +55,6 @@ struct GetTable { #[clap(action)] table: String, - /// If specified, only files from the specified partitions are downloaded - #[clap(action, short, long)] - partition_id: Option, - /// The output directory to use. If not specified, files will be placed in a directory named /// after the table in the current working directory. #[clap(action, short)] @@ -91,13 +87,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { Command::GetTable(GetTable { namespace, table, - partition_id, output_directory, }) => { let mut exporter = RemoteExporter::new(connection); - if let Some(partition_id) = partition_id { - exporter = exporter.with_partition_filter(partition_id); - } Ok(exporter .export_table(output_directory, namespace, table) .await?) 
diff --git a/influxdb_iox/tests/end_to_end_cases/compactor.rs b/influxdb_iox/tests/end_to_end_cases/compactor.rs index 1ec8ba38ba..cb856f5bcd 100644 --- a/influxdb_iox/tests/end_to_end_cases/compactor.rs +++ b/influxdb_iox/tests/end_to_end_cases/compactor.rs @@ -157,10 +157,12 @@ async fn sharded_compactor_0_always_compacts_partition_1() { .assert() .success() .stdout( - // Important parts are the expected partition ID - predicate::str::contains(r#""partitionId": "1","#) - // and compaction level - .and(predicate::str::contains(r#""compactionLevel": 1"#)), + // Important parts are the expected partition identifier + predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + ) + // and compaction level + .and(predicate::str::contains(r#""compactionLevel": 1"#)), ); } .boxed() @@ -240,10 +242,12 @@ async fn sharded_compactor_1_never_compacts_partition_1() { .assert() .success() .stdout( - // Important parts are the expected partition ID - predicate::str::contains(r#""partitionId": "1","#) - // and compaction level is 0 so it's not returned - .and(predicate::str::contains("compactionLevel").not()), + // Important parts are the expected partition identifier + predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + ) + // and compaction level is 0 so it's not returned + .and(predicate::str::contains("compactionLevel").not()), ); } .boxed() diff --git a/influxdb_iox/tests/end_to_end_cases/remote.rs b/influxdb_iox/tests/end_to_end_cases/remote.rs index 7a3339d52a..9a31d70e72 100644 --- a/influxdb_iox/tests/end_to_end_cases/remote.rs +++ b/influxdb_iox/tests/end_to_end_cases/remote.rs @@ -280,10 +280,9 @@ async fn remote_partition_and_get_from_store_and_pull() { .arg("1") .assert() .success() - .stdout( - predicate::str::contains(r#""id": "1""#) - .and(predicate::str::contains(r#""partitionId": "1","#)), - ) + .stdout(predicate::str::contains( + r#""hashId": "uGKn6bMp7mpBjN4ZEZjq6xUSdT8ZuHqB3vKubD0O0jc=""#, + )) .get_output() .stdout .clone(); diff --git a/influxdb_iox_client/src/client/catalog.rs b/influxdb_iox_client/src/client/catalog.rs index f65ea4f154..249b2dec50 100644 --- a/influxdb_iox_client/src/client/catalog.rs +++ b/influxdb_iox_client/src/client/catalog.rs @@ -29,9 +29,15 @@ impl Client { &mut self, partition_id: i64, ) -> Result, Error> { + let partition_identifier = PartitionIdentifier { + id: Some(partition_identifier::Id::CatalogId(partition_id)), + }; + let response = self .inner - .get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest { partition_id }) + .get_parquet_files_by_partition_id(GetParquetFilesByPartitionIdRequest { + partition_identifier: Some(partition_identifier), + }) .await?; Ok(response.into_inner().parquet_files) diff --git a/ingester/src/persist/completion_observer.rs b/ingester/src/persist/completion_observer.rs index a3dfae0c0b..4978398daf 100644 --- a/ingester/src/persist/completion_observer.rs +++ b/ingester/src/persist/completion_observer.rs @@ -2,7 +2,8 @@ use std::{fmt::Debug, sync::Arc, time::Duration}; use async_trait::async_trait; use data_types::{ - sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, PartitionId, TableId, + sequence_number_set::SequenceNumberSet, NamespaceId, ParquetFileParams, TableId, + TransitionPartitionId, }; use crate::wal::reference_tracker::WalReferenceHandle; @@ -54,9 +55,9 @@ impl CompletedPersist { self.meta.table_id } - /// Returns the [`PartitionId`] of the persisted data. 
- pub(crate) fn partition_id(&self) -> PartitionId { - self.meta.partition_id + /// Returns the [`TransitionPartitionId`] of the persisted data. + pub(crate) fn partition_id(&self) -> &TransitionPartitionId { + &self.meta.partition_id } /// Returns the [`SequenceNumberSet`] of the persisted data. @@ -166,15 +167,16 @@ pub(crate) mod mock { #[cfg(test)] mod tests { use super::*; - use crate::test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}; + use crate::test_util::{ + ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID, + }; use data_types::{ColumnId, ColumnSet, SequenceNumber, Timestamp}; fn arbitrary_file_meta() -> ParquetFileParams { ParquetFileParams { namespace_id: ARBITRARY_NAMESPACE_ID, table_id: ARBITRARY_TABLE_ID, - partition_id: ARBITRARY_PARTITION_ID, - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), @@ -226,7 +228,7 @@ mod tests { assert_eq!(note.namespace_id(), meta.namespace_id); assert_eq!(note.table_id(), meta.table_id); - assert_eq!(note.partition_id(), meta.partition_id); + assert_eq!(note.partition_id(), &meta.partition_id); assert_eq!(note.column_count(), meta.column_set.len()); assert_eq!(note.row_count(), meta.row_count as usize); diff --git a/ingester/src/persist/file_metrics.rs b/ingester/src/persist/file_metrics.rs index 54f268f926..05353f9ccb 100644 --- a/ingester/src/persist/file_metrics.rs +++ b/ingester/src/persist/file_metrics.rs @@ -151,7 +151,9 @@ mod tests { use super::*; use crate::{ persist::completion_observer::mock::MockCompletionObserver, - test_util::{ARBITRARY_NAMESPACE_ID, ARBITRARY_PARTITION_ID, ARBITRARY_TABLE_ID}, + test_util::{ + ARBITRARY_NAMESPACE_ID, ARBITRARY_TABLE_ID, ARBITRARY_TRANSITION_PARTITION_ID, + }, }; use data_types::{ sequence_number_set::SequenceNumberSet, ColumnId, ColumnSet, ParquetFileParams, Timestamp, @@ -169,8 +171,7 @@ mod tests { let meta = ParquetFileParams { namespace_id: ARBITRARY_NAMESPACE_ID, table_id: ARBITRARY_TABLE_ID, - partition_id: ARBITRARY_PARTITION_ID, - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(Duration::from_secs(1_000).as_nanos() as _), max_time: Timestamp::new(Duration::from_secs(1_042).as_nanos() as _), // 42 seconds later diff --git a/ingester/src/persist/mod.rs b/ingester/src/persist/mod.rs index ca0f829fc9..a7e5d9d951 100644 --- a/ingester/src/persist/mod.rs +++ b/ingester/src/persist/mod.rs @@ -16,7 +16,7 @@ mod tests { use std::{sync::Arc, time::Duration}; use assert_matches::assert_matches; - use data_types::{CompactionLevel, ParquetFile, TransitionPartitionId}; + use data_types::{CompactionLevel, ParquetFile}; use futures::TryStreamExt; use iox_catalog::{ interface::{get_schema_by_id, Catalog, SoftDeletedRows}, @@ -190,7 +190,7 @@ mod tests { // Generate a partition with data let partition = partition_with_write(Arc::clone(&catalog)).await; let table_id = partition.lock().table_id(); - let partition_id = partition.lock().partition_id(); + let partition_id = partition.lock().transition_partition_id(); let namespace_id = partition.lock().namespace_id(); assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None)); @@ -221,7 +221,7 @@ mod tests { assert_matches!(&completion_observer.calls().as_slice(), &[n] => { assert_eq!(n.namespace_id(), namespace_id); assert_eq!(n.table_id(), table_id); - 
assert_eq!(n.partition_id(), partition_id); + assert_eq!(n.partition_id(), &partition_id); assert_eq!(n.sequence_numbers().len(), 1); }); @@ -243,12 +243,12 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) + .list_by_partition_not_to_delete(&partition_id) .await .expect("query for parquet files failed"); // Validate a single file was inserted with the expected properties. - let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile { + let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile { namespace_id: got_namespace_id, table_id: got_table_id, partition_id: got_partition_id, @@ -263,12 +263,12 @@ mod tests { { assert_eq!(created_at.get(), max_l0_created_at.get()); - assert_eq!(got_namespace_id, namespace_id); - assert_eq!(got_table_id, table_id); - assert_eq!(got_partition_id, partition_id); + assert_eq!(got_namespace_id, &namespace_id); + assert_eq!(got_table_id, &table_id); + assert_eq!(got_partition_id, &partition_id); - assert_eq!(row_count, 1); - assert_eq!(compaction_level, CompactionLevel::Initial); + assert_eq!(*row_count, 1); + assert_eq!(compaction_level, &CompactionLevel::Initial); (object_store_id, file_size_bytes) } @@ -292,7 +292,7 @@ mod tests { }] => { let want_path = format!("{object_store_id}.parquet"); assert!(location.as_ref().ends_with(&want_path)); - assert_eq!(size, file_size_bytes as usize); + assert_eq!(size, *file_size_bytes as usize); } ) } @@ -326,8 +326,7 @@ mod tests { // Generate a partition with data let partition = partition_with_write(Arc::clone(&catalog)).await; let table_id = partition.lock().table_id(); - let partition_id = partition.lock().partition_id(); - let transition_partition_id = partition.lock().transition_partition_id(); + let partition_id = partition.lock().transition_partition_id(); let namespace_id = partition.lock().namespace_id(); assert_matches!(partition.lock().sort_key(), SortKeyState::Provided(None)); @@ -344,7 +343,7 @@ mod tests { .await .partitions() .cas_sort_key( - &transition_partition_id, + &partition_id, None, &["bananas", "are", "good", "for", "you"], ) @@ -367,7 +366,7 @@ mod tests { assert_matches!(&completion_observer.calls().as_slice(), &[n] => { assert_eq!(n.namespace_id(), namespace_id); assert_eq!(n.table_id(), table_id); - assert_eq!(n.partition_id(), partition_id); + assert_eq!(n.partition_id(), &partition_id); assert_eq!(n.sequence_numbers().len(), 1); }); @@ -392,12 +391,12 @@ mod tests { .repositories() .await .parquet_files() - .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition_id)) + .list_by_partition_not_to_delete(&partition_id) .await .expect("query for parquet files failed"); // Validate a single file was inserted with the expected properties. 
- let (object_store_id, file_size_bytes) = assert_matches!(&*files, &[ParquetFile { + let (object_store_id, file_size_bytes) = assert_matches!(&*files, [ParquetFile { namespace_id: got_namespace_id, table_id: got_table_id, partition_id: got_partition_id, @@ -412,12 +411,12 @@ mod tests { { assert_eq!(created_at.get(), max_l0_created_at.get()); - assert_eq!(got_namespace_id, namespace_id); - assert_eq!(got_table_id, table_id); - assert_eq!(got_partition_id, partition_id); + assert_eq!(got_namespace_id, &namespace_id); + assert_eq!(got_table_id, &table_id); + assert_eq!(got_partition_id, &partition_id); - assert_eq!(row_count, 1); - assert_eq!(compaction_level, CompactionLevel::Initial); + assert_eq!(*row_count, 1); + assert_eq!(compaction_level, &CompactionLevel::Initial); (object_store_id, file_size_bytes) } @@ -438,18 +437,14 @@ mod tests { assert_eq!(files.len(), 2, "expected two uploaded files"); // Ensure the catalog record points at a valid file in object storage. - let want_path = ParquetFilePath::new( - namespace_id, - table_id, - &transition_partition_id, - object_store_id, - ) - .object_store_path(); + let want_path = + ParquetFilePath::new(namespace_id, table_id, &partition_id, *object_store_id) + .object_store_path(); let file = files .into_iter() .find(|f| f.location == want_path) .expect("did not find final file in object storage"); - assert_eq!(file.size, file_size_bytes as usize); + assert_eq!(file.size, *file_size_bytes as usize); } } diff --git a/ingester/src/persist/queue.rs b/ingester/src/persist/queue.rs index dbb9714e5c..af03d08c86 100644 --- a/ingester/src/persist/queue.rs +++ b/ingester/src/persist/queue.rs @@ -55,7 +55,8 @@ pub(crate) mod mock { use std::{sync::Arc, time::Duration}; use data_types::{ - ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionId, TableId, Timestamp, + ColumnId, ColumnSet, NamespaceId, ParquetFileParams, PartitionHashId, PartitionKey, + TableId, Timestamp, TransitionPartitionId, }; use test_helpers::timeout::FutureTimeout; use tokio::task::JoinHandle; @@ -155,13 +156,16 @@ pub(crate) mod mock { let wait_ms: u64 = rand::random::() % 100; tokio::time::sleep(Duration::from_millis(wait_ms)).await; let sequence_numbers = partition.lock().mark_persisted(data); + let table_id = TableId::new(2); + let partition_hash_id = + PartitionHashId::new(table_id, &PartitionKey::from("arbitrary")); + let partition_id = TransitionPartitionId::Deterministic(partition_hash_id); completion_observer .persist_complete(Arc::new(CompletedPersist::new( ParquetFileParams { namespace_id: NamespaceId::new(1), - table_id: TableId::new(2), - partition_id: PartitionId::new(3), - partition_hash_id: None, + table_id, + partition_id, object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 0ecef0bd9d..96ae85b59b 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -394,8 +394,7 @@ where ParquetFileParams { namespace_id: NamespaceId::new(1), table_id: TableId::new(2), - partition_id: PartitionId::new(3), - partition_hash_id: None, + partition_id: ARBITRARY_TRANSITION_PARTITION_ID.clone(), object_store_id: Default::default(), min_time: Timestamp::new(42), max_time: Timestamp::new(42), diff --git a/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql b/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql new file mode 100644 index 0000000000..b95b8dad83 --- /dev/null +++ 
b/iox_catalog/migrations/20230726175943_make_parquet_file_partition_id_optional.sql @@ -0,0 +1,24 @@ +DROP TRIGGER IF EXISTS update_partition ON parquet_file; + +ALTER TABLE parquet_file +ALTER COLUMN partition_id +DROP NOT NULL; + +CREATE OR REPLACE FUNCTION update_partition_on_new_file_at() +RETURNS TRIGGER +LANGUAGE PLPGSQL +AS $$ +BEGIN + UPDATE partition + SET new_file_at = NEW.created_at + WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id) + AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id); + + RETURN NEW; +END; +$$; + +CREATE TRIGGER update_partition + AFTER INSERT ON parquet_file + FOR EACH ROW + EXECUTE PROCEDURE update_partition_on_new_file_at(); diff --git a/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql b/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql new file mode 100644 index 0000000000..1adcbe3371 --- /dev/null +++ b/iox_catalog/sqlite/migrations/20230726175943_make_parquet_file_partition_id_optional.sql @@ -0,0 +1,98 @@ +CREATE TABLE parquet_file_temp +AS SELECT * FROM parquet_file; + +DROP TABLE parquet_file; + +CREATE TABLE parquet_file +( + id INTEGER + constraint parquet_file_pkey + primary key autoincrement, + shard_id numeric not null + constraint parquet_file_sequencer_id_fkey + references shard, + table_id numeric not null + references table_name, + partition_id numeric + references partition, + partition_hash_id bytea + references partition (hash_id), + + object_store_id uuid not null + constraint parquet_location_unique + unique, + max_sequence_number numeric, + min_time numeric, + max_time numeric, + to_delete numeric, + row_count numeric default 0 not null, + file_size_bytes numeric default 0 not null, + compaction_level smallint default 0 not null, + created_at numeric, + namespace_id numeric not null + references namespace + on delete cascade, + column_set numeric[] not null, + max_l0_created_at numeric default 0 not null +); + +create index if not exists parquet_file_deleted_at_idx + on parquet_file (to_delete); + +create index if not exists parquet_file_partition_idx + on parquet_file (partition_id); + +create index if not exists parquet_file_table_idx + on parquet_file (table_id); + +create index if not exists parquet_file_shard_compaction_delete_idx + on parquet_file (shard_id, compaction_level, to_delete); + +create index if not exists parquet_file_shard_compaction_delete_created_idx + on parquet_file (shard_id, compaction_level, to_delete, created_at); + +create index if not exists parquet_file_partition_created_idx + on parquet_file (partition_id, created_at); + +CREATE INDEX IF NOT EXISTS parquet_file_partition_hash_id_idx +ON parquet_file (partition_hash_id) +WHERE partition_hash_id IS NOT NULL; + +create trigger if not exists update_partition + after insert + on parquet_file + for each row +begin + UPDATE partition + SET new_file_at = NEW.created_at + WHERE (NEW.partition_id IS NULL OR id = NEW.partition_id) + AND (NEW.partition_hash_id IS NULL OR hash_id = NEW.partition_hash_id); +end; + +create trigger if not exists update_billing + after insert + on parquet_file + for each row +begin + INSERT INTO billing_summary (namespace_id, total_file_size_bytes) + VALUES (NEW.namespace_id, NEW.file_size_bytes) + ON CONFLICT (namespace_id) DO UPDATE + SET total_file_size_bytes = billing_summary.total_file_size_bytes + NEW.file_size_bytes + WHERE billing_summary.namespace_id = NEW.namespace_id; +end; + +create trigger if not exists decrement_summary 
+ after update + on parquet_file + for each row + when OLD.to_delete IS NULL AND NEW.to_delete IS NOT NULL +begin + UPDATE billing_summary + SET total_file_size_bytes = billing_summary.total_file_size_bytes - OLD.file_size_bytes + WHERE billing_summary.namespace_id = OLD.namespace_id; +end; + +INSERT INTO parquet_file +SELECT * FROM parquet_file_temp; + +DROP TABLE parquet_file_temp; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index a56bf9edda..cf7b85556e 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -1865,7 +1865,7 @@ pub(crate) mod test_helpers { let other_params = ParquetFileParams { table_id: other_partition.table_id, - partition_id: other_partition.id, + partition_id: other_partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(50), max_time: Timestamp::new(60), @@ -1978,7 +1978,7 @@ pub(crate) mod test_helpers { let f1_params = ParquetFileParams { table_id: partition2.table_id, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), @@ -2449,7 +2449,7 @@ pub(crate) mod test_helpers { let l0_five_hour_ago_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_five_hour_ago, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), ..parquet_file_params.clone() }; repos @@ -2492,7 +2492,7 @@ pub(crate) mod test_helpers { let l1_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_now, - partition_id: partition2.id, + partition_id: partition2.transition_partition_id(), compaction_level: CompactionLevel::FileNonOverlapped, ..parquet_file_params.clone() }; @@ -2578,7 +2578,7 @@ pub(crate) mod test_helpers { let l2_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_now, - partition_id: partition3.id, + partition_id: partition3.transition_partition_id(), compaction_level: CompactionLevel::Final, ..parquet_file_params.clone() }; @@ -2619,7 +2619,7 @@ pub(crate) mod test_helpers { let l0_one_hour_ago_file_params = ParquetFileParams { object_store_id: Uuid::new_v4(), created_at: time_one_hour_ago, - partition_id: partition3.id, + partition_id: partition3.transition_partition_id(), ..parquet_file_params.clone() }; repos @@ -2720,8 +2720,7 @@ pub(crate) mod test_helpers { level1_file.compaction_level = CompactionLevel::FileNonOverlapped; let other_partition_params = ParquetFileParams { - partition_id: partition2.id, - partition_hash_id: partition2.hash_id().cloned(), + partition_id: partition2.transition_partition_id(), object_store_id: Uuid::new_v4(), ..parquet_file_params.clone() }; @@ -2744,12 +2743,20 @@ pub(crate) mod test_helpers { expected_ids.sort(); assert_eq!(file_ids, expected_ids); - // remove namespace to avoid it from affecting later tests - repos - .namespaces() - .soft_delete("namespace_parquet_file_test_list_by_partiton_not_to_delete") + // Using the catalog partition ID should return the same files, even if the Parquet file + // records don't have the partition ID on them (which is the default now) + let files = repos + .parquet_files() + .list_by_partition_not_to_delete(&TransitionPartitionId::Deprecated(partition.id)) .await - .expect("delete namespace should succeed"); + .unwrap(); + assert_eq!(files.len(), 2); + + let mut file_ids: Vec<_> = files.into_iter().map(|f| f.id).collect(); + file_ids.sort(); + let mut expected_ids = 
vec![parquet_file.id, level1_file.id]; + expected_ids.sort(); + assert_eq!(file_ids, expected_ids); } async fn test_update_to_compaction_level_1(catalog: Arc) { diff --git a/iox_catalog/src/lib.rs b/iox_catalog/src/lib.rs index e45f4f59a8..ec6102f363 100644 --- a/iox_catalog/src/lib.rs +++ b/iox_catalog/src/lib.rs @@ -396,8 +396,7 @@ pub mod test_helpers { ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(10), diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index da8ec69e18..dfca594e6d 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -887,14 +887,28 @@ impl ParquetFileRepo for MemTxn { ) -> Result> { let stage = self.stage(); + let partition = stage + .partitions + .iter() + .find(|p| match partition_id { + TransitionPartitionId::Deterministic(hash_id) => p + .hash_id() + .map(|p_hash_id| p_hash_id == hash_id) + .unwrap_or(false), + TransitionPartitionId::Deprecated(id) => id == &p.id, + }) + .unwrap() + .clone(); + Ok(stage .parquet_files .iter() - .filter(|f| match partition_id { - TransitionPartitionId::Deterministic(hash_id) => { - f.partition_hash_id.as_ref().map_or(false, |h| h == hash_id) - } - TransitionPartitionId::Deprecated(id) => f.partition_id == *id, + .filter(|f| match &f.partition_id { + TransitionPartitionId::Deterministic(hash_id) => partition + .hash_id() + .map(|p_hash_id| p_hash_id == hash_id) + .unwrap_or(false), + TransitionPartitionId::Deprecated(id) => id == &partition.id, }) .filter(|f| f.to_delete.is_none()) .cloned() @@ -996,17 +1010,15 @@ async fn create_parquet_file( ParquetFileId::new(stage.parquet_files.len() as i64 + 1), ); let created_at = parquet_file.created_at; - let partition_id = parquet_file.partition_id; + let partition_id = parquet_file.partition_id.clone(); stage.parquet_files.push(parquet_file); // Update the new_file_at field its partition to the time of created_at let partition = stage .partitions .iter_mut() - .find(|p| p.id == partition_id) - .ok_or(Error::PartitionNotFound { - id: TransitionPartitionId::Deprecated(partition_id), - })?; + .find(|p| p.transition_partition_id() == partition_id) + .ok_or(Error::PartitionNotFound { id: partition_id })?; partition.new_file_at = Some(created_at); Ok(stage.parquet_files.last().unwrap().clone()) diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 9714a5496f..ea2f99c8f1 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1627,22 +1627,26 @@ RETURNING id; let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_hash_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.hash_id = $1 AND parquet_file.to_delete IS NULL; "#, ) .bind(hash_id), 
// $1 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFile>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.id = $1 AND parquet_file.to_delete IS NULL; "#, ) @@ -1754,7 +1758,6 @@ where namespace_id, table_id, partition_id, - partition_hash_id, object_store_id, min_time, max_time, @@ -1766,6 +1769,11 @@ where max_l0_created_at, } = parquet_file_params; + let (partition_id, partition_hash_id) = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)), + TransitionPartitionId::Deprecated(id) => (Some(id), None), + }; + let partition_hash_id_ref = &partition_hash_id.as_ref(); let query = sqlx::query_scalar::<_, ParquetFileId>( r#" @@ -2203,7 +2211,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; .create(parquet_file_params) .await .unwrap(); - assert!(parquet_file.partition_hash_id.is_none()); + assert_matches!( + parquet_file.partition_id, + TransitionPartitionId::Deprecated(_) + ); } #[test] diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 23ecd696f6..509fe5db10 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1221,8 +1221,8 @@ struct ParquetFilePod { id: ParquetFileId, namespace_id: NamespaceId, table_id: TableId, - partition_id: PartitionId, - partition_hash_id: Option, + #[sqlx(flatten)] + partition_id: TransitionPartitionId, object_store_id: Uuid, min_time: Timestamp, max_time: Timestamp, @@ -1242,7 +1242,6 @@ impl From for ParquetFile { namespace_id: value.namespace_id, table_id: value.table_id, partition_id: value.partition_id, - partition_hash_id: value.partition_hash_id, object_store_id: value.object_store_id, min_time: value.min_time, max_time: value.max_time, @@ -1395,22 +1394,26 @@ RETURNING id; let query = match partition_id { TransitionPartitionId::Deterministic(hash_id) => sqlx::query_as::<_, ParquetFilePod>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_hash_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.hash_id = $1 AND parquet_file.to_delete IS NULL; "#, ) .bind(hash_id), // $1 TransitionPartitionId::Deprecated(id) => sqlx::query_as::<_, ParquetFilePod>( r#" -SELECT id, namespace_id, table_id, partition_id, partition_hash_id, object_store_id, min_time, - max_time, to_delete, file_size_bytes, row_count, compaction_level, created_at, column_set, - max_l0_created_at +SELECT parquet_file.id, 
namespace_id, parquet_file.table_id, partition_id, partition_hash_id, + object_store_id, min_time, max_time, parquet_file.to_delete, file_size_bytes, row_count, + compaction_level, created_at, column_set, max_l0_created_at FROM parquet_file -WHERE parquet_file.partition_id = $1 +INNER JOIN partition +ON partition.id = parquet_file.partition_id OR partition.hash_id = parquet_file.partition_hash_id +WHERE partition.id = $1 AND parquet_file.to_delete IS NULL; "#, ) @@ -1533,7 +1536,6 @@ where namespace_id, table_id, partition_id, - partition_hash_id, object_store_id, min_time, max_time, @@ -1545,7 +1547,10 @@ where max_l0_created_at, } = parquet_file_params; - let partition_hash_id_ref = &partition_hash_id.as_ref(); + let (partition_id, partition_hash_id) = match partition_id { + TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)), + TransitionPartitionId::Deprecated(id) => (Some(id), None), + }; let res = sqlx::query_as::<_, ParquetFilePod>( r#" INSERT INTO parquet_file ( @@ -1562,7 +1567,7 @@ RETURNING .bind(TRANSITION_SHARD_ID) // $1 .bind(table_id) // $2 .bind(partition_id) // $3 - .bind(partition_hash_id_ref) // $4 + .bind(partition_hash_id.as_ref()) // $4 .bind(object_store_id) // $5 .bind(min_time) // $6 .bind(max_time) // $7 @@ -1811,7 +1816,10 @@ RETURNING id, hash_id, table_id, partition_key, sort_key, new_file_at; .create(parquet_file_params) .await .unwrap(); - assert!(parquet_file.partition_hash_id.is_none()); + assert_matches!( + parquet_file.partition_id, + TransitionPartitionId::Deprecated(_) + ); } macro_rules! test_column_create_or_get_many_unchecked { diff --git a/iox_tests/src/builders.rs b/iox_tests/src/builders.rs index db62514f89..fd147aa077 100644 --- a/iox_tests/src/builders.rs +++ b/iox_tests/src/builders.rs @@ -1,6 +1,7 @@ use data_types::{ ColumnSet, CompactionLevel, NamespaceId, ParquetFile, ParquetFileId, Partition, PartitionHashId, PartitionId, PartitionKey, SkippedCompaction, Table, TableId, Timestamp, + TransitionPartitionId, }; use uuid::Uuid; @@ -20,8 +21,7 @@ impl ParquetFileBuilder { id: ParquetFileId::new(id), namespace_id: NamespaceId::new(0), table_id, - partition_id: PartitionId::new(0), - partition_hash_id: Some(PartitionHashId::new( + partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new( table_id, &PartitionKey::from("arbitrary"), )), @@ -39,11 +39,11 @@ impl ParquetFileBuilder { } } - /// Set the partition id - pub fn with_partition(self, id: i64) -> Self { + /// Set the partition identifier + pub fn with_partition(self, partition_id: TransitionPartitionId) -> Self { Self { file: ParquetFile { - partition_id: PartitionId::new(id), + partition_id, ..self.file }, } diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index c24c1f6b3e..e1be414f1f 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -602,8 +602,7 @@ impl TestPartition { let parquet_file_params = ParquetFileParams { namespace_id: self.namespace.namespace.id, table_id: self.table.table.id, - partition_id: self.partition.id, - partition_hash_id: self.partition.hash_id().cloned(), + partition_id: self.partition.transition_partition_id(), object_store_id: object_store_id.unwrap_or_else(Uuid::new_v4), min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), diff --git a/iox_tests/src/lib.rs b/iox_tests/src/lib.rs index 60cdf73bab..79be4553ba 100644 --- a/iox_tests/src/lib.rs +++ b/iox_tests/src/lib.rs @@ -17,6 +17,8 @@ // Workaround for "unused crate" lint false positives. 
use workspace_hack as _; +use data_types::{PartitionHashId, PartitionKey, TableId, TransitionPartitionId}; + mod catalog; pub use catalog::{ TestCatalog, TestNamespace, TestParquetFile, TestParquetFileBuilder, TestPartition, TestTable, @@ -24,3 +26,14 @@ pub use catalog::{ mod builders; pub use builders::{ParquetFileBuilder, PartitionBuilder, SkippedCompactionBuilder, TableBuilder}; + +/// Create a partition identifier from an int (which gets used as the table ID) and a partition key +/// with the string "arbitrary". Most useful in cases where there isn't any actual catalog +/// interaction (that is, in mocks) and when the important property of the partition identifiers is +/// that they're either the same or different than other partition identifiers. +pub fn partition_identifier(table_id: i64) -> TransitionPartitionId { + TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(table_id), + &PartitionKey::from("arbitrary"), + )) +} diff --git a/parquet_file/src/lib.rs b/parquet_file/src/lib.rs index ff78087adf..9a852729d5 100644 --- a/parquet_file/src/lib.rs +++ b/parquet_file/src/lib.rs @@ -108,7 +108,7 @@ impl From<&ParquetFile> for ParquetFilePath { Self { namespace_id: f.namespace_id, table_id: f.table_id, - partition_id: f.transition_partition_id(), + partition_id: f.partition_id.clone(), object_store_id: f.object_store_id, } } @@ -119,7 +119,7 @@ impl From<&ParquetFileParams> for ParquetFilePath { Self { namespace_id: f.namespace_id, table_id: f.table_id, - partition_id: f.transition_partition_id(), + partition_id: f.partition_id.clone(), object_store_id: f.object_store_id, } } diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 389f0e45c3..7a9dcfa75b 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -91,7 +91,7 @@ use bytes::Bytes; use data_types::{ ColumnId, ColumnSet, ColumnSummary, CompactionLevel, InfluxDbType, NamespaceId, ParquetFileParams, PartitionHashId, PartitionId, PartitionKey, StatValues, Statistics, TableId, - Timestamp, + Timestamp, TransitionPartitionId, }; use generated_types::influxdata::iox::ingester::v1 as proto; use iox_time::Time; @@ -443,6 +443,7 @@ impl IoxMetadata { where F: for<'a> Fn(&'a str) -> ColumnId, { + let partition_id = TransitionPartitionId::from((partition_id, partition_hash_id.as_ref())); let decoded = metadata.decode().expect("invalid IOx metadata"); trace!( ?partition_id, @@ -487,7 +488,6 @@ impl IoxMetadata { namespace_id: self.namespace_id, table_id: self.table_id, partition_id, - partition_hash_id, object_store_id: self.object_store_id, min_time, max_time, diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs index 0cca211320..a6e3bade51 100644 --- a/querier/src/cache/parquet_file.rs +++ b/querier/src/cache/parquet_file.rs @@ -361,8 +361,8 @@ mod tests { partition.create_parquet_file(builder).await; let table_id = table.table.id; - let single_file_size = 240; - let two_file_size = 448; + let single_file_size = 256; + let two_file_size = 480; assert!(single_file_size < two_file_size); let cache = make_cache(&catalog); diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 7226ecd4cb..ed90f25530 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -17,7 +17,7 @@ use cache_system::{ }; use data_types::{ partition_template::{build_column_values, ColumnValue}, - ColumnId, Partition, PartitionId, TransitionPartitionId, + ColumnId, Partition, TransitionPartitionId, }; use 
datafusion::scalar::ScalarValue; use iox_catalog::{interface::Catalog, partition_lookup_batch}; @@ -38,7 +38,7 @@ const CACHE_ID: &str = "partition"; type CacheT = Box< dyn Cache< - K = PartitionId, + K = TransitionPartitionId, V = Option, GetExtra = (Arc, Option), PeekExtra = ((), Option), @@ -49,7 +49,7 @@ type CacheT = Box< #[derive(Debug)] pub struct PartitionCache { cache: CacheT, - remove_if_handle: RemoveIfHandle>, + remove_if_handle: RemoveIfHandle>, flusher: Arc, } @@ -64,7 +64,8 @@ impl PartitionCache { testing: bool, ) -> Self { let loader = FunctionLoader::new( - move |partition_ids: Vec, cached_tables: Vec>| { + move |partition_ids: Vec, + cached_tables: Vec>| { // sanity checks assert_eq!(partition_ids.len(), cached_tables.len()); @@ -75,23 +76,20 @@ impl PartitionCache { // prepare output buffer let mut out = (0..partition_ids.len()).map(|_| None).collect::>(); let mut out_map = - HashMap::::with_capacity(partition_ids.len()); + HashMap::::with_capacity(partition_ids.len()); for (idx, id) in partition_ids.iter().enumerate() { - match out_map.entry(*id) { - Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"), + match out_map.entry(id.clone()) { + Entry::Occupied(_) => unreachable!( + "cache system requested same partition from loader concurrently, \ + this should have been prevented by the CacheDriver" + ), Entry::Vacant(v) => { v.insert(idx); } } } - // build `&[&TransitionPartitionId]` for batch catalog request - let ids = partition_ids - .iter() - .copied() - .map(TransitionPartitionId::Deprecated) - .collect::>(); - let ids = ids.iter().collect::>(); + let ids: Vec<&TransitionPartitionId> = partition_ids.iter().collect(); // fetch catalog data let partitions = Backoff::new(&backoff_config) @@ -104,7 +102,7 @@ impl PartitionCache { // build output for p in partitions { - let idx = out_map[&p.id]; + let idx = out_map[&p.transition_partition_id()]; let cached_table = &cached_tables[idx]; let p = CachedPartition::new(p, cached_table); out[idx] = Some(p); @@ -180,7 +178,7 @@ impl PartitionCache { self.remove_if_handle.remove_if_and_get( &self.cache, - partition_id, + partition_id.clone(), move |cached_partition| { let invalidates = if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) @@ -195,7 +193,7 @@ impl PartitionCache { if invalidates { debug!( - partition_id = partition_id.get(), + partition_id = %partition_id, "invalidate partition cache", ); } @@ -217,13 +215,13 @@ impl PartitionCache { /// Request for [`PartitionCache::get`]. 
#[derive(Debug)] pub struct PartitionRequest { - pub partition_id: PartitionId, + pub partition_id: TransitionPartitionId, pub sort_key_should_cover: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct CachedPartition { - pub id: PartitionId, + pub id: TransitionPartitionId, pub sort_key: Option>, pub column_ranges: ColumnRanges, } @@ -299,7 +297,7 @@ impl CachedPartition { column_ranges.shrink_to_fit(); Self { - id: partition.id, + id: partition.transition_partition_id(), sort_key, column_ranges: Arc::new(column_ranges), } @@ -368,7 +366,10 @@ mod tests { ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count, }; use async_trait::async_trait; - use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType}; + use data_types::{ + partition_template::TablePartitionTemplateOverride, ColumnType, PartitionHashId, + PartitionId, PartitionKey, TableId, + }; use futures::StreamExt; use generated_types::influxdata::iox::partition_template::v1::{ template_part::Part, PartitionTemplate, TemplatePart, @@ -419,8 +420,11 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let sort_key1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -434,24 +438,24 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); let sort_key2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p2_id, &Vec::new(), None) .await .unwrap() .sort_key; assert_eq!(sort_key2, None); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); let sort_key1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get_one(Arc::clone(&cached_table), &p1_id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -461,16 +465,37 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); // non-existing partition for _ in 0..2 { + // Non-existing partition identified by partition hash ID let res = cache .get_one( Arc::clone(&cached_table), - PartitionId::new(i64::MAX), + &TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), + &[], + None, + ) + .await; + assert_eq!(res, None); + assert_catalog_access_metric_count( + &catalog.metric_registry, + "partition_get_by_hash_id_batch", + 3, + ); + + // Non-existing partition identified by deprecated catalog IDs; this part can be + // removed when partition identification is fully transitioned to partition hash IDs + let res = cache + .get_one( + Arc::clone(&cached_table), + &TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), &Vec::new(), None, ) @@ -479,7 +504,7 @@ mod tests { assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", - 3, + 1, ); } } @@ -548,8 +573,14 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let p3_id = p3.transition_partition_id(); + let p4_id = p4.transition_partition_id(); + let p5_id = p5.transition_partition_id(); + let ranges1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + 
.get_one(Arc::clone(&cached_table), &p1_id, &[], None) .await .unwrap() .column_ranges; @@ -578,12 +609,12 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); let ranges2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &[], None) + .get_one(Arc::clone(&cached_table), &p2_id, &[], None) .await .unwrap() .column_ranges; @@ -599,12 +630,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); let ranges3 = cache - .get_one(Arc::clone(&cached_table), p3.id, &[], None) + .get_one(Arc::clone(&cached_table), &p3_id, &[], None) .await .unwrap() .column_ranges; @@ -629,12 +660,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); let ranges4 = cache - .get_one(Arc::clone(&cached_table), p4.id, &[], None) + .get_one(Arc::clone(&cached_table), &p4_id, &[], None) .await .unwrap() .column_ranges; @@ -659,12 +690,12 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 4, ); let ranges5 = cache - .get_one(Arc::clone(&cached_table), p5.id, &[], None) + .get_one(Arc::clone(&cached_table), &p5_id, &[], None) .await .unwrap() .column_ranges; @@ -680,28 +711,48 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 5, ); let ranges1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + .get_one(Arc::clone(&cached_table), &p1_id, &[], None) .await .unwrap() .column_ranges; assert!(Arc::ptr_eq(&ranges1a, &ranges1b)); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 5, ); - // non-existing partition for _ in 0..2 { + // Non-existing partition identified by partition hash ID let res = cache .get_one( Arc::clone(&cached_table), - PartitionId::new(i64::MAX), + &TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), + &[], + None, + ) + .await; + assert_eq!(res, None); + assert_catalog_access_metric_count( + &catalog.metric_registry, + "partition_get_by_hash_id_batch", + 6, + ); + + // Non-existing partition identified by deprecated catalog IDs; this part can be + // removed when partition identification is fully transitioned to partition hash IDs + let res = cache + .get_one( + Arc::clone(&cached_table), + &TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), &[], None, ) @@ -710,7 +761,7 @@ mod tests { assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", - 6, + 1, ); } } @@ -724,7 +775,7 @@ mod tests { let c1 = t.create_column("foo", ColumnType::Tag).await; let c2 = t.create_column("time", ColumnType::Time).await; let p = t.create_partition("k1").await; - let p_id = p.partition.id; + let p_id = p.partition.transition_partition_id(); let p_sort_key = p.partition.sort_key(); let cached_table = Arc::new(CachedTable { id: t.table.id, @@ -751,41 +802,41 @@ mod tests { ); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get_one(Arc::clone(&cached_table), &p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, 
- "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); // requesting nother will not expire assert!(p_sort_key.is_none()); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get_one(Arc::clone(&cached_table), &p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); // but requesting something will expire let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); @@ -801,7 +852,7 @@ mod tests { // expire & fetch let p_sort_key = p.partition.sort_key(); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get_one(Arc::clone(&cached_table), &p_id, &[c1.column.id], None) .await .unwrap() .sort_key; @@ -815,7 +866,7 @@ mod tests { ); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); @@ -827,7 +878,7 @@ mod tests { vec![c1.column.id, c2.column.id], ] { let sort_key_2 = cache - .get_one(Arc::clone(&cached_table), p_id, &should_cover, None) + .get_one(Arc::clone(&cached_table), &p_id, &should_cover, None) .await .unwrap() .sort_key; @@ -837,7 +888,7 @@ mod tests { )); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 3, ); } @@ -847,7 +898,7 @@ mod tests { let sort_key_2 = cache .get_one( Arc::clone(&cached_table), - p_id, + &p_id, &[c1.column.id, c3.column.id], None, ) @@ -861,7 +912,7 @@ mod tests { assert_eq!(sort_key, sort_key_2); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 4, ); } @@ -892,34 +943,45 @@ mod tests { true, ); + let p1_id = p1.transition_partition_id(); + let p2_id = p2.transition_partition_id(); + let mut res = cache .get( Arc::clone(&cached_table), vec![ PartitionRequest { - partition_id: p1.id, + partition_id: p1_id.clone(), sort_key_should_cover: vec![], }, PartitionRequest { - partition_id: p2.id, + partition_id: p2_id.clone(), sort_key_should_cover: vec![], }, PartitionRequest { - partition_id: p1.id, + partition_id: p1_id.clone(), + sort_key_should_cover: vec![], + }, + // requesting non-existing partitions is fine, they just don't appear in + // the output + PartitionRequest { + partition_id: TransitionPartitionId::Deprecated(PartitionId::new(i64::MAX)), sort_key_should_cover: vec![], }, PartitionRequest { - // requesting non-existing partitions is fine, they just don't appear in the output - partition_id: PartitionId::new(i64::MAX), + partition_id: TransitionPartitionId::Deterministic(PartitionHashId::new( + TableId::new(i64::MAX), + &PartitionKey::from("bananas_not_found"), + )), sort_key_should_cover: vec![], }, ], None, ) .await; - res.sort_by_key(|p| p.id); - let ids = res.iter().map(|p| p.id).collect::>(); - assert_eq!(ids, vec![p1.id, p1.id, p2.id]); + res.sort_by(|a, b| a.id.cmp(&b.id)); + let ids = res.into_iter().map(|p| p.id).collect::>(); + assert_eq!(ids, vec![p1_id.clone(), p1_id, p2_id]); assert_catalog_access_metric_count( &catalog.metric_registry, "partition_get_by_id_batch", @@ -1008,7 +1070,7 
@@ mod tests { c_id: ColumnId, /// Partitions within that table. - partitions: Vec, + partitions: Vec, } impl ConcurrencyTestState { @@ -1032,7 +1094,7 @@ mod tests { t.create_partition_with_sort_key(&format!("p{i}"), &["time"]) .await .partition - .id + .transition_partition_id() } }) .collect::>() @@ -1046,7 +1108,8 @@ mod tests { } } - /// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the result. + /// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the + /// result. async fn run(self, cache: Arc) { let Self { cached_table, @@ -1060,15 +1123,15 @@ mod tests { partitions .iter() .map(|p| PartitionRequest { - partition_id: *p, + partition_id: p.clone(), sort_key_should_cover: vec![], }) .collect(), None, ) .await; - results.sort_by_key(|p| p.id); - let partitions_res = results.iter().map(|p| p.id).collect::>(); + results.sort_by(|a, b| a.id.cmp(&b.id)); + let partitions_res = results.iter().map(|p| p.id.clone()).collect::>(); assert_eq!(partitions, partitions_res); assert!(results .iter() @@ -1086,7 +1149,7 @@ mod tests { async fn get_one( &self, cached_table: Arc, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, sort_key_should_cover: &[ColumnId], span: Option, ) -> Option; @@ -1097,14 +1160,14 @@ mod tests { async fn get_one( &self, cached_table: Arc, - partition_id: PartitionId, + partition_id: &TransitionPartitionId, sort_key_should_cover: &[ColumnId], span: Option, ) -> Option { self.get( cached_table, vec![PartitionRequest { - partition_id, + partition_id: partition_id.clone(), sort_key_should_cover: sort_key_should_cover.to_vec(), }], span, diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index 01c95f3dbf..7f7bb69037 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -859,10 +859,6 @@ impl IngesterPartition { } } - pub(crate) fn partition_id(&self) -> PartitionId { - self.partition_id - } - pub(crate) fn transition_partition_id(&self) -> TransitionPartitionId { TransitionPartitionId::from((self.partition_id, self.partition_hash_id.as_ref())) } diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs index f76a98ed68..3e4a93d0d4 100644 --- a/querier/src/parquet/creation.rs +++ b/querier/src/parquet/creation.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, PartitionId, TransitionPartitionId}; +use data_types::{ChunkId, ChunkOrder, ColumnId, ParquetFile, TransitionPartitionId}; use futures::StreamExt; use hashbrown::HashSet; use iox_catalog::interface::Catalog; @@ -56,7 +56,7 @@ impl ChunkAdapter { &self, cached_table: Arc, files: Arc<[Arc]>, - cached_partitions: &HashMap, + cached_partitions: &HashMap, span: Option, ) -> Vec { let span_recorder = SpanRecorder::new(span); @@ -170,18 +170,13 @@ impl ChunkAdapter { let order = ChunkOrder::new(parquet_file.file.max_l0_created_at.get()); - let partition_id = parquet_file.file.partition_id; - let transition_partition_id = TransitionPartitionId::from(( - partition_id, - parquet_file.file.partition_hash_id.as_ref(), - )); + let partition_id = parquet_file.file.partition_id.clone(); let meta = Arc::new(QuerierParquetChunkMeta { chunk_id, order, sort_key: Some(sort_key), partition_id, - transition_partition_id, }); let parquet_chunk = Arc::new(ParquetChunk::new( diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index c3794a82dc..cf2100fcd0 100644 --- a/querier/src/parquet/mod.rs +++ 
b/querier/src/parquet/mod.rs @@ -1,6 +1,6 @@ //! Querier Chunks -use data_types::{ChunkId, ChunkOrder, PartitionId, TransitionPartitionId}; +use data_types::{ChunkId, ChunkOrder, TransitionPartitionId}; use datafusion::physical_plan::Statistics; use iox_query::chunk_statistics::{create_chunk_statistics, ColumnRanges}; use parquet_file::chunk::ParquetChunk; @@ -25,10 +25,7 @@ pub struct QuerierParquetChunkMeta { sort_key: Option, /// Partition ID. - partition_id: PartitionId, - - /// Transition partition ID. - transition_partition_id: TransitionPartitionId, + partition_id: TransitionPartitionId, } impl QuerierParquetChunkMeta { @@ -43,13 +40,8 @@ impl QuerierParquetChunkMeta { } /// Partition ID. - pub fn partition_id(&self) -> PartitionId { - self.partition_id - } - - /// Partition ID. - pub fn transition_partition_id(&self) -> &TransitionPartitionId { - &self.transition_partition_id + pub fn partition_id(&self) -> &TransitionPartitionId { + &self.partition_id } } @@ -251,7 +243,7 @@ pub mod tests { .get( Arc::clone(&self.cached_table), vec![PartitionRequest { - partition_id: self.parquet_file.partition_id, + partition_id: self.parquet_file.partition_id.clone(), sort_key_should_cover: vec![], }], None, @@ -261,7 +253,7 @@ pub mod tests { .next() .unwrap(); let cached_partitions = - HashMap::from([(self.parquet_file.partition_id, cached_partition)]); + HashMap::from([(self.parquet_file.partition_id.clone(), cached_partition)]); self.adapter .new_chunks( Arc::clone(&self.cached_table), diff --git a/querier/src/parquet/query_access.rs b/querier/src/parquet/query_access.rs index 1e9cf44743..fe87f4c9ce 100644 --- a/querier/src/parquet/query_access.rs +++ b/querier/src/parquet/query_access.rs @@ -15,11 +15,11 @@ impl QueryChunk for QuerierParquetChunk { } fn partition_id(&self) -> PartitionId { - self.meta().partition_id() + unimplemented!() } fn transition_partition_id(&self) -> &TransitionPartitionId { - self.meta().transition_partition_id() + self.meta().partition_id() } fn sort_key(&self) -> Option<&SortKey> { diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 52750ec47e..26268ae49c 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -8,7 +8,7 @@ use crate::{ parquet::ChunkAdapter, IngesterConnection, }; -use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId}; +use data_types::{ColumnId, NamespaceId, ParquetFile, TableId, TransitionPartitionId}; use datafusion::error::DataFusionError; use futures::join; use iox_query::{provider, provider::ChunkPruner, QueryChunk}; @@ -282,7 +282,7 @@ impl QuerierTable { let chunks = partitions .into_iter() .filter_map(|mut c| { - let cached_partition = cached_partitions.get(&c.partition_id())?; + let cached_partition = cached_partitions.get(&c.transition_partition_id())?; c.set_partition_column_ranges(&cached_partition.column_ranges); Some(c) }) @@ -322,16 +322,16 @@ impl QuerierTable { ingester_partitions: &[IngesterPartition], parquet_files: &[Arc], span: Option, - ) -> HashMap { + ) -> HashMap { let span_recorder = SpanRecorder::new(span); - let mut should_cover: HashMap> = + let mut should_cover: HashMap> = HashMap::with_capacity(ingester_partitions.len()); // For ingester partitions we only need the column ranges -- which are static -- not the sort key. So it is // sufficient to collect the partition IDs. 
for p in ingester_partitions { - should_cover.entry(p.partition_id()).or_default(); + should_cover.entry(p.transition_partition_id()).or_default(); } // For parquet files we must ensure that the -- potentially evolving -- sort key coveres the primary key. @@ -342,7 +342,7 @@ impl QuerierTable { .collect::>(); for f in parquet_files { should_cover - .entry(f.partition_id) + .entry(f.partition_id.clone()) .or_default() .extend(f.column_set.iter().copied().filter(|id| pk.contains(id))); } @@ -366,7 +366,7 @@ impl QuerierTable { ) .await; - partitions.into_iter().map(|p| (p.id, p)).collect() + partitions.into_iter().map(|p| (p.id.clone(), p)).collect() } /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks) @@ -889,7 +889,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2); @@ -899,7 +899,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4); @@ -912,7 +912,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); @@ -922,7 +922,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 1, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6); @@ -936,7 +936,7 @@ mod tests { assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); assert_catalog_access_metric_count( &catalog.metric_registry, - "partition_get_by_id_batch", + "partition_get_by_hash_id_batch", 2, ); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8); diff --git a/service_grpc_catalog/src/lib.rs b/service_grpc_catalog/src/lib.rs index 356539060d..c5787ac07e 100644 --- a/service_grpc_catalog/src/lib.rs +++ b/service_grpc_catalog/src/lib.rs @@ -18,7 +18,7 @@ // Workaround for "unused crate" lint false positives. 
use workspace_hack as _; -use data_types::{PartitionId, TableId, TransitionPartitionId}; +use data_types::{PartitionHashId, PartitionId, TableId, TransitionPartitionId}; use generated_types::influxdata::iox::catalog::v1::*; use iox_catalog::interface::{Catalog, SoftDeletedRows}; use observability_deps::tracing::*; @@ -47,14 +47,14 @@ impl catalog_service_server::CatalogService for CatalogService { ) -> Result, Status> { let mut repos = self.catalog.repositories().await; let req = request.into_inner(); - let partition_id = TransitionPartitionId::Deprecated(PartitionId::new(req.partition_id)); + let partition_id = to_partition_id(req.partition_identifier)?; let parquet_files = repos .parquet_files() .list_by_partition_not_to_delete(&partition_id) .await .map_err(|e| { - warn!(error=%e, %req.partition_id, "failed to get parquet_files for partition"); + warn!(error=%e, %partition_id, "failed to get parquet_files for partition"); Status::not_found(e.to_string()) })?; @@ -169,13 +169,52 @@ impl catalog_service_server::CatalogService for CatalogService { } } +fn to_partition_identifier(partition_id: &TransitionPartitionId) -> PartitionIdentifier { + match partition_id { + TransitionPartitionId::Deterministic(hash_id) => PartitionIdentifier { + id: Some(partition_identifier::Id::HashId( + hash_id.as_bytes().to_owned(), + )), + }, + TransitionPartitionId::Deprecated(id) => PartitionIdentifier { + id: Some(partition_identifier::Id::CatalogId(id.get())), + }, + } +} + +fn to_partition_id( + partition_identifier: Option, +) -> Result { + let partition_id = + match partition_identifier + .and_then(|pi| pi.id) + .ok_or(Status::invalid_argument( + "No partition identifier specified", + ))? { + partition_identifier::Id::HashId(bytes) => TransitionPartitionId::Deterministic( + PartitionHashId::try_from(&bytes[..]).map_err(|e| { + Status::invalid_argument(format!( + "Could not parse bytes as a `PartitionHashId`: {e}" + )) + })?, + ), + partition_identifier::Id::CatalogId(id) => { + TransitionPartitionId::Deprecated(PartitionId::new(id)) + } + }; + + Ok(partition_id) +} + // converts the catalog ParquetFile to protobuf fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile { + let partition_identifier = to_partition_identifier(&p.partition_id); + ParquetFile { id: p.id.get(), namespace_id: p.namespace_id.get(), table_id: p.table_id.get(), - partition_id: p.partition_id.get(), + partition_identifier: Some(partition_identifier), object_store_id: p.object_store_id.to_string(), min_time: p.min_time.get(), max_time: p.max_time.get(), @@ -191,8 +230,10 @@ fn to_parquet_file(p: data_types::ParquetFile) -> ParquetFile { // converts the catalog Partition to protobuf fn to_partition(p: data_types::Partition) -> Partition { + let identifier = to_partition_identifier(&p.transition_partition_id()); + Partition { - id: p.id.get(), + identifier: Some(identifier), key: p.partition_key.to_string(), table_id: p.table_id.get(), array_sort_key: p.sort_key, @@ -230,8 +271,7 @@ mod tests { let p1params = ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(5), @@ -248,13 +288,15 @@ mod tests { }; p1 = repos.parquet_files().create(p1params).await.unwrap(); p2 = repos.parquet_files().create(p2params).await.unwrap(); - partition_id = partition.id; + partition_id = 
partition.transition_partition_id(); Arc::clone(&catalog) }; + let partition_identifier = to_partition_identifier(&partition_id); + let grpc = super::CatalogService::new(catalog); let request = GetParquetFilesByPartitionIdRequest { - partition_id: partition_id.get(), + partition_identifier: Some(partition_identifier), }; let tonic_response = grpc diff --git a/service_grpc_object_store/src/lib.rs b/service_grpc_object_store/src/lib.rs index 70059c3b2f..8ab7b7adcb 100644 --- a/service_grpc_object_store/src/lib.rs +++ b/service_grpc_object_store/src/lib.rs @@ -75,7 +75,7 @@ impl object_store_service_server::ObjectStoreService for ObjectStoreService { let path = ParquetFilePath::new( parquet_file.namespace_id, parquet_file.table_id, - &parquet_file.transition_partition_id(), + &parquet_file.partition_id.clone(), parquet_file.object_store_id, ); let path = path.object_store_path(); @@ -128,8 +128,7 @@ mod tests { let p1params = ParquetFileParams { namespace_id: namespace.id, table_id: table.id, - partition_id: partition.id, - partition_hash_id: partition.hash_id().cloned(), + partition_id: partition.transition_partition_id(), object_store_id: Uuid::new_v4(), min_time: Timestamp::new(1), max_time: Timestamp::new(5), @@ -150,7 +149,7 @@ mod tests { let path = ParquetFilePath::new( p1.namespace_id, p1.table_id, - &p1.transition_partition_id(), + &p1.partition_id.clone(), p1.object_store_id, ); let path = path.object_store_path();
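
Note on the recurring pattern: throughout the hunks above, a single `TransitionPartitionId` value replaces the old `partition_id`/`partition_hash_id` pair on Parquet file records, and the catalog backends split it back into the two nullable columns when writing and recombine it when reading. The following is a minimal, hedged sketch of that round trip, not the production code; `PartitionId`, `PartitionHashId`, and `TransitionPartitionId` below are simplified stand-ins for the real types in the `data_types` crate, and the helper names (`to_columns`, `from_columns`) are illustrative rather than actual IOx APIs.

// Simplified stand-ins; the real types carry more behavior (sqlx bindings,
// hashing, display impls) in the `data_types` crate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionId(i64);

#[derive(Debug, Clone, PartialEq, Eq)]
struct PartitionHashId(Vec<u8>);

#[derive(Debug, Clone, PartialEq, Eq)]
enum TransitionPartitionId {
    /// Deterministic hash of table ID and partition key; used for new partitions.
    Deterministic(PartitionHashId),
    /// Old-style catalog row ID; kept while existing rows still use it.
    Deprecated(PartitionId),
}

/// Split the identifier into the two nullable columns stored on `parquet_file`,
/// mirroring the `match` done before binding the INSERT parameters.
fn to_columns(id: TransitionPartitionId) -> (Option<PartitionId>, Option<PartitionHashId>) {
    match id {
        TransitionPartitionId::Deterministic(hash_id) => (None, Some(hash_id)),
        TransitionPartitionId::Deprecated(id) => (Some(id), None),
    }
}

/// Recombine the columns read back from the catalog, preferring the hash ID
/// when both happen to be present.
fn from_columns(
    id: Option<PartitionId>,
    hash_id: Option<PartitionHashId>,
) -> Option<TransitionPartitionId> {
    match (hash_id, id) {
        (Some(hash_id), _) => Some(TransitionPartitionId::Deterministic(hash_id)),
        (None, Some(id)) => Some(TransitionPartitionId::Deprecated(id)),
        (None, None) => None,
    }
}

fn main() {
    // Round-trip an old-style identifier through the column pair and back.
    let original = TransitionPartitionId::Deprecated(PartitionId(3));
    let (id, hash_id) = to_columns(original.clone());
    assert_eq!(from_columns(id, hash_id), Some(original));
}

Under this scheme, newly persisted files carry only the hash ID (the `Deterministic` variant) while rows written before the migration keep their integer ID (the `Deprecated` variant), which is why the updated `list_by_partition_not_to_delete` queries join `partition` on `id OR hash_id` so that a lookup by either kind of identifier finds the same files.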