refactor: introduce `ChunkOrder` type

Branch: pull/24376/head
Author: Marco Neumann
Date:   2021-09-14 17:10:23 +02:00
Parent: a98ea6fe5e
Commit: becef1c75f
21 changed files with 138 additions and 101 deletions


@@ -160,7 +160,7 @@ pub struct ChunkSummary {
     pub time_closed: Option<DateTime<Utc>>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    pub order: u32,
+    pub order: ChunkOrder,
 }
 
 /// Represents metadata about the physical storage of a column in a chunk
@@ -195,3 +195,39 @@ impl ChunkSummary {
             && self.row_count == other.row_count
     }
 }
+
+/// Order of a chunk.
+///
+/// This is used for:
+/// 1. **upsert order:** chunks with higher order overwrite data in chunks with lower order
+/// 2. **locking order:** chunks must be locked in ascending order
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
+pub struct ChunkOrder(u32);
+
+impl ChunkOrder {
+    pub const MAX: Self = Self(u32::MAX);
+
+    pub fn new(order: u32) -> Self {
+        Self(order)
+    }
+
+    pub fn get(&self) -> u32 {
+        self.0
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0.checked_add(1).expect("chunk order overflow"))
+    }
+}
+
+impl std::fmt::Display for ChunkOrder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("ChunkOrder").field(&self.0).finish()
+    }
+}
+
+impl From<u32> for ChunkOrder {
+    fn from(order: u32) -> Self {
+        Self(order)
+    }
+}
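Note: the hunk above is the full public surface of `ChunkOrder` (`new`, `get`, `next`, `MAX`, `From<u32>`, plus the derived `Ord`). A minimal standalone sketch of how those pieces are meant to be combined — a trimmed copy of the type without the serde derives, for illustration only, not the crate itself:

// Trimmed-down copy of ChunkOrder (serde derives omitted) used only for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ChunkOrder(u32);

impl ChunkOrder {
    const MAX: Self = Self(u32::MAX);

    fn new(order: u32) -> Self {
        Self(order)
    }

    fn next(&self) -> Self {
        Self(self.0.checked_add(1).expect("chunk order overflow"))
    }
}

fn main() {
    // Locking order: chunks must be locked in ascending order, so sort by order first.
    let mut chunks = vec![
        (ChunkOrder::new(7), "newest"),
        (ChunkOrder::new(2), "oldest"),
        (ChunkOrder::new(5), "middle"),
    ];
    chunks.sort_by_key(|(order, _)| *order);
    assert_eq!(chunks[0].1, "oldest");

    // Upsert order: a chunk produced by merging inherits the minimum order of its inputs,
    // computed by folding from ChunkOrder::MAX (the same pattern as in the lifecycle code).
    let min_order = chunks
        .iter()
        .fold(ChunkOrder::MAX, |acc, (order, _)| acc.min(*order));
    assert_eq!(min_order, ChunkOrder::new(2));

    // Allocating an order for the next chunk created in a partition.
    assert_eq!(ChunkOrder::new(2).next(), ChunkOrder::new(3));
}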


@@ -40,7 +40,7 @@ impl From<ChunkSummary> for management::Chunk {
             time_of_first_write: Some(time_of_first_write.into()),
             time_of_last_write: Some(time_of_last_write.into()),
             time_closed: time_closed.map(Into::into),
-            order,
+            order: order.get(),
         }
     }
 }
@@ -124,7 +124,7 @@ impl TryFrom<management::Chunk> for ChunkSummary {
             time_of_first_write: required_timestamp(time_of_first_write, "time_of_first_write")?,
             time_of_last_write: required_timestamp(time_of_last_write, "time_of_last_write")?,
             time_closed: timestamp(time_closed, "time_closed")?,
-            order,
+            order: order.into(),
         })
     }
 }
@@ -168,6 +168,7 @@ impl TryFrom<management::ChunkLifecycleAction> for Option<ChunkLifecycleAction>
 mod test {
     use super::*;
     use chrono::{TimeZone, Utc};
+    use data_types::chunk_metadata::ChunkOrder;
 
     #[test]
     fn valid_proto_to_summary() {
@@ -206,7 +207,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(50_000_000_007)),
-            order: 5,
+            order: ChunkOrder::new(5),
         };
 
         assert_eq!(
@@ -232,7 +233,7 @@ mod test {
             time_of_last_write: now,
             time_closed: None,
             time_of_last_access: Some(Utc.timestamp_nanos(12_000_100_007)),
-            order: 5,
+            order: ChunkOrder::new(5),
         };
 
         let proto = management::Chunk::try_from(summary).expect("conversion successful");


@@ -10,7 +10,7 @@
 use chrono::{DateTime, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
     database_rules::LifecycleRules,
     DatabaseName,
 };
@@ -153,7 +153,7 @@ pub trait LockableChunk: Sized {
     fn id(&self) -> u32;
 
-    fn order(&self) -> u32;
+    fn order(&self) -> ChunkOrder;
 }
 
 pub trait LifecyclePartition {
@@ -191,8 +191,6 @@ pub trait LifecycleChunk {
     fn storage(&self) -> ChunkStorage;
 
     fn row_count(&self) -> usize;
-
-    fn order(&self) -> u32;
 }
 
 /// The trait for a persist handle


@@ -677,7 +677,7 @@ mod tests {
         ChunkLifecycleAction, LifecycleReadGuard, LifecycleWriteGuard, LockableChunk,
         LockablePartition, PersistHandle,
     };
-    use data_types::chunk_metadata::{ChunkAddr, ChunkStorage};
+    use data_types::chunk_metadata::{ChunkAddr, ChunkOrder, ChunkStorage};
     use data_types::database_rules::MaxActiveCompactions::MaxActiveCompactions;
     use std::{
         cmp::max,
@@ -698,7 +698,7 @@ mod tests {
     #[derive(Debug)]
     struct TestPartition {
-        chunks: BTreeMap<u32, (u32, Arc<RwLock<TestChunk>>)>,
+        chunks: BTreeMap<u32, (ChunkOrder, Arc<RwLock<TestChunk>>)>,
         persistable_row_count: usize,
         minimum_unpersisted_age: Option<Instant>,
         max_persistable_timestamp: Option<DateTime<Utc>>,
@@ -731,7 +731,7 @@ mod tests {
         time_of_last_write: DateTime<Utc>,
         lifecycle_action: Option<TaskTracker<ChunkLifecycleAction>>,
         storage: ChunkStorage,
-        order: u32,
+        order: ChunkOrder,
     }
 
     impl TestChunk {
@@ -754,7 +754,7 @@ mod tests {
                 time_of_last_write: from_secs(time_of_last_write),
                 lifecycle_action: None,
                 storage,
-                order: 0,
+                order: ChunkOrder::new(0),
             }
         }
@@ -778,7 +778,7 @@ mod tests {
             self
         }
 
-        fn with_order(mut self, order: u32) -> Self {
+        fn with_order(mut self, order: ChunkOrder) -> Self {
            self.order = order;
            self
        }
@@ -801,7 +801,7 @@ mod tests {
         db: &'a TestDb,
         chunk: Arc<RwLock<TestChunk>>,
         id: u32,
-        order: u32,
+        order: ChunkOrder,
     }
 
     #[derive(Debug)]
@@ -870,7 +870,7 @@ mod tests {
             let mut new_chunk = TestChunk::new(id, 0, ChunkStorage::ReadBuffer);
             new_chunk.row_count = 0;
 
-            let mut order = u32::MAX;
+            let mut order = ChunkOrder::MAX;
             for chunk in &chunks {
                 partition.chunks.remove(&chunk.addr.chunk_id);
                 new_chunk.row_count += chunk.row_count;
@@ -880,7 +880,7 @@ mod tests {
                     (None, Some(ts)) => Some(ts),
                     (None, None) => None,
                 };
-                order = order.min(chunk.order());
+                order = order.min(chunk.order);
             }
 
             partition
@@ -907,10 +907,10 @@ mod tests {
             chunks: Vec<LifecycleWriteGuard<'_, TestChunk, Self::Chunk>>,
             handle: Self::PersistHandle,
         ) -> Result<TaskTracker<()>, Self::Error> {
-            let mut order = u32::MAX;
+            let mut order = ChunkOrder::MAX;
             for chunk in &chunks {
                 partition.chunks.remove(&chunk.addr.chunk_id);
-                order = order.min(chunk.order());
+                order = order.min(chunk.order);
             }
 
             let id = partition.next_id;
@@ -982,7 +982,7 @@ mod tests {
             self.id
         }
 
-        fn order(&self) -> u32 {
+        fn order(&self) -> ChunkOrder {
             self.order
         }
     }
@@ -1037,10 +1037,6 @@ mod tests {
         fn row_count(&self) -> usize {
             self.row_count
         }
-
-        fn order(&self) -> u32 {
-            self.order
-        }
     }
 
     impl TestPartition {
@@ -1050,8 +1046,7 @@ mod tests {
                 .into_iter()
                 .map(|x| {
                     max_id = max(max_id, x.addr.chunk_id);
-                    let order = x.order();
-                    (x.addr.chunk_id, (order, Arc::new(RwLock::new(x))))
+                    (x.addr.chunk_id, (x.order, Arc::new(RwLock::new(x))))
                 })
                 .collect();
@@ -1492,23 +1487,23 @@ mod tests {
                 // blocked by action below
                 TestChunk::new(19, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(4),
+                    .with_order(ChunkOrder::new(4)),
                 // has an action
                 TestChunk::new(20, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(3)
+                    .with_order(ChunkOrder::new(3))
                     .with_action(ChunkLifecycleAction::Compacting),
                 // closed => can compact
                 TestChunk::new(21, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(2),
+                    .with_order(ChunkOrder::new(2)),
                 TestChunk::new(22, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(1),
+                    .with_order(ChunkOrder::new(1)),
                 // has an action, but doesn't block because it's first
                 TestChunk::new(23, 20, ChunkStorage::ReadBuffer)
                     .with_row_count(400)
-                    .with_order(0)
+                    .with_order(ChunkOrder::new(0))
                     .with_action(ChunkLifecycleAction::Compacting),
             ]),
         ];


@@ -505,7 +505,9 @@ File {
                 },
             },
         },
-        chunk_order: 5,
+        chunk_order: ChunkOrder(
+            5,
+        ),
     },
 ),
 schema: Ok(


@@ -173,7 +173,7 @@ mod tests {
         },
     };
     use chrono::Utc;
-    use data_types::chunk_metadata::ChunkAddr;
+    use data_types::chunk_metadata::{ChunkAddr, ChunkOrder};
     use datafusion::physical_plan::SendableRecordBatchStream;
     use datafusion_util::MemoryStream;
     use parquet::arrow::ArrowWriter;
@@ -373,7 +373,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let stream: SendableRecordBatchStream = Box::pin(MemoryStream::new(record_batches));
         let (path, file_size_bytes, metadata) = storage


@@ -87,7 +87,10 @@
 //! [Apache Thrift]: https://thrift.apache.org/
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
 use chrono::{DateTime, NaiveDateTime, Utc};
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
+use data_types::{
+    chunk_metadata::ChunkOrder,
+    partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
+};
 use generated_types::influxdata::iox::catalog::v1 as proto;
 use internal_types::schema::{InfluxColumnType, InfluxFieldType, Schema};
 use parquet::{
@@ -271,7 +274,7 @@ pub struct IoxMetadata {
     pub database_checkpoint: DatabaseCheckpoint,
 
     /// Order of this chunk relative to other overlapping chunks.
-    pub chunk_order: u32,
+    pub chunk_order: ChunkOrder,
 }
 
 impl IoxMetadata {
@@ -366,7 +369,7 @@ impl IoxMetadata {
             chunk_id: proto_msg.chunk_id,
             partition_checkpoint,
             database_checkpoint,
-            chunk_order: proto_msg.chunk_order,
+            chunk_order: proto_msg.chunk_order.into(),
         })
     }
@@ -421,7 +424,7 @@ impl IoxMetadata {
             chunk_id: self.chunk_id,
             partition_checkpoint: Some(proto_partition_checkpoint),
             database_checkpoint: Some(proto_database_checkpoint),
-            chunk_order: self.chunk_order,
+            chunk_order: self.chunk_order.get(),
         };
 
         let mut buf = Vec::new();
@@ -1069,7 +1072,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();
@@ -1120,7 +1123,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let proto_bytes = metadata.to_protobuf().unwrap();
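On the wire nothing changes for the parquet metadata: encoding still writes a plain `u32` via `chunk_order.get()`, and parsing converts back with `From<u32>`. A small hedged sketch of that round trip, using only the conversions shown in this diff:

#[test]
fn chunk_order_round_trips_through_u32() {
    use data_types::chunk_metadata::ChunkOrder;

    // Encode side: the proto field stays a u32 (`chunk_order: self.chunk_order.get()`).
    let order = ChunkOrder::new(5);
    let wire: u32 = order.get();
    assert_eq!(wire, 5);

    // Decode side: `proto_msg.chunk_order.into()` relies on the `From<u32>` impl.
    let decoded: ChunkOrder = wire.into();
    assert_eq!(decoded, order);
}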


@@ -430,7 +430,7 @@ mod tests {
     use arrow::array::{ArrayRef, StringArray};
     use arrow_util::assert_batches_eq;
     use chrono::Utc;
-    use data_types::partition_metadata::TableSummary;
+    use data_types::{chunk_metadata::ChunkOrder, partition_metadata::TableSummary};
     use datafusion::physical_plan::common::SizedRecordBatchStream;
     use datafusion_util::MemoryStream;
     use parquet::schema::types::ColumnPath;
@@ -452,7 +452,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         // create parquet file
@@ -527,7 +527,7 @@ mod tests {
             database_checkpoint,
             time_of_first_write: Utc::now(),
             time_of_last_write: Utc::now(),
-            chunk_order: 5,
+            chunk_order: ChunkOrder::new(5),
         };
 
         let (path, _file_size_bytes, _metadata) = storage


@@ -13,7 +13,7 @@ use arrow::{
 };
 use chrono::{TimeZone, Utc};
 use data_types::{
-    chunk_metadata::ChunkAddr,
+    chunk_metadata::{ChunkAddr, ChunkOrder},
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
     server_id::ServerId,
     DatabaseName,
@@ -189,7 +189,7 @@ pub async fn make_chunk_given_record_batch(
         database_checkpoint,
         time_of_first_write: Utc.timestamp(30, 40),
         time_of_last_write: Utc.timestamp(50, 60),
-        chunk_order: 5,
+        chunk_order: ChunkOrder::new(5),
     };
 
     let (path, file_size_bytes, parquet_metadata) = storage
         .write_to_object_store(addr.clone(), stream, metadata)


@@ -9,7 +9,7 @@
 )]
 
 use data_types::{
-    chunk_metadata::ChunkSummary,
+    chunk_metadata::{ChunkOrder, ChunkSummary},
     partition_metadata::{InfluxDbType, TableSummary},
 };
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -152,7 +152,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync {
     fn chunk_type(&self) -> &str;
 
     /// Order of this chunk relative to other overlapping chunks.
-    fn order(&self) -> u32;
+    fn order(&self) -> ChunkOrder;
 }
 
 /// Implement ChunkMeta for something wrapped in an Arc (like Chunks often are)


@@ -13,6 +13,7 @@ use arrow::{
     datatypes::{DataType, Int32Type, TimeUnit},
     record_batch::RecordBatch,
 };
+use data_types::chunk_metadata::ChunkOrder;
 use data_types::{
     chunk_metadata::ChunkSummary,
     partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics, TableSummary},
@@ -173,7 +174,7 @@ pub struct TestChunk {
     predicate_match: Option<PredicateMatch>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    order: u32,
+    order: ChunkOrder,
 }
 
 /// Implements a method for adding a column with default stats
@@ -247,7 +248,7 @@ impl TestChunk {
             table_data: Default::default(),
             saved_error: Default::default(),
             predicate_match: Default::default(),
-            order: Default::default(),
+            order: ChunkOrder::new(0),
         }
     }
@@ -892,7 +893,7 @@ impl QueryChunk for TestChunk {
         Ok(Some(column_names))
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }


@@ -20,7 +20,7 @@ use std::{
 use ::lifecycle::{LifecycleChunk, LockableChunk, LockablePartition};
 use data_types::{
-    chunk_metadata::ChunkSummary,
+    chunk_metadata::{ChunkOrder, ChunkSummary},
     database_rules::DatabaseRules,
     partition_metadata::{PartitionSummary, TableSummary},
     server_id::ServerId,
@@ -579,7 +579,7 @@ impl Db {
         table_name: &str,
         partition_key: &str,
         chunk_id: u32,
-    ) -> catalog::Result<(Arc<tracker::RwLock<CatalogChunk>>, u32)> {
+    ) -> catalog::Result<(Arc<tracker::RwLock<CatalogChunk>>, ChunkOrder)> {
         self.catalog.chunk(table_name, partition_key, chunk_id)
     }
@@ -3110,7 +3110,7 @@ mod tests {
             time_of_first_write: Utc.timestamp_nanos(1),
             time_of_last_write: Utc.timestamp_nanos(1),
             time_closed: None,
-            order: 5,
+            order: ChunkOrder::new(5),
         }];
 
         let size: usize = db
@@ -3401,7 +3401,7 @@
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             ChunkSummary {
                 partition_key: Arc::from("1970-01-05T15"),
@@ -3416,7 +3416,7 @@
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 6,
+                order: ChunkOrder::new(6),
             },
             ChunkSummary {
                 partition_key: Arc::from("1970-01-05T15"),
@@ -3431,7 +3431,7 @@
                 time_of_first_write: Utc.timestamp_nanos(1),
                 time_of_last_write: Utc.timestamp_nanos(1),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
         ];


@@ -2,6 +2,7 @@
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
+use data_types::chunk_metadata::ChunkOrder;
 use hashbrown::{HashMap, HashSet};
 
 use data_types::chunk_metadata::ChunkSummary;
@@ -162,7 +163,7 @@ impl Catalog {
         table_name: impl AsRef<str>,
         partition_key: impl AsRef<str>,
         chunk_id: u32,
-    ) -> Result<(Arc<RwLock<CatalogChunk>>, u32)> {
+    ) -> Result<(Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
         let table_name = table_name.as_ref();
         let partition_key = partition_key.as_ref();


@@ -5,8 +5,8 @@ use snafu::Snafu;
 use data_types::{
     chunk_metadata::{
-        ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkStorage, ChunkSummary,
-        DetailedChunkSummary,
+        ChunkAddr, ChunkColumnSummary, ChunkLifecycleAction, ChunkOrder, ChunkStorage,
+        ChunkSummary, DetailedChunkSummary,
     },
     instant::to_approximate_datetime,
     partition_metadata::TableSummary,
@@ -221,7 +221,7 @@ pub struct CatalogChunk {
     time_closed: Option<DateTime<Utc>>,
 
     /// Order of this chunk relative to other overlapping chunks.
-    order: u32,
+    order: ChunkOrder,
 }
 
 macro_rules! unexpected_state {
@@ -275,7 +275,7 @@ impl CatalogChunk {
         chunk: mutable_buffer::chunk::MBChunk,
         time_of_write: DateTime<Utc>,
         metrics: ChunkMetrics,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         assert_eq!(chunk.table_name(), &addr.table_name);
@@ -308,7 +308,7 @@ impl CatalogChunk {
         schema: Arc<Schema>,
         metrics: ChunkMetrics,
         delete_predicates: Arc<Vec<Predicate>>,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         let stage = ChunkStage::Frozen {
             meta: Arc::new(ChunkMetadata {
@@ -343,7 +343,7 @@ impl CatalogChunk {
         time_of_last_write: DateTime<Utc>,
         metrics: ChunkMetrics,
         delete_predicates: Arc<Vec<Predicate>>,
-        order: u32,
+        order: ChunkOrder,
     ) -> Self {
         assert_eq!(chunk.table_name(), addr.table_name.as_ref());
@@ -422,7 +422,7 @@ impl CatalogChunk {
         self.time_closed
     }
 
-    pub fn order(&self) -> u32 {
+    pub fn order(&self) -> ChunkOrder {
         self.order
     }
@@ -1231,7 +1231,7 @@ mod tests {
             mb_chunk,
             time_of_write,
             ChunkMetrics::new_unregistered(),
-            5,
+            ChunkOrder::new(5),
         )
     }
@@ -1249,7 +1249,7 @@ mod tests {
             now,
             ChunkMetrics::new_unregistered(),
             Arc::new(vec![] as Vec<Predicate>),
-            6,
+            ChunkOrder::new(6),
         )
     }
 }


@@ -4,7 +4,7 @@ use super::chunk::{CatalogChunk, ChunkStage, Error as ChunkError};
 use crate::db::catalog::metrics::PartitionMetrics;
 use chrono::{DateTime, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkSummary},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkSummary},
     partition_metadata::{PartitionAddr, PartitionSummary},
 };
 use internal_types::schema::Schema;
@@ -52,7 +52,7 @@ pub struct Partition {
     /// The chunks that make up this partition, indexed by id.
     //
     // Alongside the chunk we also store its order.
-    chunks: BTreeMap<u32, (u32, Arc<RwLock<CatalogChunk>>)>,
+    chunks: BTreeMap<u32, (ChunkOrder, Arc<RwLock<CatalogChunk>>)>,
 
     /// When this partition was created
     created_at: DateTime<Utc>,
@@ -71,7 +71,7 @@ pub struct Partition {
     persistence_windows: Option<PersistenceWindows>,
 
     /// Tracks next chunk order in this partition.
-    next_chunk_order: u32,
+    next_chunk_order: ChunkOrder,
 }
 
 impl Partition {
@@ -89,7 +89,7 @@ impl Partition {
             next_chunk_id: 0,
             metrics: Arc::new(metrics),
             persistence_windows: None,
-            next_chunk_order: 0,
+            next_chunk_order: ChunkOrder::new(0),
         }
     }
@@ -142,7 +142,7 @@ impl Partition {
         assert_eq!(chunk.table_name().as_ref(), self.table_name());
 
         let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
-        let chunk_order = Self::pick_next(&mut self.next_chunk_order, "Chunk Order Overflow");
+        let chunk_order = self.next_chunk_order();
 
         let addr = ChunkAddr::new(&self.addr, chunk_id);
@@ -177,7 +177,7 @@ impl Partition {
         time_of_last_write: DateTime<Utc>,
         schema: Arc<Schema>,
         delete_predicates: Arc<Vec<Predicate>>,
-        chunk_order: u32,
+        chunk_order: ChunkOrder,
     ) -> (u32, Arc<RwLock<CatalogChunk>>) {
         let chunk_id = Self::pick_next(&mut self.next_chunk_id, "Chunk ID Overflow");
         assert!(
@@ -232,7 +232,7 @@ impl Partition {
         time_of_first_write: DateTime<Utc>,
         time_of_last_write: DateTime<Utc>,
         delete_predicates: Arc<Vec<Predicate>>,
-        chunk_order: u32,
+        chunk_order: ChunkOrder,
     ) -> Arc<RwLock<CatalogChunk>> {
         assert_eq!(chunk.table_name(), self.table_name());
@@ -257,9 +257,7 @@ impl Partition {
                 self.next_chunk_id = self
                     .next_chunk_id
                     .max(chunk_id.checked_add(1).expect("Chunk ID Overflow"));
-                self.next_chunk_order = self
-                    .next_chunk_order
-                    .max(chunk_order.checked_add(1).expect("Chunk Order Overflow"));
+                self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
 
                 vacant.insert((chunk_order, Arc::clone(&chunk)));
                 chunk
@@ -311,7 +309,7 @@ impl Partition {
     }
 
     /// Return an immutable chunk and its order reference by chunk id.
-    pub fn chunk(&self, chunk_id: u32) -> Option<(&Arc<RwLock<CatalogChunk>>, u32)> {
+    pub fn chunk(&self, chunk_id: u32) -> Option<(&Arc<RwLock<CatalogChunk>>, ChunkOrder)> {
         self.chunks
             .get(&chunk_id)
             .map(|(order, chunk)| (chunk, *order))
@@ -330,7 +328,7 @@ impl Partition {
     /// Return chunks in this partition with their order and ids.
     ///
     /// Note that chunks are guaranteed ordered by chunk order and ID.
-    pub fn keyed_chunks(&self) -> Vec<(u32, u32, &Arc<RwLock<CatalogChunk>>)> {
+    pub fn keyed_chunks(&self) -> Vec<(u32, ChunkOrder, &Arc<RwLock<CatalogChunk>>)> {
         let mut chunks: Vec<_> = self
             .chunks
             .iter()
@@ -386,6 +384,12 @@ impl Partition {
             .as_ref()
             .map(|persistence_windows| persistence_windows.sequencer_numbers())
     }
+
+    fn next_chunk_order(&mut self) -> ChunkOrder {
+        let res = self.next_chunk_order;
+        self.next_chunk_order = self.next_chunk_order.next();
+        res
+    }
 }
 
 impl Display for Partition {
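The partition no longer goes through the generic `pick_next` helper for orders: `next_chunk_order()` returns the current counter and advances it with `ChunkOrder::next()` (which panics on overflow, replacing the old "Chunk Order Overflow" expect), and the insert path keeps the counter ahead of any pre-existing order via `max(chunk_order.next())`. A rough standalone sketch of that allocation pattern — the struct and field names are simplified stand-ins, only the method bodies mirror the diff:

use data_types::chunk_metadata::ChunkOrder;

// Hypothetical stand-in for the order-related state of a partition.
struct OrderAllocator {
    next_chunk_order: ChunkOrder,
}

impl OrderAllocator {
    fn new() -> Self {
        Self { next_chunk_order: ChunkOrder::new(0) }
    }

    // Mirrors `Partition::next_chunk_order`: hand out the current value, then bump.
    fn next_chunk_order(&mut self) -> ChunkOrder {
        let res = self.next_chunk_order;
        self.next_chunk_order = self.next_chunk_order.next();
        res
    }

    // Mirrors the insert path above: when a chunk arrives with a known order
    // (e.g. loaded from object store), keep the counter ahead of it.
    fn observe(&mut self, chunk_order: ChunkOrder) {
        self.next_chunk_order = self.next_chunk_order.max(chunk_order.next());
    }
}

#[test]
fn order_allocation_sketch() {
    let mut alloc = OrderAllocator::new();
    assert_eq!(alloc.next_chunk_order(), ChunkOrder::new(0));
    assert_eq!(alloc.next_chunk_order(), ChunkOrder::new(1));

    // A pre-existing chunk with order 7 pushes the counter past it.
    alloc.observe(ChunkOrder::new(7));
    assert_eq!(alloc.next_chunk_order(), ChunkOrder::new(8));
}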


@@ -2,7 +2,7 @@ use super::{
     catalog::chunk::ChunkMetadata, pred::to_read_buffer_predicate, streams::ReadFilterResultsStream,
 };
 use chrono::{DateTime, Utc};
-use data_types::partition_metadata;
+use data_types::{chunk_metadata::ChunkOrder, partition_metadata};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion_util::MemoryStream;
 use internal_types::{
@@ -85,7 +85,7 @@ pub struct DbChunk {
     meta: Arc<ChunkMetadata>,
     time_of_first_write: DateTime<Utc>,
     time_of_last_write: DateTime<Utc>,
-    order: u32,
+    order: ChunkOrder,
 }
 
 #[derive(Debug)]
@@ -491,7 +491,7 @@ impl QueryChunk for DbChunk {
         }
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }


@@ -6,7 +6,7 @@ use crate::{
 use ::lifecycle::LifecycleDb;
 use chrono::{DateTime, TimeZone, Utc};
 use data_types::{
-    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkStorage},
+    chunk_metadata::{ChunkAddr, ChunkLifecycleAction, ChunkOrder, ChunkStorage},
     database_rules::LifecycleRules,
     error::ErrorLogger,
     job::Job,
@@ -65,7 +65,7 @@ pub struct LockableCatalogChunk {
     pub db: Arc<Db>,
     pub chunk: Arc<RwLock<CatalogChunk>>,
     pub id: u32,
-    pub order: u32,
+    pub order: ChunkOrder,
 }
 
 impl LockableChunk for LockableCatalogChunk {
@@ -105,7 +105,7 @@ impl LockableChunk for LockableCatalogChunk {
         self.id
     }
 
-    fn order(&self) -> u32 {
+    fn order(&self) -> ChunkOrder {
         self.order
     }
 }
@@ -348,10 +348,6 @@ impl LifecycleChunk for CatalogChunk {
     fn row_count(&self) -> usize {
         self.storage().0
     }
-
-    fn order(&self) -> u32 {
-        self.order()
-    }
 }
 
 /// Executes a plan and collects the results into a read buffer chunk


@@ -7,7 +7,7 @@ use crate::db::{
     DbChunk,
 };
 use chrono::{DateTime, Utc};
-use data_types::job::Job;
+use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::LifecycleWriteGuard;
 use observability_deps::tracing::info;
 use query::{
@@ -48,7 +48,7 @@ pub(crate) fn compact_chunks(
     let mut time_of_first_write: Option<DateTime<Utc>> = None;
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
     let mut delete_predicates: Vec<Predicate> = vec![];
-    let mut min_order = u32::MAX;
+    let mut min_order = ChunkOrder::MAX;
     let query_chunks = chunks
         .into_iter()
         .map(|mut chunk| {
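Compaction (and persistence below) now folds the minimum `ChunkOrder` instead of a raw `u32`: the fold starts at `ChunkOrder::MAX` and takes `min` per input chunk, so the resulting chunk inherits the smallest order of its inputs and keeps the overwrite semantics documented on the type. A hedged sketch of that fold on its own — the real code does this inline while draining the write guards, and `merged_order` is a name made up for this example:

use data_types::chunk_metadata::ChunkOrder;

/// Sketch only: the order assigned to a chunk produced from `inputs`.
fn merged_order(inputs: &[ChunkOrder]) -> ChunkOrder {
    let mut min_order = ChunkOrder::MAX;
    for order in inputs {
        // Derived `Ord` on ChunkOrder makes `min` work directly on the wrapper.
        min_order = min_order.min(*order);
    }
    min_order
}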


@@ -7,7 +7,7 @@ use crate::db::{
     DbChunk,
 };
 use chrono::{DateTime, Utc};
-use data_types::job::Job;
+use data_types::{chunk_metadata::ChunkOrder, job::Job};
 use lifecycle::{LifecycleWriteGuard, LockableChunk, LockablePartition};
 use observability_deps::tracing::info;
 use persistence_windows::persistence_windows::FlushHandle;
@@ -55,7 +55,7 @@ pub fn persist_chunks(
     let mut time_of_last_write: Option<DateTime<Utc>> = None;
     let mut query_chunks = vec![];
     let mut delete_predicates: Vec<Predicate> = vec![];
-    let mut min_order = u32::MAX;
+    let mut min_order = ChunkOrder::MAX;
     for mut chunk in chunks {
         // Sanity-check
         assert!(Arc::ptr_eq(&db, &chunk.data().db));


@@ -116,7 +116,7 @@ fn from_chunk_summaries(schema: SchemaRef, chunks: Vec<ChunkSummary>) -> Result<
         .collect::<TimestampNanosecondArray>();
     let order = chunks
         .iter()
-        .map(|c| Some(c.order))
+        .map(|c| Some(c.order.get()))
         .collect::<UInt32Array>();
 
     RecordBatch::try_new(
@@ -144,7 +144,7 @@ mod tests {
     use super::*;
     use arrow_util::assert_batches_eq;
     use chrono::{TimeZone, Utc};
-    use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkStorage};
+    use data_types::chunk_metadata::{ChunkLifecycleAction, ChunkOrder, ChunkStorage};
 
     #[test]
     fn test_from_chunk_summaries() {
@@ -162,7 +162,7 @@ mod tests {
                 time_of_first_write: Utc.timestamp_nanos(10_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(10_000_000_000),
                 time_closed: None,
-                order: 5,
+                order: ChunkOrder::new(5),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -177,7 +177,7 @@
                 time_of_first_write: Utc.timestamp_nanos(80_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(80_000_000_000),
                 time_closed: None,
-                order: 6,
+                order: ChunkOrder::new(6),
             },
             ChunkSummary {
                 partition_key: Arc::from("p1"),
@@ -192,7 +192,7 @@
                 time_of_first_write: Utc.timestamp_nanos(100_000_000_000),
                 time_of_last_write: Utc.timestamp_nanos(200_000_000_000),
                 time_closed: None,
-                order: 7,
+                order: ChunkOrder::new(7),
            },
        ];
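For the `system.chunks` table the order column stays a `u32`: each summary's order is unwrapped with `.get()` before being collected into an Arrow `UInt32Array`. A small sketch of just that mapping, using the same collect pattern as the diff and assuming the `arrow` crate already used by the system tables:

#[test]
fn order_column_sketch() {
    use arrow::array::UInt32Array;
    use data_types::chunk_metadata::ChunkOrder;

    let orders = vec![ChunkOrder::new(5), ChunkOrder::new(6), ChunkOrder::new(7)];

    // Same pattern as `from_chunk_summaries`: Option<u32> items collected into the array.
    let column = orders
        .iter()
        .map(|o| Some(o.get()))
        .collect::<UInt32Array>();

    assert_eq!(column.value(2), 7);
}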


@@ -220,7 +220,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use chrono::{TimeZone, Utc};
     use data_types::{
-        chunk_metadata::{ChunkColumnSummary, ChunkStorage, ChunkSummary},
+        chunk_metadata::{ChunkColumnSummary, ChunkOrder, ChunkStorage, ChunkSummary},
         partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics},
     };
@@ -321,7 +321,7 @@ mod tests {
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: 5,
+                    order: ChunkOrder::new(5),
                 },
                 columns: vec![
                     ChunkColumnSummary {
@@ -358,7 +358,7 @@
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: 6,
+                    order: ChunkOrder::new(6),
                 },
                 columns: vec![ChunkColumnSummary {
                     name: "c1".into(),
@@ -389,7 +389,7 @@
                     time_of_first_write: Utc.timestamp_nanos(1),
                     time_of_last_write: Utc.timestamp_nanos(2),
                     time_closed: None,
-                    order: 5,
+                    order: ChunkOrder::new(5),
                 },
                 columns: vec![ChunkColumnSummary {
                     name: "c3".into(),