diff --git a/data_types/src/chunk_metadata.rs b/data_types/src/chunk_metadata.rs
index cd4bb42f74..2b6dc8b578 100644
--- a/data_types/src/chunk_metadata.rs
+++ b/data_types/src/chunk_metadata.rs
@@ -160,7 +160,7 @@ pub struct ChunkSummary {
     pub time_closed: Option<DateTime<Utc>>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    pub order: u32,
+    pub order: ChunkOrder,
 }
 
 /// Represents metadata about the physical storage of a column in a chunk
@@ -195,3 +195,39 @@ impl ChunkSummary {
             && self.row_count == other.row_count
     }
 }
+
+/// Order of a chunk.
+///
+/// This is used for:
+/// 1. **upsert order:** chunks with higher order overwrite data in chunks with lower order
+/// 2. **locking order:** chunks must be locked in ascending order
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
+pub struct ChunkOrder(u32);
+
+impl ChunkOrder {
+    pub const MAX: Self = Self(u32::MAX);
+
+    pub fn new(order: u32) -> Self {
+        Self(order)
+    }
+
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0.checked_add(1).expect("chunk order overflow"))
+    }
+}
+
+impl std::fmt::Display for ChunkOrder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("ChunkOrder").field(&self.0).finish()
+    }
+}
+
+impl From<u32> for ChunkOrder {
+    fn from(order: u32) -> Self {
+        Self(order)
+    }
+}
diff --git a/generated_types/src/chunk.rs b/generated_types/src/chunk.rs
index 41c012be71..3fdbe680ad 100644
--- a/generated_types/src/chunk.rs
+++ b/generated_types/src/chunk.rs
@@ -40,7 +40,7 @@ impl From<ChunkSummary> for management::Chunk {
             time_of_first_write: Some(time_of_first_write.into()),
             time_of_last_write: Some(time_of_last_write.into()),
             time_closed: time_closed.map(Into::into),
-            order,
+            order: order.get(),
         }
     }
 }
@@ -124,7 +124,7 @@ impl TryFrom<management::Chunk> for ChunkSummary {
             time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
             time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
             time_closed: timestamp(time_closed, "time_closed")?,
-            order,
+            order: order.into(),
         })
     }
 }
@@ -168,6 +168,7 @@ impl TryFrom<management::ChunkLifecycleAction> for Option<ChunkLifecycleAction>
 mod test {
     use super::*;
     use chrono::{TimeZone, Utc};
+    use data_types::chunk_metadata::ChunkOrder;
 
     #[test]
     fn valid_proto_to_summary() {
@@ -206,7 +207,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
-            order: 5,
+            order: ChunkOrder::new(5),
         };
 
         assert_eq!(
@@ -232,7 +233,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
-            order: 5,
+            order: ChunkOrder::new(5),
         };
 
         let proto = management::Chunk::try_from(summary).expect("conversion successful");
diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs
index 5e1d4bf80f..a859005f2d 100644
--- a/lifecycle/src/lib.rs
+++ b/lifecycle/src/lib.rs
@@ -10,7 +10,7 @@
 use chrono::{DateTime, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
     database_rules::LifecycleRules,
     DatabaseName,
 };
@@ -153,7 +153,7 @@ pub trait LockableChunk: Sized {
 
     fn id(&self) -> u32;
 
-    fn order(&self) -> u32;
+    fn order(&self) -> ChunkOrder;
 }
 
 pub trait LifecyclePartition {
@@ -191,8 +191,6 @@ pub trait LifecycleChunk {
     fn storage(&self) -> ChunkStorage;
 
     fn row_count(&self) -> usize;
-
-    fn order(&self) -> u32;
 }
 
 /// The trait for a persist handle
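Both uses named in the `ChunkOrder` doc comment fall out of the derived `Ord`. A minimal standalone sketch of that, independent of the IOx crates (serde derives dropped, toy `main` added):

```rust
// Toy re-statement of the ChunkOrder newtype above (serde derives omitted).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ChunkOrder(u32);

impl ChunkOrder {
    const MAX: Self = Self(u32::MAX);

    fn new(order: u32) -> Self {
        Self(order)
    }

    fn next(&self) -> Self {
        // Panic rather than wrap: a wrapped order would silently invert
        // both the upsert and the locking order.
        Self(self.0.checked_add(1).expect("chunk order overflow"))
    }
}

fn main() {
    // Locking order: sort ascending, then lock in that sequence.
    let mut orders = vec![ChunkOrder::new(3), ChunkOrder::new(1), ChunkOrder::new(2)];
    orders.sort();
    assert_eq!(orders[0], ChunkOrder::new(1));

    // Compaction keeps the *minimum* input order (see the policy tests below),
    // so the merged chunk still loses to any newer overlapping chunk.
    let min = orders.iter().copied().fold(ChunkOrder::MAX, Ord::min);
    assert_eq!(min, ChunkOrder::new(1));

    // Fresh chunks get monotonically increasing orders.
    assert_eq!(ChunkOrder::new(1).next(), ChunkOrder::new(2));
}
```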
diff --git a/lifecycle/src/policy.rs b/lifecycle/src/policy.rs
index 37af09c594..b327a3b94a 100644
--- a/lifecycle/src/policy.rs
+++ b/lifecycle/src/policy.rs
@@ -677,7 +677,7 @@ mod tests {
         ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
         LockablePartition, PersistHandle,
     };
-    use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
+    use data_types::chunk_metadata::{ChunkAddr, ChunkOrder, ChunkStorage};
    use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
    use std::{
        cmp::max,
@@ -698,7 +698,7 @@ mod tests {
     #[derive(Debug)]
     struct TestPartition {
-        chunks: BTreeMap<u32, (u32, Arc<RwLock<TestChunk>>)>,
+        chunks: BTreeMap<u32, (ChunkOrder, Arc<RwLock<TestChunk>>)>,
         persistable_row_count: usize,
         minimum_unpersisted_age: Option<Instant>,
         max_persistable_timestamp: Option<DateTime<Utc>>,
@@ -731,7 +731,7 @@ mod tests {
         time_of_last_write: DateTime<Utc>,
         lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
         storage: ChunkStorage,
-        order: u32,
+        order: ChunkOrder,
     }
 
     impl TestChunk {
@@ -754,7 +754,7 @@ mod tests {
                 time_of_last_write: from_secs(time_of_last_write),
                 lifecycle_action: None,
                 storage,
-                order: 0,
+                order: ChunkOrder::new(0),
             }
         }
 
@@ -778,7 +778,7 @@ mod tests {
             self
         }
 
-        fn with_order(mut self, order: u32) -> Self {
+        fn with_order(mut self, order: ChunkOrder) -> Self {
             self.order = order;
             self
         }
@@ -801,7 +801,7 @@ mod tests {
         db: &'a TestDb,
         chunk: Arc<RwLock<TestChunk>>,
         id: u32,
-        order: u32,
+        order: ChunkOrder,
     }
 
     #[derive(Debug)]
@@ -870,7 +870,7 @@ mod tests {
             let mut new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer);
             new_chunk.row_count = 0;
 
-            let mut order = u32::MAX;
+            let mut order = ChunkOrder::MAX;
             for chunk in &chunks {
                 partition.chunks.remove(&chunk.addr.chunk_id);
                 new_chunk.row_count += chunk.row_count;
@@ -880,7 +880,7 @@ mod tests {
                     (None, Some(ts)) => Some(ts),
                     (None, None) => None,
                 };
-                order = order.min(chunk.order());
+                order = order.min(chunk.order);
             }
 
             partition
@@ -907,10 +907,10 @@ mod tests {
             chunks: Vec<LifecycleWriteGuard<'_, TestChunk, Self::Chunk>>,
             handle: Self::PersistHandle,
         ) -> Result<TaskTracker<()>, Self::Error> {
-            let mut order = u32::MAX;
+            let mut order = ChunkOrder::MAX;
             for chunk in &chunks {
                 partition.chunks.remove(&chunk.addr.chunk_id);
-                order = order.min(chunk.order());
+                order = order.min(chunk.order);
             }
 
             let id = partition.next_id;
@@ -982,7 +982,7 @@ mod tests {
             self.id
         }
 
-        fn order(&self) -> u32 {
+        fn order(&self) -> ChunkOrder {
             self.order
         }
     }
@@ -1037,10 +1037,6 @@ mod tests {
         fn row_count(&self) -> usize {
             self.row_count
         }
-
-        fn order(&self) -> u32 {
-            self.order
-        }
     }
 
     impl TestPartition {
@@ -1050,8 +1046,7 @@ mod tests {
                 .into_iter()
                 .map(|x| {
                     max_id = max(max_id, x.addr.chunk_id);
-                    let order = x.order();
-                    (x.addr.chunk_id, (order, Arc::new(RwLock::new(x))))
+                    (x.addr.chunk_id, (x.order, Arc::new(RwLock::new(x))))
                 })
                 .collect();
 
@@ -1492,23 +1487,23 @@ mod tests {
                 // blocked by action below
                 TestChunk::new(19, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(4),
+                    .with_order(ChunkOrder::new(4)),
                 // has an action
                 TestChunk::new(20, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(3)
+                    .with_order(ChunkOrder::new(3))
                     .with_action(ChunkLifecycleAction::Compacting),
                 // closed => can compact
                 TestChunk::new(21, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(2),
+                    .with_order(ChunkOrder::new(2)),
                 TestChunk::new(22, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(1),
+                    .with_order(ChunkOrder::new(1)),
                 // has an action, but doesn't block because it's first
                 TestChunk::new(23, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(0)
+                    .with_order(ChunkOrder::new(0))
                     .with_action(ChunkLifecycleAction::Compacting),
             ]),
         ];
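The test fixture change encodes a design point: the partition map stores each chunk's order next to the chunk lock (`BTreeMap<u32, (ChunkOrder, Arc<RwLock<TestChunk>>)>`), which is why `LifecycleChunk::order` could be dropped — lifecycle code reads orders from the map without taking chunk locks. A hedged sketch of that access pattern, with toy types rather than the IOx ones:

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChunkOrder(u32);

#[derive(Debug)]
struct Chunk {
    rows: usize,
}

fn main() {
    // chunk id -> (order, locked chunk), mirroring TestPartition::chunks.
    let mut chunks: BTreeMap<u32, (ChunkOrder, Arc<RwLock<Chunk>>)> = BTreeMap::new();
    chunks.insert(7, (ChunkOrder(2), Arc::new(RwLock::new(Chunk { rows: 10 }))));
    chunks.insert(9, (ChunkOrder(0), Arc::new(RwLock::new(Chunk { rows: 5 }))));

    // The minimum order is read from the map alone -- no chunk lock taken.
    let min_order = chunks.values().map(|(order, _)| *order).min().unwrap();
    assert_eq!(min_order, ChunkOrder(0));

    // Only the chunk *data* needs the lock.
    let total_rows: usize = chunks.values().map(|(_, c)| c.read().unwrap().rows).sum();
    assert_eq!(total_rows, 15);
}
```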
diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs
index f03435cc39..5fb7668e5c 100644
--- a/parquet_file/src/catalog/dump.rs
+++ b/parquet_file/src/catalog/dump.rs
@@ -505,7 +505,9 @@ File {
                         },
                     },
                 },
-                chunk_order: 5,
+                chunk_order: ChunkOrder(
+                    5,
+                ),
             },
         ),
         schema: Ok(
diff --git a/parquet_file/src/catalog/rebuild.rs b/parquet_file/src/catalog/rebuild.rs
index 839c2ed35c..663741d0df 100644
--- a/parquet_file/src/catalog/rebuild.rs
+++ b/parquet_file/src/catalog/rebuild.rs
@@ -173,7 +173,7 @@ mod tests {
         },
     };
     use chrono::Utc;
-    use data_types::chunk_metadata::ChunkAddr;
+    use data_types::chunk_metadata::{ChunkAddr, ChunkOrder};
     use datafusion::physical_plan::SendableRecordBatchStream;
     use datafusion_util::MemoryStream;
     use parquet::arrow::ArrowWriter;
@@ -373,7 +373,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index cc352f13ef..f530cfcfc8 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -87,7 +87,10 @@
 //! [Apache Thrift]: https://thrift.apache.org/
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
 use chrono::{DateTime, NaiveDateTime, Utc};
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
+use data_types::{
+    chunk_metadata::ChunkOrder,
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
+};
 use generated_types::influxdata::iox::catalog::v1 as proto;
 use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
 use parquet::{
@@ -271,7 +274,7 @@ pub struct IoxMetadata {
     pub database_checkpoint: DatabaseCheckpoint,
 
     /// Order of this chunk relative to other overlapping chunks.
-    pub chunk_order: u32,
+    pub chunk_order: ChunkOrder,
 }
 
 impl IoxMetadata {
@@ -366,7 +369,7 @@ impl IoxMetadata {
             chunk_id: proto_msg.chunk_id,
             partition_checkpoint,
             database_checkpoint,
-            chunk_order: proto_msg.chunk_order,
+            chunk_order: proto_msg.chunk_order.into(),
         })
     }
 
@@ -421,7 +424,7 @@ impl IoxMetadata {
             chunk_id: self.chunk_id,
             partition_checkpoint: Some(proto_partition_checkpoint),
             database_checkpoint: Some(proto_database_checkpoint),
-            chunk_order: self.chunk_order,
+            chunk_order: self.chunk_order.get(),
         };
 
         let mut buf = Vec::new();
@@ -1069,7 +1072,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();
@@ -1120,7 +1123,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index bfe4d6b806..d49dee0552 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -430,7 +430,7 @@ mod tests {
     use arrow::array::{ArrayRef, StringArray};
     use arrow_util::assert_batches_eq;
     use chrono::Utc;
-    use data_types::partition_metadata::TableSummary;
+    use data_types::{chunk_metadata::ChunkOrder, partition_metadata::TableSummary};
     use datafusion::physical_plan::common::SizedRecordBatchStream;
     use datafusion_util::MemoryStream;
     use parquet::schema::types::ColumnPath;
@@ -452,7 +452,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         // create parquet file
@@ -527,7 +527,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let (path, _file_size_bytes, _metadata) = storage
diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs
index 46feaa6106..2e640c9464 100644
--- a/parquet_file/src/test_utils.rs
+++ b/parquet_file/src/test_utils.rs
@@ -13,7 +13,7 @@ use arrow::{
 };
 use chrono::{TimeZone, Utc};
 use data_types::{
-    chunk_metadata::ChunkAddr,
+    chunk_metadata::{ChunkAddr, ChunkOrder},
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
     server_id::ServerId,
     DatabaseName,
@@ -189,7 +189,7 @@ pub async fn make_chunk_given_record_batch(
         database_checkpoint,
         time_of_first_write: Utc.timestamp(30, 40),
         time_of_last_write: Utc.timestamp(50, 60),
-        chunk_order: 5,
+        chunk_order: ChunkOrder::new(5),
     };
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)
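On the wire — in both the management API and the parquet `IoxMetadata` protobuf — the order stays a bare `u32`; only the Rust side gains the newtype, via `.get()` on encode and `.into()` on decode. A minimal round-trip sketch; `ProtoMetadata` here is a stand-in for the generated message, not the real type:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ChunkOrder(u32);

impl ChunkOrder {
    fn new(order: u32) -> Self { Self(order) }
    fn get(&self) -> u32 { self.0 }
}

impl From<u32> for ChunkOrder {
    fn from(order: u32) -> Self { Self(order) }
}

// Stand-in for the generated protobuf message.
struct ProtoMetadata {
    chunk_order: u32,
}

fn main() {
    let order = ChunkOrder::new(5);

    // Serialize: unwrap the newtype at the proto boundary...
    let proto = ProtoMetadata { chunk_order: order.get() };

    // ...and re-wrap on deserialize.
    let restored: ChunkOrder = proto.chunk_order.into();
    assert_eq!(restored, order);
}
```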
diff --git a/query/src/lib.rs b/query/src/lib.rs
index 3599b96cb7..6f5781df64 100644
--- a/query/src/lib.rs
+++ b/query/src/lib.rs
@@ -9,7 +9,7 @@
 )]
 
 use data_types::{
-    chunk_metadata::ChunkSummary,
+    chunk_metadata::{ChunkOrder, ChunkSummary},
     partition_metadata::{InfluxDbType, TableSummary},
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -152,7 +152,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
     fn chunk_type(&self) -> &str;
 
     /// Order of this chunk relative to other overlapping chunks.
-    fn order(&self) -> u32;
+    fn order(&self) -> ChunkOrder;
 }
 
 /// Implement ChunkMeta for something wrapped in an Arc (like Chunks often are)
diff --git a/query/src/test.rs b/query/src/test.rs
index cdf3630e4a..7f404ffe7a 100644
--- a/query/src/test.rs
+++ b/query/src/test.rs
@@ -13,6 +13,7 @@ use arrow::{
     datatypes::{DataType, Int32Type, TimeUnit},
     record_batch::RecordBatch,
 };
+use data_types::chunk_metadata::ChunkOrder;
 use data_types::{
     chunk_metadata::ChunkSummary,
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
@@ -173,7 +174,7 @@ pub struct TestChunk {
     predicate_match: Option<PredicateMatch>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    order: u32,
+    order: ChunkOrder,
 }
 
 /// Implements a method for adding a column with default stats
@@ -247,7 +248,7 @@ impl TestChunk {
             table_data: Default::default(),
             saved_error: Default::default(),
             predicate_match: Default::default(),
-            order: Default::default(),
+            order: ChunkOrder::new(0),
         }
     }
 
@@ -892,7 +893,7 @@ impl QueryChunk for TestChunk {
         Ok(Some(column_names))
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }
diff --git a/server/src/db.rs b/server/src/db.rs
index 5d4d5deffa..d0c97d4daf 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -20,7 +20,7 @@ use std::{
 
 use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition};
 use data_types::{
-    chunk_metadata::ChunkSummary,
+    chunk_metadata::{ChunkOrder, ChunkSummary},
     database_rules::DatabaseRules,
     partition_metadata::{PartitionSummary, TableSummary},
     server_id::ServerId,
@@ -579,7 +579,7 @@ impl Db {
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
-    ) -> catalog::Result<(Arc<RwLock<CatalogChunk>>, u32)> {
+    ) -> catalog::Result<(Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
         self.catalog.chunk(table_name, partition_key, chunk_id)
     }
 
@@ -3110,7 +3110,7 @@ mod tests {
             time_of_first_write: Utc.timestamp_nanos(1),
             time_of_last_write: Utc.timestamp_nanos(1),
             time_closed: None,
-            order: 5,
+            order: ChunkOrder::new(5),
         }];
 
         let size: usize = db
@@ -3401,7 +3401,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             ChunkSummary {
                 partition_key: Arc::from("1970-01-05T15"),
@@ -3416,7 +3416,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 6,
+                order: ChunkOrder::new(6),
             },
             ChunkSummary {
                 partition_key: Arc::from("1970-01-05T15"),
@@ -3431,7 +3431,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
         ];
diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs
index 923ad7ae03..2676383e29 100644
--- a/server/src/db/catalog.rs
+++ b/server/src/db/catalog.rs
@@ -2,6 +2,7 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
+use data_types::chunk_metadata::ChunkOrder;
 use hashbrown::{HashMap, HashSet};
 
 use data_types::chunk_metadata::ChunkSummary;
@@ -162,7 +163,7 @@ impl Catalog {
         table_name: impl AsRef<str>,
         partition_key: impl AsRef<str>,
         chunk_id: u32,
-    ) -> Result<(Arc<RwLock<CatalogChunk>>, u32)> {
+    ) -> Result<(Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
         let table_name = table_name.as_ref();
         let partition_key = partition_key.as_ref();
diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index e7abcd0111..cf6d71b839 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -5,8 +5,8 @@ use snafu::Snafu;
 
 use data_types::{
     chunk_metadata::{
-        ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage, ChunkSummary,
-        DetailedChunkSummary,
+        ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkOrder, ChunkStorage,
+        ChunkSummary, DetailedChunkSummary,
     },
     instant::to_approximate_datetime,
     partition_metadata::TableSummary,
@@ -221,7 +221,7 @@ pub struct CatalogChunk {
     time_closed: Option<DateTime<Utc>>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    order: u32,
+    order: ChunkOrder,
 }
 
 macro_rules! unexpected_state {
@@ -275,7 +275,7 @@ impl CatalogChunk {
         chunk: mutable_buffer::chunk::MBChunk,
         time_of_write: DateTime<Utc>,
         metrics: ChunkMetrics,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         assert_eq!(chunk.table_name(), &addr.table_name);
 
@@ -308,7 +308,7 @@ impl CatalogChunk {
         schema: Arc<Schema>,
         metrics: ChunkMetrics,
         delete_predicates: Arc<Vec<Predicate>>,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         let stage = ChunkStage::Frozen {
             meta: Arc::new(ChunkMetadata {
@@ -343,7 +343,7 @@ impl CatalogChunk {
         time_of_last_write: DateTime<Utc>,
         metrics: ChunkMetrics,
         delete_predicates: Arc<Vec<Predicate>>,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
 
@@ -422,7 +422,7 @@ impl CatalogChunk {
         self.time_closed
     }
 
-    pub fn order(&self) -> u32 {
+    pub fn order(&self) -> ChunkOrder {
         self.order
     }
 
@@ -1231,7 +1231,7 @@ mod tests {
             mb_chunk,
             time_of_write,
             ChunkMetrics::new_unregistered(),
-            5,
+            ChunkOrder::new(5),
         )
     }
 
@@ -1249,7 +1249,7 @@ mod tests {
             now,
             ChunkMetrics::new_unregistered(),
             Arc::new(vec![] as Vec<Predicate>),
-            6,
+            ChunkOrder::new(6),
         )
     }
 }
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 54a2b0dd6e..f625e85092 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -4,7 +4,7 @@ use super::chunk::{CatalogChunk, ChunkStage, Error as ChunkError};
 use crate::db::catalog::metrics::PartitionMetrics;
 use chrono::{DateTime, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkSummary},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
     partition_metadata::{PartitionAddr, PartitionSummary},
 };
 use internal_types::schema::Schema;
@@ -52,7 +52,7 @@ pub struct Partition {
     /// The chunks that make up this partition, indexed by id.
     //
     // Alongside the chunk we also store its order.
-    chunks: BTreeMap<u32, (u32, Arc<RwLock<CatalogChunk>>)>,
+    chunks: BTreeMap<u32, (ChunkOrder, Arc<RwLock<CatalogChunk>>)>,
 
     /// When this partition was created
     created_at: DateTime<Utc>,
@@ -71,7 +71,7 @@ pub struct Partition {
     persistence_windows: Option<PersistenceWindows>,
 
     /// Tracks next chunk order in this partition.
-    next_chunk_order: u32,
+    next_chunk_order: ChunkOrder,
 }
 
 impl Partition {
@@ -89,7 +89,7 @@ impl Partition {
             next_chunk_id: 0,
             metrics: Arc::new(metrics),
             persistence_windows: None,
-            next_chunk_order: 0,
+            next_chunk_order: ChunkOrder::new(0),
         }
     }
 
@@ -142,7 +142,7 @@ impl Partition {
         assert_eq!(chunk.table_name().as_ref(), self.table_name());
 
         let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
-        let chunk_order = Self::pick_next(&mut self.next_chunk_order, "Chunk Order Overflow");
+        let chunk_order = self.next_chunk_order();
 
         let addr = ChunkAddr::new(&self.addr, chunk_id);
 
@@ -177,7 +177,7 @@ impl Partition {
         time_of_last_write: DateTime<Utc>,
         schema: Arc<Schema>,
         delete_predicates: Arc<Vec<Predicate>>,
-        chunk_order: u32,
+        chunk_order: ChunkOrder,
     ) -> (u32, Arc<RwLock<CatalogChunk>>) {
         let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
         assert!(
@@ -232,7 +232,7 @@ impl Partition {
         time_of_first_write: DateTime<Utc>,
         time_of_last_write: DateTime<Utc>,
         delete_predicates: Arc<Vec<Predicate>>,
-        chunk_order: u32,
+        chunk_order: ChunkOrder,
     ) -> Arc<RwLock<CatalogChunk>> {
         assert_eq!(chunk.table_name(), self.table_name());
 
@@ -257,9 +257,7 @@ impl Partition {
                 self.next_chunk_id = self
                     .next_chunk_id
                     .max(chunk_id.checked_add(1).expect("Chunk ID Overflow"));
-                self.next_chunk_order = self
-                    .next_chunk_order
-                    .max(chunk_order.checked_add(1).expect("Chunk Order Overflow"));
+                self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
 
                 vacant.insert((chunk_order, Arc::clone(&chunk)));
                 chunk
@@ -311,7 +309,7 @@ impl Partition {
     }
 
     /// Return an immutable chunk and its order reference by chunk id.
-    pub fn chunk(&self, chunk_id: u32) -> Option<(&Arc<RwLock<CatalogChunk>>, u32)> {
+    pub fn chunk(&self, chunk_id: u32) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
         self.chunks
             .get(&chunk_id)
             .map(|(order, chunk)| (chunk, *order))
@@ -330,7 +328,7 @@ impl Partition {
     /// Return chunks in this partition with their order and ids.
     ///
     /// Note that chunks are guaranteed ordered by chunk order and ID.
-    pub fn keyed_chunks(&self) -> Vec<(u32, u32, &Arc<RwLock<CatalogChunk>>)> {
+    pub fn keyed_chunks(&self) -> Vec<(u32, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> {
         let mut chunks: Vec<_> = self
             .chunks
             .iter()
@@ -386,6 +384,12 @@ impl Partition {
             .as_ref()
             .map(|persistence_windows| persistence_windows.sequencer_numbers())
     }
+
+    fn next_chunk_order(&mut self) -> ChunkOrder {
+        let res = self.next_chunk_order;
+        self.next_chunk_order = self.next_chunk_order.next();
+        res
+    }
 }
 
 impl Display for Partition {
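The new `next_chunk_order` helper is a post-increment: hand out the current value, then advance; the `max(chunk_order.next())` in the preserved-chunk path keeps the counter strictly ahead of any order replayed from the catalog. A compact sketch of both paths (toy `Partition`, not the IOx one):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChunkOrder(u32);

impl ChunkOrder {
    fn new(order: u32) -> Self { Self(order) }
    fn next(&self) -> Self { Self(self.0.checked_add(1).expect("chunk order overflow")) }
}

struct Partition {
    next_chunk_order: ChunkOrder,
}

impl Partition {
    // Post-increment, as in the diff: return the current order, then advance.
    fn next_chunk_order(&mut self) -> ChunkOrder {
        let res = self.next_chunk_order;
        self.next_chunk_order = self.next_chunk_order.next();
        res
    }

    // Replaying a preserved chunk must keep the counter strictly ahead.
    fn observe_existing(&mut self, chunk_order: ChunkOrder) {
        self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
    }
}

fn main() {
    let mut p = Partition { next_chunk_order: ChunkOrder::new(0) };
    assert_eq!(p.next_chunk_order(), ChunkOrder::new(0));
    assert_eq!(p.next_chunk_order(), ChunkOrder::new(1));

    // A catalog replay that reports order 7 pushes the counter to 8.
    p.observe_existing(ChunkOrder::new(7));
    assert_eq!(p.next_chunk_order(), ChunkOrder::new(8));
}
```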
diff --git a/server/src/db/chunk.rs b/server/src/db/chunk.rs
index 3c3573e37f..8201afe6f5 100644
--- a/server/src/db/chunk.rs
+++ b/server/src/db/chunk.rs
@@ -2,7 +2,7 @@ use super::{
     catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
 };
 use chrono::{DateTime, Utc};
-use data_types::partition_metadata;
+use data_types::{chunk_metadata::ChunkOrder, partition_metadata};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
 use internal_types::{
@@ -85,7 +85,7 @@ pub struct DbChunk {
     meta: Arc<ChunkMetadata>,
     time_of_first_write: DateTime<Utc>,
     time_of_last_write: DateTime<Utc>,
-    order: u32,
+    order: ChunkOrder,
 }
 
 #[derive(Debug)]
@@ -491,7 +491,7 @@ impl QueryChunk for DbChunk {
         }
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }
diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs
index 8b5f4186f9..2acc0a2c9f 100644
--- a/server/src/db/lifecycle.rs
+++ b/server/src/db/lifecycle.rs
@@ -6,7 +6,7 @@ use crate::{
 use ::lifecycle::LifecycleDb;
 use chrono::{DateTime, TimeZone, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
     database_rules::LifecycleRules,
     error::ErrorLogger,
     job::Job,
@@ -65,7 +65,7 @@ pub struct LockableCatalogChunk {
     pub db: Arc<Db>,
     pub chunk: Arc<RwLock<CatalogChunk>>,
     pub id: u32,
-    pub order: u32,
+    pub order: ChunkOrder,
 }
 
 impl LockableChunk for LockableCatalogChunk {
@@ -105,7 +105,7 @@ impl LockableChunk for LockableCatalogChunk {
         self.id
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }
@@ -348,10 +348,6 @@ impl LifecycleChunk for CatalogChunk {
     fn row_count(&self) -> usize {
         self.storage().0
     }
-
-    fn order(&self) -> u32 {
-        self.order()
-    }
 }
 
 /// Executes a plan and collects the results into a read buffer chunk
diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs
index 85502abd7b..63ee18d425 100644
--- a/server/src/db/lifecycle/compact.rs
+++ b/server/src/db/lifecycle/compact.rs
@@ -7,7 +7,7 @@ use crate::db::{
     DbChunk,
 };
 use chrono::{DateTime, Utc};
-use data_types::job::Job;
+use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::LifecycleWriteGuard;
 use observability_deps::tracing::info;
 use query::{
@@ -48,7 +48,7 @@ pub(crate) fn compact_chunks(
     let mut time_of_first_write: Option<DateTime<Utc>> = None;
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
     let mut delete_predicates: Vec<Predicate> = vec![];
-    let mut min_order = u32::MAX;
+    let mut min_order = ChunkOrder::MAX;
     let query_chunks = chunks
         .into_iter()
         .map(|mut chunk| {
diff --git a/server/src/db/lifecycle/persist.rs b/server/src/db/lifecycle/persist.rs
index f8f6f49dd1..49a2bc8532 100644
--- a/server/src/db/lifecycle/persist.rs
+++ b/server/src/db/lifecycle/persist.rs
@@ -7,7 +7,7 @@ use crate::db::{
     DbChunk,
 };
 use chrono::{DateTime, Utc};
-use data_types::job::Job;
+use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
 use observability_deps::tracing::info;
 use persistence_windows::persistence_windows::FlushHandle;
@@ -55,7 +55,7 @@ pub fn persist_chunks(
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
     let mut query_chunks = vec![];
     let mut delete_predicates: Vec<Predicate> = vec![];
-    let mut min_order = u32::MAX;
+    let mut min_order = ChunkOrder::MAX;
 
     for mut chunk in chunks {
         // Sanity-check
         assert!(Arc::ptr_eq(&db, &chunk.data().db));
diff --git a/server/src/db/system_tables/chunks.rs b/server/src/db/system_tables/chunks.rs
index f8742089e3..c46a619c7a 100644
--- a/server/src/db/system_tables/chunks.rs
+++ b/server/src/db/system_tables/chunks.rs
@@ -116,7 +116,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
         .collect::<TimestampNanosecondArray>();
     let order = chunks
         .iter()
-        .map(|c| Some(c.order))
+        .map(|c| Some(c.order.get()))
        .collect::<UInt32Array>();

    RecordBatch::try_new(
@@ -144,7 +144,7 @@ mod tests {
     use super::*;
     use arrow_util::assert_batches_eq;
     use chrono::{TimeZone, Utc};
-    use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
+    use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkOrder, ChunkStorage};
 
     #[test]
     fn test_from_chunk_summaries() {
@@ -162,7 +162,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -177,7 +177,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
                 time_closed: None,
-                order: 6,
+                order: ChunkOrder::new(6),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -192,7 +192,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
                 time_closed: None,
-                order: 7,
+                order: ChunkOrder::new(7),
             },
         ];
diff --git a/server/src/db/system_tables/columns.rs b/server/src/db/system_tables/columns.rs
index 07d90f1e7d..b544e6a6e3 100644
--- a/server/src/db/system_tables/columns.rs
+++ b/server/src/db/system_tables/columns.rs
@@ -220,7 +220,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use chrono::{TimeZone, Utc};
     use data_types::{
-        chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary},
+        chunk_metadata::{ChunkColumnSummary, ChunkOrder, ChunkStorage, ChunkSummary},
         partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
     };
 
@@ -321,7 +321,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             columns: vec![
                 ChunkColumnSummary {
@@ -358,7 +358,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: 6,
+                order: ChunkOrder::new(6),
             },
             columns: vec![ChunkColumnSummary {
                 name: "c1".into(),
@@ -389,7 +389,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(2),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             columns: vec![ChunkColumnSummary {
                 name: "c3".into(),
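To close the loop on rule 1 of the `ChunkOrder` doc comment — higher order overwrites lower — a toy sketch of replaying chunks in ascending order so later chunks win on key collisions (an illustration only, not IOx's deduplication code):

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChunkOrder(u32);

struct Chunk {
    order: ChunkOrder,
    // key -> value rows carried by this chunk
    rows: Vec<(&'static str, i64)>,
}

fn main() {
    let mut chunks = vec![
        Chunk { order: ChunkOrder(1), rows: vec![("a", 20), ("b", 2)] },
        Chunk { order: ChunkOrder(0), rows: vec![("a", 10)] },
    ];

    // Apply chunks in ascending order; a higher-order chunk overwrites
    // the same key from a lower-order one.
    chunks.sort_by_key(|c| c.order);
    let mut merged: HashMap<&str, i64> = HashMap::new();
    for chunk in &chunks {
        for (key, value) in &chunk.rows {
            merged.insert(*key, *value);
        }
    }

    assert_eq!(merged["a"], 20); // order 1 overwrote order 0
    assert_eq!(merged["b"], 2);
}
```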