From e64ac10c407e96294ecefbd6aceb6499923b0b00 Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Tue, 23 Nov 2021 15:32:46 -0500
Subject: [PATCH] refactor: address review comments and compute max checkpoints

---
 server/src/db/catalog/chunk.rs                  |   4 +-
 server/src/db/catalog/partition.rs              | 102 +++++++-----
 .../src/db/lifecycle/compact_object_store.rs    |  68 ++++++------
 server/src/db/lifecycle/error.rs                |   8 ++
 4 files changed, 86 insertions(+), 96 deletions(-)

diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs
index 3999a895a4..8264be2435 100644
--- a/server/src/db/catalog/chunk.rs
+++ b/server/src/db/catalog/chunk.rs
@@ -733,11 +733,11 @@ impl CatalogChunk {
     }

     /// Return parquet chunk if it is persisted
-    pub fn parquet_chunk(&self) -> Option<Arc<ParquetChunk>> {
+    pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
         match &self.stage {
             ChunkStage::Persisted {
                 meta: _, parquet, ..
-            } => Some(Arc::clone(parquet)),
+            } => Some(parquet),
             _ => None,
         }
     }
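Note: the chunk.rs change above makes parquet_chunk() return a borrowed Option<&Arc<ParquetChunk>> instead of an owned Arc, so the clone now happens only at call sites that actually need ownership. A minimal call-site sketch under that assumption (the inspect function and catalog_chunk name are illustrative, not part of this patch; std::sync::Arc and the crate's CatalogChunk/ParquetChunk are assumed in scope):

    fn inspect(catalog_chunk: &CatalogChunk) {
        // Read-only access: borrow through the returned reference.
        if let Some(parquet) = catalog_chunk.parquet_chunk() {
            println!("persisted parquet chunk at {:p}", Arc::as_ptr(parquet));
        }

        // Ownership needed (e.g. to move into a task): clone explicitly.
        let _owned: Option<Arc<ParquetChunk>> = catalog_chunk.parquet_chunk().map(Arc::clone);
    }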
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 0f7d150e09..12eb9e3311 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -17,6 +17,7 @@ use snafu::{OptionExt, Snafu};
 use std::{
     collections::{BTreeMap, BTreeSet},
     fmt::Display,
+    ops::RangeInclusive,
     sync::Arc,
 };
 use time::{Time, TimeProvider};
@@ -220,7 +221,7 @@ impl Partition {
         self.create_open_chunk_with_specified_id_order(chunk, chunk_id, chunk_order)
     }

-    pub fn create_open_chunk_with_specified_id_order(
+    fn create_open_chunk_with_specified_id_order(
         &mut self,
         chunk: mutable_buffer::MBChunk,
         chunk_id: ChunkId,
@@ -389,36 +390,16 @@
     pub fn contiguous_chunks(
         &self,
         chunk_ids: &BTreeSet<ChunkId>,
-        chunk_orders: &BTreeSet<ChunkOrder>,
+        chunk_orders: &RangeInclusive<ChunkOrder>,
     ) -> Result<bool> {
-        if chunk_orders.is_empty() {
+        if chunk_ids.is_empty() {
             return Ok(true);
         }
-
-        // First and last order in the chunk_orders for comparison
-        let first_order_element = chunk_orders.iter().next();
-        let first_order = match first_order_element {
-            Some(first_order) => first_order,
-            _ => {
-                return ContiguousCheck {}.fail();
-            }
-        };
-        let last_order_element = chunk_orders.iter().rev().next();
-        let last_order = match last_order_element {
-            Some(last_order) => last_order,
-            _ => {
-                return ContiguousCheck {}.fail();
-            }
-        };
-
-        let chunks = self.chunks();
-        for chunk in chunks {
-            let chunk = chunk.read();
-            let order = chunk.order();
-            let id = chunk.id();
+        let chunk_keys = self.keyed_chunks();
+        for (id, order, ..) in chunk_keys {
             // this chunk's order is in the middle of the given orders but does
             // not belong to their chunks
-            if order >= *first_order && order <= *last_order && !chunk_ids.contains(&id) {
+            if chunk_orders.contains(&order) && !chunk_ids.contains(&id) {
                 return Ok(false);
             }
         }
@@ -543,10 +524,10 @@ mod tests {
         let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);

         let ids = BTreeSet::new();
-        let orders = BTreeSet::new();
+        let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);

         // contiguous
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
     }

     #[test]
@@ -556,25 +537,26 @@
         let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);

         let mut ids = BTreeSet::new();
-        let mut orders = BTreeSet::new();
+        let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);

         // no chunks provided
         // --> contiguous
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // provide itself
         // --> contiguous
         ids.insert(ChunkId::new_test(1));
-        orders.insert(ChunkOrder::new(1).unwrap());
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // provide same order, different id
-        // --> not contiguos
+        // --> not contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(2));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(1).unwrap());
-        assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
+        assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
     }

     #[test]
@@ -587,28 +569,25 @@
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(1));
         ids.insert(ChunkId::new_test(2));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(1).unwrap());
-        orders.insert(ChunkOrder::new(2).unwrap());
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // (3,3) and (2,2) are contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(3));
         ids.insert(ChunkId::new_test(2));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(3).unwrap());
-        orders.insert(ChunkOrder::new(2).unwrap());
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(3).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // (3,3) and (1,1) are NOT contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(3));
         ids.insert(ChunkId::new_test(1));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(3).unwrap());
-        orders.insert(ChunkOrder::new(1).unwrap());
-        assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(3).unwrap());
+        assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
     }

     #[test]
@@ -620,36 +599,33 @@
         // (1,1) is contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(1));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(1).unwrap());
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // (2,2) and (3,2) are contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(2));
         ids.insert(ChunkId::new_test(3));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(3).unwrap());
-        orders.insert(ChunkOrder::new(2).unwrap());
-        assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(2).unwrap());
+        assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());

         // (1,1) and (2,2) are NOT contiguous because there is chunk (3, 2) with the same order 2
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(1));
         ids.insert(ChunkId::new_test(2));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(1).unwrap());
-        orders.insert(ChunkOrder::new(2).unwrap());
-        assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
+        assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());

         // (3,2) and (1,1) are NOT contiguous
         let mut ids = BTreeSet::new();
         ids.insert(ChunkId::new_test(3));
         ids.insert(ChunkId::new_test(1));
-        let mut orders = BTreeSet::new();
-        orders.insert(ChunkOrder::new(2).unwrap());
-        orders.insert(ChunkOrder::new(1).unwrap());
-        assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
+        let order_range =
+            RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
+        assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
     }

     fn make_partitition_for_chunks_with_ids_orders(id_orders: &[(u128, u32)]) -> Partition {
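Note: with the partition.rs change above, contiguous_chunks() reduces to a single predicate: no chunk outside the given id set may have an order inside the given inclusive order range. A self-contained sketch of that predicate over plain integers (the names and integer types are illustrative; the real code walks the partition via keyed_chunks() with ChunkId/ChunkOrder):

    use std::collections::BTreeSet;
    use std::ops::RangeInclusive;

    /// False iff some chunk outside `ids` has an order inside `orders`.
    fn contiguous(
        all_chunks: &[(u128, u32)], // (id, order) of every chunk in the partition
        ids: &BTreeSet<u128>,
        orders: &RangeInclusive<u32>,
    ) -> bool {
        if ids.is_empty() {
            return true;
        }
        all_chunks
            .iter()
            .all(|(id, order)| !orders.contains(order) || ids.contains(id))
    }

    fn main() {
        // Mirrors the test above: chunk 3 shares order 2 with chunk 2, so
        // compacting chunks 1 and 2 over the order range 1..=2 is rejected.
        let chunks = [(1u128, 1u32), (2, 2), (3, 2)];
        let ids: BTreeSet<u128> = [1, 2].into_iter().collect();
        assert!(!contiguous(&chunks, &ids, &(1..=2)));
    }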
diff --git a/server/src/db/lifecycle/compact_object_store.rs b/server/src/db/lifecycle/compact_object_store.rs
index 79c6868a42..f2baa68f5a 100644
--- a/server/src/db/lifecycle/compact_object_store.rs
+++ b/server/src/db/lifecycle/compact_object_store.rs
@@ -2,8 +2,8 @@
 use super::{
     error::{
-        ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, EmptyChunks,
-        ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
+        ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, ComparePartitionCheckpoint,
+        EmptyChunks, NoCheckpoint, ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
     },
     LockableCatalogChunk, LockableCatalogPartition, Result,
 };
@@ -36,7 +36,9 @@ use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner,
 use schema::Schema;
 use snafu::ResultExt;
 use std::{
+    cmp::Ordering,
     collections::{BTreeSet, HashSet},
+    ops::RangeInclusive,
     sync::Arc,
 };
 use time::Time;
@@ -188,9 +190,10 @@ fn mark_chunks_to_compact(
     let mut input_rows = 0;
     let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
     let mut min_order = ChunkOrder::MAX;
+    let mut max_order = ChunkOrder::MIN;

-    let mut db_ckpts = vec![];
-    let mut part_ckpts = vec![];
+    let mut database_checkpoint: Option<DatabaseCheckpoint> = None;
+    let mut partition_checkpoint: Option<PartitionCheckpoint> = None;

     let os_chunks = chunks
         .into_iter()
@@ -218,6 +221,7 @@
             delete_predicates.extend(chunk.delete_predicates().iter().cloned());

             min_order = min_order.min(chunk.order());
+            max_order = max_order.max(chunk.order());
             chunk_orders.insert(chunk.order());
             chunk_ids.insert(chunk.id());

@@ -230,8 +234,27 @@ fn mark_chunks_to_compact(
                     .read_iox_metadata()
                     .context(ParquetMetaRead)?;

-                db_ckpts.push(iox_metadata.database_checkpoint);
-                part_ckpts.push(iox_metadata.partition_checkpoint);
+                // fold all database_checkpoints into one for the compacting chunk
+                if let Some(db_ckpt) = &mut database_checkpoint {
+                    db_ckpt.fold(&iox_metadata.database_checkpoint);
+                } else {
+                    database_checkpoint = Some(iox_metadata.database_checkpoint);
+                }
+
+                // keep max partition_checkpoint for the compacting chunk
+                if let Some(part_ckpt) = &partition_checkpoint {
+                    match part_ckpt.partial_cmp(&iox_metadata.partition_checkpoint) {
+                        None => {
+                            return ComparePartitionCheckpoint {}.fail();
+                        }
+                        Some(Ordering::Less) => {
+                            partition_checkpoint = Some(iox_metadata.partition_checkpoint);
+                        }
+                        _ => {}
+                    }
+                } else {
+                    partition_checkpoint = Some(iox_metadata.partition_checkpoint);
+                }
             } else {
                 return ChunksNotPersisted {}.fail();
             }
@@ -243,15 +266,18 @@
         })
        .collect::<Result<Vec<_>>>()?;

+    if database_checkpoint.is_none() || partition_checkpoint.is_none() {
+        return NoCheckpoint {}.fail();
+    }
+    let database_checkpoint = database_checkpoint.unwrap();
+    let partition_checkpoint = partition_checkpoint.unwrap();
+
     // Verify if all the provided chunks are contiguous
-    if !partition.contiguous_chunks(&chunk_ids, &chunk_orders)? {
+    let order_range = RangeInclusive::new(min_order, max_order);
+    if !partition.contiguous_chunks(&chunk_ids, &order_range)? {
         return ChunksNotContiguous {}.fail();
     }

-    // Compute checkpoints for the creating chunk
-    let (database_checkpoint, partition_checkpoint) =
-        compute_checkpoints(&partition_addr, &db_ckpts, &part_ckpts);
-
     // drop partition lock
     std::mem::drop(partition);

@@ -281,26 +307,6 @@ struct CompactingOsChunks {
     partition_checkpoint: PartitionCheckpoint,
 }

-/// Check points of this creating chunk will have min range None and max range is the max of all max
-// Todo: ask Marco and Raphael: How about sequencer_number of there are may of them?
-fn compute_checkpoints(
-    partition_addr: &PartitionAddr,
-    _db_ckpts: &[DatabaseCheckpoint],
-    _part_ckpts: &[PartitionCheckpoint],
-) -> (DatabaseCheckpoint, PartitionCheckpoint) {
-    let database_checkpoint = DatabaseCheckpoint::new(Default::default());
-    let partition_checkpoint = PartitionCheckpoint::new(
-        Arc::clone(&partition_addr.table_name),
-        Arc::clone(&partition_addr.partition_key),
-        Default::default(),
-        Time::from_timestamp_nanos(0),
-    );
-
-    // todo
-
-    (database_checkpoint, partition_checkpoint)
-}
-
 /// Create query plan to compact the given DbChunks and return its output stream
 /// Return:
 ///    . stream of output record batch of the scanned chunks
diff --git a/server/src/db/lifecycle/error.rs b/server/src/db/lifecycle/error.rs
index bf8fcd799d..f2fde81a7a 100644
--- a/server/src/db/lifecycle/error.rs
+++ b/server/src/db/lifecycle/error.rs
@@ -84,6 +84,14 @@
     ParquetMetaRead {
         source: parquet_file::metadata::Error,
     },
+
+    #[snafu(display(
+        "Cannot compact chunks because of an error while computing the max partition checkpoint"
+    ))]
+    ComparePartitionCheckpoint {},
+
+    #[snafu(display("Cannot compact chunks because no checkpoint was computed"))]
+    NoCheckpoint {},
 }

 pub type Result<T, E = Error> = std::result::Result<T, E>;
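Note: mark_chunks_to_compact() now derives the new chunk's checkpoints from its inputs instead of calling the removed compute_checkpoints() stub: database checkpoints are folded together via DatabaseCheckpoint::fold, while for partition checkpoints only the maximum under PartialOrd is kept, and an incomparable pair aborts with ComparePartitionCheckpoint. A reduced, self-contained sketch of the keep-the-max control flow (Checkpoint is a stand-in type with a total order; the real PartitionCheckpoint comparison can return None, which is why the error arm exists):

    use std::cmp::Ordering;

    #[derive(Debug, Clone, PartialEq, PartialOrd)]
    struct Checkpoint(u64); // stand-in for PartitionCheckpoint

    /// Keep the maximum checkpoint seen so far; fail on incomparable pairs.
    fn keep_max(acc: &mut Option<Checkpoint>, next: Checkpoint) -> Result<(), String> {
        if let Some(current) = &*acc {
            match current.partial_cmp(&next) {
                None => return Err("cannot compare partition checkpoints".into()),
                Some(Ordering::Less) => *acc = Some(next),
                _ => {} // current is already the max; keep it
            }
        } else {
            *acc = Some(next); // first checkpoint initializes the accumulator
        }
        Ok(())
    }

    fn main() -> Result<(), String> {
        let mut max: Option<Checkpoint> = None;
        for ckpt in [Checkpoint(3), Checkpoint(7), Checkpoint(5)] {
            keep_max(&mut max, ckpt)?;
        }
        assert_eq!(max, Some(Checkpoint(7)));
        Ok(())
    }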