refactor(db): NULLable persisted_sequence_number
Makes the partition.persisted_sequence_number column in the catalog DB NULLable. 0 is a valid persisted sequence number.pull/24376/head
parent
f718cfd71c
commit
66bf0ff272
|
|
@ -853,7 +853,9 @@ pub struct Partition {
|
|||
/// It is a system invariant that this value monotonically increases over
|
||||
/// time - wrote another way, it is an invariant that partitions are
|
||||
/// persisted (or at least made visible) in sequence order.
|
||||
pub persisted_sequence_number: SequenceNumber,
|
||||
///
|
||||
/// If [`None`] no data has been persisted for this partition.
|
||||
pub persisted_sequence_number: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
impl Partition {
|
||||
|
|
|
|||
|
|
@ -406,7 +406,7 @@ mod tests {
|
|||
use crate::{AggregateTSMField, AggregateTSMTag};
|
||||
use assert_matches::assert_matches;
|
||||
use client_util::connection::Builder;
|
||||
use data_types::{PartitionId, SequenceNumber, TableId};
|
||||
use data_types::{PartitionId, TableId};
|
||||
use iox_catalog::mem::MemCatalog;
|
||||
use parking_lot::RwLock;
|
||||
use std::{collections::HashSet, net::SocketAddr};
|
||||
|
|
@ -1313,7 +1313,7 @@ mod tests {
|
|||
id: PartitionId::new(1),
|
||||
shard_id: ShardId::new(1),
|
||||
table_id: TableId::new(1),
|
||||
persisted_sequence_number: SequenceNumber::new(0),
|
||||
persisted_sequence_number: None,
|
||||
partition_key: PartitionKey::from("2022-06-21"),
|
||||
sort_key: Vec::new(),
|
||||
};
|
||||
|
|
@ -1360,7 +1360,7 @@ mod tests {
|
|||
id: PartitionId::new(1),
|
||||
shard_id: ShardId::new(1),
|
||||
table_id: TableId::new(1),
|
||||
persisted_sequence_number: SequenceNumber::new(0),
|
||||
persisted_sequence_number: None,
|
||||
partition_key: PartitionKey::from("2022-06-21"),
|
||||
// N.B. sort key is already what it will computed to; here we're testing the `adjust_sort_key_columns` code path
|
||||
sort_key: vec!["host".to_string(), "arch".to_string(), "time".to_string()],
|
||||
|
|
@ -1407,7 +1407,7 @@ mod tests {
|
|||
id: PartitionId::new(1),
|
||||
shard_id: ShardId::new(1),
|
||||
table_id: TableId::new(1),
|
||||
persisted_sequence_number: SequenceNumber::new(0),
|
||||
persisted_sequence_number: None,
|
||||
partition_key: PartitionKey::from("2022-06-21"),
|
||||
// N.B. is missing host so will need updating
|
||||
sort_key: vec!["arch".to_string(), "time".to_string()],
|
||||
|
|
@ -1456,7 +1456,7 @@ mod tests {
|
|||
id: PartitionId::new(1),
|
||||
shard_id: ShardId::new(1),
|
||||
table_id: TableId::new(1),
|
||||
persisted_sequence_number: SequenceNumber::new(0),
|
||||
persisted_sequence_number: None,
|
||||
partition_key: PartitionKey::from("2022-06-21"),
|
||||
// N.B. is missing arch so will need updating
|
||||
sort_key: vec!["host".to_string(), "time".to_string()],
|
||||
|
|
|
|||
|
|
@ -247,7 +247,7 @@ mod tests {
|
|||
table_id: TableId::new(table_id),
|
||||
partition_key: partition_key.into(),
|
||||
sort_key: vec![],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -318,7 +318,7 @@ mod tests {
|
|||
table_id: TableId::new(table_id),
|
||||
partition_key: partition_key.into(),
|
||||
sort_key: vec![],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -416,7 +416,7 @@ mod tests {
|
|||
partition_key: partition_key.into(),
|
||||
// NO SORT KEY from the catalog here, first persisting batch
|
||||
sort_key: vec![],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -517,7 +517,7 @@ mod tests {
|
|||
// SPECIFY A SORT KEY HERE to simulate a sort key being stored in the catalog
|
||||
// this is NOT what the computed sort key would be based on this data's cardinality
|
||||
sort_key: vec!["tag3".to_string(), "tag1".to_string(), "time".to_string()],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -619,7 +619,7 @@ mod tests {
|
|||
// this is NOT what the computed sort key would be based on this data's cardinality
|
||||
// The new column, tag1, should get added just before the time column
|
||||
sort_key: vec!["tag3".to_string(), "time".to_string()],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
@ -729,7 +729,7 @@ mod tests {
|
|||
"tag4".to_string(),
|
||||
"time".to_string(),
|
||||
],
|
||||
persisted_sequence_number: SequenceNumber::new(42),
|
||||
persisted_sequence_number: Some(SequenceNumber::new(42)),
|
||||
},
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -343,16 +343,12 @@ impl Persister for IngesterData {
|
|||
// It is an invariant that partitions are persisted in order so that
|
||||
// both the per-shard, and per-partition watermarks are correctly
|
||||
// advanced and accurate.
|
||||
//
|
||||
// Skips the check for the first persist for a partition that
|
||||
// contains a single write (where parquet file max=0).
|
||||
if parquet_file.max_sequence_number.get() != 0 {
|
||||
if let Some(last_persist) = partition_info.partition.persisted_sequence_number {
|
||||
assert!(
|
||||
parquet_file.max_sequence_number
|
||||
> partition_info.partition.persisted_sequence_number,
|
||||
parquet_file.max_sequence_number > last_persist,
|
||||
"out of order partition persistence, persisting {}, previously persisted {}",
|
||||
parquet_file.max_sequence_number.get(),
|
||||
partition_info.partition.persisted_sequence_number.get(),
|
||||
last_persist.get(),
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -1001,7 +997,10 @@ mod tests {
|
|||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(2));
|
||||
assert_eq!(
|
||||
partition.persisted_sequence_number,
|
||||
Some(SequenceNumber::new(2))
|
||||
);
|
||||
|
||||
// This value should be recorded in the metrics asserted next;
|
||||
// it is less than 500 KB
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
-- Make persisted_sequence_number nullable.
|
||||
--
|
||||
-- NULL == no persisted data for this partition
|
||||
ALTER TABLE
|
||||
"partition" DROP COLUMN "persisted_sequence_number";
|
||||
|
||||
-- Remove implicit NULL / default of 0.
|
||||
ALTER TABLE
|
||||
"partition"
|
||||
ADD
|
||||
COLUMN "persisted_sequence_number" BIGINT NULL DEFAULT NULL;
|
||||
|
|
@ -1614,7 +1614,7 @@ pub(crate) mod test_helpers {
|
|||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(0));
|
||||
assert_eq!(partition.persisted_sequence_number, None);
|
||||
// Set
|
||||
repos
|
||||
.partitions()
|
||||
|
|
@ -1628,7 +1628,10 @@ pub(crate) mod test_helpers {
|
|||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert_eq!(partition.persisted_sequence_number, SequenceNumber::new(42));
|
||||
assert_eq!(
|
||||
partition.persisted_sequence_number,
|
||||
Some(SequenceNumber::new(42))
|
||||
);
|
||||
}
|
||||
|
||||
async fn test_tombstone(catalog: Arc<dyn Catalog>) {
|
||||
|
|
|
|||
|
|
@ -735,7 +735,7 @@ impl PartitionRepo for MemTxn {
|
|||
table_id,
|
||||
partition_key: key,
|
||||
sort_key: vec![],
|
||||
persisted_sequence_number: SequenceNumber::new(0),
|
||||
persisted_sequence_number: None,
|
||||
};
|
||||
stage.partitions.push(p);
|
||||
stage.partitions.last().unwrap()
|
||||
|
|
@ -888,7 +888,7 @@ impl PartitionRepo for MemTxn {
|
|||
let stage = self.stage();
|
||||
match stage.partitions.iter_mut().find(|p| p.id == partition_id) {
|
||||
Some(p) => {
|
||||
p.persisted_sequence_number = sequence_number;
|
||||
p.persisted_sequence_number = Some(sequence_number);
|
||||
Ok(())
|
||||
}
|
||||
None => Err(Error::PartitionNotFound { id: partition_id }),
|
||||
|
|
|
|||
Loading…
Reference in New Issue