From f3f792fd086fa31b4ebc4cf01cbb801a2a88a398 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 24 Mar 2022 11:51:45 -0400 Subject: [PATCH] feat: Add namespace_id to the parquet_files table; object store paths need it --- compactor/src/compact.rs | 3 +++ data_types2/src/lib.rs | 4 ++++ ingester/src/data.rs | 1 + ...20220324152729_add_namespace_id_to_parquet_file.sql | 10 ++++++++++ iox_catalog/src/interface.rs | 6 ++++++ iox_catalog/src/mem.rs | 2 ++ iox_catalog/src/postgres.rs | 6 ++++-- iox_tests/src/util.rs | 1 + parquet_file/src/metadata.rs | 1 + 9 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index a4d9e275d5..2ecd18ad01 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -1499,6 +1499,7 @@ mod tests { ParquetFile { id: ParquetFileId::new(0), sequencer_id: SequencerId::new(0), + namespace_id: NamespaceId::new(0), table_id: TableId::new(0), partition_id: PartitionId::new(0), object_store_id: Uuid::new_v4(), @@ -1691,6 +1692,7 @@ mod tests { let p1 = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: table.id, partition_id: partition.id, object_store_id: Uuid::new_v4(), @@ -1993,6 +1995,7 @@ mod tests { // Prepare metadata in form of ParquetFileParams to get added with tombstone let parquet = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: table.id, partition_id: partition.id, object_store_id: Uuid::new_v4(), diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs index dbf256ef37..753e1ac1a1 100644 --- a/data_types2/src/lib.rs +++ b/data_types2/src/lib.rs @@ -710,6 +710,8 @@ pub struct ParquetFile { pub id: ParquetFileId, /// the sequencer that sequenced writes that went into this file pub sequencer_id: SequencerId, + /// the namespace + pub namespace_id: NamespaceId, /// the table pub table_id: TableId, /// the partition @@ -743,6 +745,8 @@ pub struct ParquetFile { pub struct ParquetFileParams { /// the sequencer that sequenced writes that went into this file pub sequencer_id: SequencerId, + /// the namespace + pub namespace_id: NamespaceId, /// the table pub table_id: TableId, /// the partition diff --git a/ingester/src/data.rs b/ingester/src/data.rs index cbb0b1a074..6a2ee77554 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1897,6 +1897,7 @@ mod tests { .unwrap(); let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: table.id, partition_id: partition.id, object_store_id: Uuid::new_v4(), diff --git a/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql b/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql new file mode 100644 index 0000000000..a4962b8cb5 --- /dev/null +++ b/iox_catalog/migrations/20220324152729_add_namespace_id_to_parquet_file.sql @@ -0,0 +1,10 @@ +ALTER TABLE + IF EXISTS parquet_file + ADD COLUMN namespace_id INT NOT NULL; +ALTER TABLE + IF EXISTS parquet_file + ADD FOREIGN KEY (namespace_id) + REFERENCES namespace (id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE NO ACTION + NOT VALID; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 492af73ca0..bc0a9c129e 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -902,6 +902,7 @@ pub(crate) mod test_helpers { .await .unwrap(); let parquet_file_params = ParquetFileParams { + namespace_id: namespace.id, sequencer_id: seq.id, table_id: t.id, partition_id: partition.id, @@ -1503,6 +1504,7 @@ pub(crate) mod test_helpers { let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: partition.table_id, partition_id: partition.id, object_store_id: Uuid::new_v4(), @@ -1712,6 +1714,7 @@ pub(crate) mod test_helpers { let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: partition.table_id, partition_id: partition.id, object_store_id: Uuid::new_v4(), @@ -2038,6 +2041,7 @@ pub(crate) mod test_helpers { let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: partition.table_id, partition_id: partition.id, object_store_id: Uuid::new_v4(), @@ -2166,6 +2170,7 @@ pub(crate) mod test_helpers { // Create a file with times entirely within the window let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: partition.table_id, partition_id: partition.id, object_store_id: Uuid::new_v4(), @@ -2366,6 +2371,7 @@ pub(crate) mod test_helpers { // Create a file with times entirely within the window let parquet_file_params = ParquetFileParams { sequencer_id: sequencer.id, + namespace_id: namespace.id, table_id: partition.table_id, partition_id: partition.id, object_store_id: Uuid::new_v4(), diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 7297a2e2d3..8fb01d5c30 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -902,6 +902,7 @@ impl ParquetFileRepo for MemTxn { let ParquetFileParams { sequencer_id, + namespace_id, table_id, partition_id, object_store_id, @@ -926,6 +927,7 @@ impl ParquetFileRepo for MemTxn { let parquet_file = ParquetFile { id: ParquetFileId::new(stage.parquet_files.len() as i64 + 1), sequencer_id, + namespace_id, table_id, partition_id, object_store_id, diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 780a75bb39..7de87a6e77 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1377,6 +1377,7 @@ impl ParquetFileRepo for PostgresTxn { async fn create(&mut self, parquet_file_params: ParquetFileParams) -> Result { let ParquetFileParams { sequencer_id, + namespace_id, table_id, partition_id, object_store_id, @@ -1395,8 +1396,8 @@ impl ParquetFileRepo for PostgresTxn { INSERT INTO parquet_file ( sequencer_id, table_id, partition_id, object_store_id, min_sequence_number, max_sequence_number, min_time, max_time, file_size_bytes, parquet_metadata, - row_count, compaction_level, created_at ) -VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 ) + row_count, compaction_level, created_at, namespace_id ) +VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) RETURNING *; "#, ) @@ -1413,6 +1414,7 @@ RETURNING *; .bind(row_count) // $11 .bind(INITIAL_COMPACTION_LEVEL) // $12 .bind(created_at) // $13 + .bind(namespace_id) // $14 .fetch_one(&mut self.inner) .await .map_err(|e| { diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index 56f2459f8e..dd11ebd58d 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -438,6 +438,7 @@ impl TestPartition { let parquet_file_params = ParquetFileParams { sequencer_id: self.sequencer.sequencer.id, + namespace_id: self.namespace.namespace.id, table_id: self.table.table.id, partition_id: self.partition.id, object_store_id, diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index 6fbf0378bd..5ae01d00d7 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -662,6 +662,7 @@ impl IoxMetadata { ) -> ParquetFileParams { ParquetFileParams { sequencer_id: self.sequencer_id, + namespace_id: self.namespace_id, table_id: self.table_id, partition_id: self.partition_id, object_store_id: self.object_store_id,