diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs
index 21efcb5906..083690b6b0 100644
--- a/data_types/src/chunk_metadata.rs
+++ b/data_types/src/chunk_metadata.rs
@@ -2,7 +2,7 @@
 use crate::partition_metadata::PartitionAddr;
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use std::sync::Arc;
+use std::{num::NonZeroU32, sync::Arc};
 
 /// Address of the chunk within the catalog
 #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
@@ -233,17 +233,19 @@ impl std::fmt::Display for ChunkId {
 /// 1. **upsert order:** chunks with higher order overwrite data in chunks with lower order
 /// 2. **locking order:** chunks must be locked in consistent (ascending) order
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct ChunkOrder(u32);
+pub struct ChunkOrder(NonZeroU32);
 
 impl ChunkOrder {
-    pub const MAX: Self = Self(u32::MAX);
+    // TODO: remove `unsafe` once https://github.com/rust-lang/rust/issues/51999 is fixed
+    pub const MIN: Self = Self(unsafe { NonZeroU32::new_unchecked(1) });
+    pub const MAX: Self = Self(unsafe { NonZeroU32::new_unchecked(u32::MAX) });
 
-    pub fn new(order: u32) -> Self {
-        Self(order)
+    pub fn new(order: u32) -> Option<Self> {
+        NonZeroU32::new(order).map(Self)
     }
 
     pub fn get(&self) -> u32 {
-        self.0
+        self.0.get()
     }
 
     /// Get next chunk order.
@@ -251,18 +253,15 @@ impl ChunkOrder {
     /// # Panic
     /// Panics if `self` is already [max](Self::MAX).
     pub fn next(&self) -> Self {
-        Self(self.0.checked_add(1).expect("chunk order overflow"))
+        Self(
+            NonZeroU32::new(self.0.get().checked_add(1).expect("chunk order overflow"))
+                .expect("did not overflow, so cannot be zero"),
+        )
     }
 }
 
 impl std::fmt::Display for ChunkOrder {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_tuple("ChunkOrder").field(&self.0).finish()
-    }
-}
-
-impl From<u32> for ChunkOrder {
-    fn from(order: u32) -> Self {
-        Self(order)
+        f.debug_tuple("ChunkOrder").field(&self.0.get()).finish()
     }
 }
diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs
index d6f9ad7468..27d79027b4 100644
--- a/generated_types/src/chunk.rs
+++ b/generated_types/src/chunk.rs
@@ -2,7 +2,9 @@ use crate::{
     google::{FieldViolation, FromFieldOpt},
     influxdata::iox::management::v1 as management,
 };
-use data_types::chunk_metadata::{ChunkId, ChunkLifecycleAction, ChunkStorage, ChunkSummary};
+use data_types::chunk_metadata::{
+    ChunkId, ChunkLifecycleAction, ChunkOrder, ChunkStorage, ChunkSummary,
+};
 use std::{
     convert::{TryFrom, TryInto},
     sync::Arc,
 };
@@ -122,7 +124,10 @@ impl TryFrom<management::Chunk> for ChunkSummary {
             time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
             time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
             time_closed: timestamp(time_closed, "time_closed")?,
-            order: order.into(),
+            order: ChunkOrder::new(order).ok_or_else(|| FieldViolation {
+                field: "order".to_string(),
+                description: "Order must be non-zero".to_string(),
+            })?,
         })
     }
 }
@@ -204,7 +209,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         };
 
         assert_eq!(
@@ -230,7 +235,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         };
 
         let proto = management::Chunk::try_from(summary).expect("conversion successful");
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 4b52b1021c..a3fdeffd7d 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -754,7 +754,7 @@ mod tests {
             time_of_last_write: from_secs(time_of_last_write),
             lifecycle_action: None,
             storage,
-            order: ChunkOrder::new(0),
+            order: ChunkOrder::MIN,
         }
     }
 
@@ -1517,23 +1517,23 @@
                 // blocked by action below
                 TestChunk::new(ChunkId::new(19), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(4)),
+                    .with_order(ChunkOrder::new(5).unwrap()),
                 // has an action
                 TestChunk::new(ChunkId::new(20), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(3))
+                    .with_order(ChunkOrder::new(4).unwrap())
                     .with_action(ChunkLifecycleAction::Compacting),
                 // closed => can compact
                 TestChunk::new(ChunkId::new(21), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(2)),
+                    .with_order(ChunkOrder::new(3).unwrap()),
                 TestChunk::new(ChunkId::new(22), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(1)),
+                    .with_order(ChunkOrder::new(2).unwrap()),
                 // has an action, but doesn't block because it's first
                 TestChunk::new(ChunkId::new(23), 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(ChunkOrder::new(0))
+                    .with_order(ChunkOrder::new(1).unwrap())
                     .with_action(ChunkLifecycleAction::Compacting),
             ]),
         ];
diff --git a/parquet_file/src/catalog/core.rs b/parquet_file/src/catalog/core.rs
index 62b6eb66ab..a134e01e56 100644
--- a/parquet_file/src/catalog/core.rs
+++ b/parquet_file/src/catalog/core.rs
@@ -39,7 +39,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
 /// Current version for serialized transactions.
 ///
 /// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 15;
+pub const TRANSACTION_VERSION: u32 = 16;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs
index 60c34766d3..d405a55467 100644
--- a/parquet_file/src/catalog/dump.rs
+++ b/parquet_file/src/catalog/dump.rs
@@ -272,7 +272,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [],
             revision_counter: 0,
             uuid: "00000000-0000-0000-0000-000000000000",
@@ -297,7 +297,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [
                 Action {
                     action: Some(
@@ -396,7 +396,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [],
             revision_counter: 0,
             uuid: "00000000-0000-0000-0000-000000000000",
@@ -421,7 +421,7 @@ File {
     is_checkpoint: false,
     proto: Ok(
         Transaction {
-            version: 15,
+            version: 16,
             actions: [
                 Action {
                     action: Some(
diff --git a/parquet_file/src/catalog/rebuild.rs b/parquet_file/src/catalog/rebuild.rs
index dee81a5baa..cb3c955bae 100644
--- a/parquet_file/src/catalog/rebuild.rs
+++ b/parquet_file/src/catalog/rebuild.rs
@@ -379,7 +379,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 221437a24b..424a789fd1 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
 ///
 /// **Important: When changing this structure, consider bumping the
 /// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
-pub const METADATA_VERSION: u32 = 7;
+pub const METADATA_VERSION: u32 = 8;
 
 /// File-level metadata key to store the IOx-specific data.
 ///
@@ -375,7 +375,11 @@ impl IoxMetadata {
             chunk_id: ChunkId::new(proto_msg.chunk_id),
             partition_checkpoint,
             database_checkpoint,
-            chunk_order: proto_msg.chunk_order.into(),
+            chunk_order: ChunkOrder::new(proto_msg.chunk_order).ok_or_else(|| {
+                Error::IoxMetadataFieldMissing {
+                    field: "chunk_order".to_string(),
+                }
+            })?,
         })
     }
 
@@ -1074,7 +1078,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 5f6110d44a..01a1538e98 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -450,7 +450,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         // create parquet file
@@ -525,7 +525,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: ChunkOrder::new(5),
+            chunk_order: ChunkOrder::new(5).unwrap(),
         };
 
         let (path, _file_size_bytes, _metadata) = storage
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index d345cfe414..8b3bcc811c 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -189,7 +189,7 @@ pub async fn make_chunk_given_record_batch(
         database_checkpoint,
         time_of_first_write: Utc.timestamp(30, 40),
         time_of_last_write: Utc.timestamp(50, 60),
-        chunk_order: ChunkOrder::new(5),
+        chunk_order: ChunkOrder::new(5).unwrap(),
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)
diff --git a/query/src/test.rs b/query/src/test.rs
index 7e27bdce1b..2607b9bd19 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -254,7 +254,7 @@ impl TestChunk {
             saved_error: Default::default(),
             predicate_match: Default::default(),
             delete_predicates: Default::default(),
-            order: ChunkOrder::new(0),
+            order: ChunkOrder::MIN,
         }
     }
 
diff --git a/server/src/db.rs b/server/src/db.rs
index a2401b0506..6de9f54db6 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -911,14 +911,9 @@ impl Db {
 
     /// `Instant::now()` that is used by the background worker. Can be mocked for testing.
     fn background_worker_now(&self) -> Instant {
-        let mut guard = self.background_worker_now_override.lock();
-        match *guard {
-            Some(now) => {
-                *guard = Some(now + Duration::from_nanos(1));
-                now
-            }
-            None => Instant::now(),
-        }
+        self.background_worker_now_override
+            .lock()
+            .unwrap_or_else(Instant::now)
     }
 
     async fn cleanup_unreferenced_parquet_files(
@@ -1773,7 +1768,7 @@ mod tests {
             .id();
 
         // A chunk is now in the object store and still in read buffer
-        let expected_parquet_size = 1243;
+        let expected_parquet_size = 1245;
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
         // now also in OS
         catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size);
@@ -2212,7 +2207,7 @@ mod tests {
         // Read buffer + Parquet chunk size
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // All the chunks should have different IDs
         assert_ne!(mb_chunk.id(), rb_chunk.id());
@@ -2327,7 +2322,7 @@ mod tests {
         // Read buffer + Parquet chunk size
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // Unload RB chunk but keep it in OS
         let pq_chunk = db
@@ -2348,7 +2343,7 @@ mod tests {
         // Parquet chunk size only
         catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
         catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 0);
-        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1241);
+        catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1243);
 
         // Verify data written to the parquet file in object store
         //
@@ -2594,7 +2589,7 @@ mod tests {
             time_of_first_write: Utc.timestamp_nanos(1),
             time_of_last_write: Utc.timestamp_nanos(1),
             time_closed: None,
-            order: ChunkOrder::new(5),
+            order: ChunkOrder::new(5).unwrap(),
         }];
 
         let size: usize = db
@@ -2829,7 +2824,7 @@ mod tests {
             storage: ChunkStorage::ReadBufferAndObjectStore,
             lifecycle_action,
             memory_bytes: 4085,       // size of RB and OS chunks
-            object_store_bytes: 1533, // size of parquet file
+            object_store_bytes: 1537, // size of parquet file
             row_count: 2,
             time_of_last_access: None,
             time_of_first_write: Utc.timestamp_nanos(1),
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index aedf5a2e1f..3d884df773 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -1162,7 +1162,7 @@ mod tests {
             mb_chunk,
             time_of_write,
             ChunkMetrics::new_unregistered(),
-            ChunkOrder::new(5),
+            ChunkOrder::new(5).unwrap(),
         )
     }
 
@@ -1180,7 +1180,7 @@
             now,
             ChunkMetrics::new_unregistered(),
             vec![],
-            ChunkOrder::new(6),
+            ChunkOrder::new(6).unwrap(),
         )
     }
 }
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 0d7ef3698e..be0877521c 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -154,7 +154,7 @@ impl Partition {
             next_chunk_id: ChunkId::new(0),
             metrics: Arc::new(metrics),
             persistence_windows: None,
-            next_chunk_order: ChunkOrder::new(0),
+            next_chunk_order: ChunkOrder::MIN,
         }
     }
 
diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs
index 540b332a73..14a3acd2cb 100644
--- a/server/src/db/system_tables/chunks.rs
+++ b/server/src/db/system_tables/chunks.rs
@@ -165,7 +165,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(5),
+                order: ChunkOrder::new(5).unwrap(),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -180,7 +180,7 @@
                 time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(6),
+                order: ChunkOrder::new(6).unwrap(),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -195,7 +195,7 @@
                 time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
                 time_closed: None,
-                order: ChunkOrder::new(7),
+                order: ChunkOrder::new(7).unwrap(),
             },
         ];
diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs
index a3d7b4c1ee..567cd74d0b 100644
--- a/server/src/db/system_tables/columns.rs
+++ b/server/src/db/system_tables/columns.rs
@@ -321,7 +321,7 @@ mod tests {
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: ChunkOrder::new(5),
+                    order: ChunkOrder::new(5).unwrap(),
                 },
                 columns: vec![
                     ChunkColumnSummary {
@@ -358,7 +358,7 @@
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: ChunkOrder::new(6),
+                    order: ChunkOrder::new(6).unwrap(),
                 },
                 columns: vec![ChunkColumnSummary {
                     name: "c1".into(),
@@ -389,7 +389,7 @@
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: ChunkOrder::new(5),
+                    order: ChunkOrder::new(5).unwrap(),
                 },
                 columns: vec![ChunkColumnSummary {
                     name: "c3".into(),
diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs
index c1bc4af2f9..2f31a2f62d 100644
--- a/tests/end_to_end_cases/management_api.rs
+++ b/tests/end_to_end_cases/management_api.rs
@@ -535,7 +535,7 @@ async fn test_chunk_get() {
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-            order: 0,
+            order: 1,
         },
         Chunk {
             partition_key: "disk".into(),
@@ -550,7 +550,7 @@
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-            order: 0,
+            order: 1,
         },
     ];
     assert_eq!(
@@ -722,7 +722,7 @@
         time_of_first_write: None,
         time_of_last_write: None,
         time_closed: None,
-        order: 0,
+        order: 1,
     }];
 
     assert_eq!(
diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs
index 666c546761..4506e5ed02 100644
--- a/tests/end_to_end_cases/management_cli.rs
+++ b/tests/end_to_end_cases/management_cli.rs
@@ -709,7 +709,7 @@ async fn test_list_partition_chunks() {
     let expected = r#"
   "partition_key": "cpu",
   "table_name": "cpu",
-  "order": 0,
+  "order": 1,
   "id": 0,
   "storage": "OpenMutableBuffer",
"#;
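
The heart of this diff is `ChunkOrder` switching its inner type from `u32` to `NonZeroU32`. The sketch below is illustrative only (a stand-in type, not code from the IOx tree); it demonstrates the two properties the change relies on: zero is rejected at construction time, and the compiler's niche optimization makes `Option<ChunkOrder>` the same size as a bare `u32`, so the now-fallible `new()` costs nothing in memory. It also shows a safe `const` construction that, on Rust 1.57 and later (where panicking in `const` is allowed), can replace the `unsafe { NonZeroU32::new_unchecked(...) }` pattern that the diff's TODO refers to.

```rust
use std::num::NonZeroU32;

// Illustrative stand-in for the ChunkOrder in the diff (not the IOx type).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ChunkOrder(NonZeroU32);

impl ChunkOrder {
    // Safe const construction: panicking in `const` works on Rust 1.57+,
    // avoiding `unsafe { NonZeroU32::new_unchecked(1) }` on newer toolchains.
    const MIN: Self = Self(match NonZeroU32::new(1) {
        Some(n) => n,
        None => panic!("1 is non-zero"),
    });

    // Mirrors the diff: zero maps to `None` instead of silently passing through.
    fn new(order: u32) -> Option<Self> {
        NonZeroU32::new(order).map(Self)
    }
}

fn main() {
    // Niche optimization: the forbidden zero value doubles as the `None`
    // representation, so the `Option` wrapper needs no extra space.
    assert_eq!(
        std::mem::size_of::<Option<ChunkOrder>>(),
        std::mem::size_of::<u32>()
    );

    // Zero is rejected at the decode boundary; in proto3 an absent `order`
    // field decodes as 0, so "missing" and "invalid" coincide.
    assert!(ChunkOrder::new(0).is_none());
    assert_eq!(ChunkOrder::new(1), Some(ChunkOrder::MIN));
}
```

This is also why the fixtures throughout the diff shift every order up by one (`ChunkOrder::new(0)` becomes `ChunkOrder::MIN`, the gRPC tests now expect `order: 1`, and the lifecycle test renumbers 4..0 to 5..1): with proto3 scalars defaulting to 0, a zero order now unambiguously signals a missing or invalid field, which the decoding paths in `generated_types/src/chunk.rs` and `parquet_file/src/metadata.rs` turn into proper errors instead of accepting silently.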