From 66bf0ff272caa5aad4db4982c579326b1dcbf1ea Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 15 Sep 2022 18:02:15 +0200 Subject: [PATCH] refactor(db): NULLable persisted_sequence_number Makes the partition.persisted_sequence_number column in the catalog DB NULLable. 0 is a valid persisted sequence number. --- data_types/src/lib.rs | 4 +++- import/src/aggregate_tsm_schema/update_catalog.rs | 10 +++++----- ingester/src/compact.rs | 12 ++++++------ ingester/src/data.rs | 15 +++++++-------- ...15155941_nullable-partition-persist-offset.sql | 11 +++++++++++ iox_catalog/src/interface.rs | 7 +++++-- iox_catalog/src/mem.rs | 4 ++-- 7 files changed, 39 insertions(+), 24 deletions(-) create mode 100644 iox_catalog/migrations/20220915155941_nullable-partition-persist-offset.sql diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index b29a5f6c48..06ca665c1b 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -853,7 +853,9 @@ pub struct Partition { /// It is a system invariant that this value monotonically increases over /// time - wrote another way, it is an invariant that partitions are /// persisted (or at least made visible) in sequence order. - pub persisted_sequence_number: SequenceNumber, + /// + /// If [`None`] no data has been persisted for this partition. + pub persisted_sequence_number: Option, } impl Partition { diff --git a/import/src/aggregate_tsm_schema/update_catalog.rs b/import/src/aggregate_tsm_schema/update_catalog.rs index 58ffae964e..35e808b5cc 100644 --- a/import/src/aggregate_tsm_schema/update_catalog.rs +++ b/import/src/aggregate_tsm_schema/update_catalog.rs @@ -406,7 +406,7 @@ mod tests { use crate::{AggregateTSMField, AggregateTSMTag}; use assert_matches::assert_matches; use client_util::connection::Builder; - use data_types::{PartitionId, SequenceNumber, TableId}; + use data_types::{PartitionId, TableId}; use iox_catalog::mem::MemCatalog; use parking_lot::RwLock; use std::{collections::HashSet, net::SocketAddr}; @@ -1313,7 +1313,7 @@ mod tests { id: PartitionId::new(1), shard_id: ShardId::new(1), table_id: TableId::new(1), - persisted_sequence_number: SequenceNumber::new(0), + persisted_sequence_number: None, partition_key: PartitionKey::from("2022-06-21"), sort_key: Vec::new(), }; @@ -1360,7 +1360,7 @@ mod tests { id: PartitionId::new(1), shard_id: ShardId::new(1), table_id: TableId::new(1), - persisted_sequence_number: SequenceNumber::new(0), + persisted_sequence_number: None, partition_key: PartitionKey::from("2022-06-21"), // N.B. sort key is already what it will computed to; here we're testing the `adjust_sort_key_columns` code path sort_key: vec!["host".to_string(), "arch".to_string(), "time".to_string()], @@ -1407,7 +1407,7 @@ mod tests { id: PartitionId::new(1), shard_id: ShardId::new(1), table_id: TableId::new(1), - persisted_sequence_number: SequenceNumber::new(0), + persisted_sequence_number: None, partition_key: PartitionKey::from("2022-06-21"), // N.B. is missing host so will need updating sort_key: vec!["arch".to_string(), "time".to_string()], @@ -1456,7 +1456,7 @@ mod tests { id: PartitionId::new(1), shard_id: ShardId::new(1), table_id: TableId::new(1), - persisted_sequence_number: SequenceNumber::new(0), + persisted_sequence_number: None, partition_key: PartitionKey::from("2022-06-21"), // N.B. is missing arch so will need updating sort_key: vec!["host".to_string(), "time".to_string()], diff --git a/ingester/src/compact.rs b/ingester/src/compact.rs index 292ed212b9..e831bd2a61 100644 --- a/ingester/src/compact.rs +++ b/ingester/src/compact.rs @@ -247,7 +247,7 @@ mod tests { table_id: TableId::new(table_id), partition_key: partition_key.into(), sort_key: vec![], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; @@ -318,7 +318,7 @@ mod tests { table_id: TableId::new(table_id), partition_key: partition_key.into(), sort_key: vec![], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; @@ -416,7 +416,7 @@ mod tests { partition_key: partition_key.into(), // NO SORT KEY from the catalog here, first persisting batch sort_key: vec![], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; @@ -517,7 +517,7 @@ mod tests { // SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog // this is NOT what the computed sort key would be based on this data's cardinality sort_key: vec!["tag3".to_string(), "tag1".to_string(), "time".to_string()], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; @@ -619,7 +619,7 @@ mod tests { // this is NOT what the computed sort key would be based on this data's cardinality // The new column, tag1, should get added just before the time column sort_key: vec!["tag3".to_string(), "time".to_string()], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; @@ -729,7 +729,7 @@ mod tests { "tag4".to_string(), "time".to_string(), ], - persisted_sequence_number: SequenceNumber::new(42), + persisted_sequence_number: Some(SequenceNumber::new(42)), }, }; diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 1053998ed6..06ab8f3b2c 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -343,16 +343,12 @@ impl Persister for IngesterData { // It is an invariant that partitions are persisted in order so that // both the per-shard, and per-partition watermarks are correctly // advanced and accurate. - // - // Skips the check for the first persist for a partition that - // contains a single write (where parquet file max=0). - if parquet_file.max_sequence_number.get() != 0 { + if let Some(last_persist) = partition_info.partition.persisted_sequence_number { assert!( - parquet_file.max_sequence_number - > partition_info.partition.persisted_sequence_number, + parquet_file.max_sequence_number > last_persist, "out of order partition persistence, persisting {}, previously persisted {}", parquet_file.max_sequence_number.get(), - partition_info.partition.persisted_sequence_number.get(), + last_persist.get(), ); } @@ -1001,7 +997,10 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(2)); + assert_eq!( + partition.persisted_sequence_number, + Some(SequenceNumber::new(2)) + ); // This value should be recorded in the metrics asserted next; // it is less than 500 KB diff --git a/iox_catalog/migrations/20220915155941_nullable-partition-persist-offset.sql b/iox_catalog/migrations/20220915155941_nullable-partition-persist-offset.sql new file mode 100644 index 0000000000..8e7321f19a --- /dev/null +++ b/iox_catalog/migrations/20220915155941_nullable-partition-persist-offset.sql @@ -0,0 +1,11 @@ +-- Make persisted_sequence_number nullable. +-- +-- NULL == no persisted data for this partition +ALTER TABLE + "partition" DROP COLUMN "persisted_sequence_number"; + +-- Remove implicit NULL / default of 0. +ALTER TABLE + "partition" +ADD + COLUMN "persisted_sequence_number" BIGINT NULL DEFAULT NULL; diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index fe01f11966..9811fcc49d 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -1614,7 +1614,7 @@ pub(crate) mod test_helpers { .await .unwrap() .unwrap(); - assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(0)); + assert_eq!(partition.persisted_sequence_number, None); // Set repos .partitions() @@ -1628,7 +1628,10 @@ pub(crate) mod test_helpers { .await .unwrap() .unwrap(); - assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(42)); + assert_eq!( + partition.persisted_sequence_number, + Some(SequenceNumber::new(42)) + ); } async fn test_tombstone(catalog: Arc) { diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index bd53b58db3..0e2374d6f8 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -735,7 +735,7 @@ impl PartitionRepo for MemTxn { table_id, partition_key: key, sort_key: vec![], - persisted_sequence_number: SequenceNumber::new(0), + persisted_sequence_number: None, }; stage.partitions.push(p); stage.partitions.last().unwrap() @@ -888,7 +888,7 @@ impl PartitionRepo for MemTxn { let stage = self.stage(); match stage.partitions.iter_mut().find(|p| p.id == partition_id) { Some(p) => { - p.persisted_sequence_number = sequence_number; + p.persisted_sequence_number = Some(sequence_number); Ok(()) } None => Err(Error::PartitionNotFound { id: partition_id }),