refactor: address review comments and compute max checkpoints

pull/24376/head
Nga Tran 2021-11-23 15:32:46 -05:00
parent 124f5bb4c5
commit e64ac10c40
4 changed files with 86 additions and 96 deletions

View File

@ -733,11 +733,11 @@ impl CatalogChunk {
}
/// Return parquet chunk if it is persisted
pub fn parquet_chunk(&self) -> Option<Arc<ParquetChunk>> {
pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
match &self.stage {
ChunkStage::Persisted {
meta: _, parquet, ..
} => Some(Arc::clone(parquet)),
} => Some(parquet),
_ => None,
}
}

View File

@ -17,6 +17,7 @@ use snafu::{OptionExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
ops::RangeInclusive,
sync::Arc,
};
use time::{Time, TimeProvider};
@ -220,7 +221,7 @@ impl Partition {
self.create_open_chunk_with_specified_id_order(chunk, chunk_id, chunk_order)
}
pub fn create_open_chunk_with_specified_id_order(
fn create_open_chunk_with_specified_id_order(
&mut self,
chunk: mutable_buffer::MBChunk,
chunk_id: ChunkId,
@ -389,36 +390,16 @@ impl Partition {
pub fn contiguous_chunks(
&self,
chunk_ids: &BTreeSet<ChunkId>,
chunk_orders: &BTreeSet<ChunkOrder>,
chunk_orders: &RangeInclusive<ChunkOrder>,
) -> Result<bool> {
if chunk_orders.is_empty() {
if chunk_ids.is_empty() {
return Ok(true);
}
// First and last order in the chunk_orders for comparison
let first_order_element = chunk_orders.iter().next();
let first_order = match first_order_element {
Some(first_order) => first_order,
_ => {
return ContiguousCheck {}.fail();
}
};
let last_order_element = chunk_orders.iter().rev().next();
let last_order = match last_order_element {
Some(last_order) => last_order,
_ => {
return ContiguousCheck {}.fail();
}
};
let chunks = self.chunks();
for chunk in chunks {
let chunk = chunk.read();
let order = chunk.order();
let id = chunk.id();
let chunk_keys = self.keyed_chunks();
for (id, order, ..) in chunk_keys {
// this chunk's order is in the middle of the given orders but does
// not belong to their chunks
if order >= *first_order && order <= *last_order && !chunk_ids.contains(&id) {
if chunk_orders.contains(&order) && !chunk_ids.contains(&id) {
return Ok(false);
}
}
@ -543,10 +524,10 @@ mod tests {
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
let ids = BTreeSet::new();
let orders = BTreeSet::new();
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
// contiguous
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
@ -556,25 +537,26 @@ mod tests {
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
let mut ids = BTreeSet::new();
let mut orders = BTreeSet::new();
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
// no chunks provided
// --> contiguous
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// provide itself
// --> contiguous
ids.insert(ChunkId::new_test(1));
orders.insert(ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// provide same order, different id
// --> not contiguos
// --> not contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(2));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(1).unwrap());
assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
@ -587,28 +569,25 @@ mod tests {
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
ids.insert(ChunkId::new_test(2));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(1).unwrap());
orders.insert(ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,3) and (2,2) are contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(2));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(3).unwrap());
orders.insert(ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(3).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,3) and (1,1) are NOT contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(1));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(3).unwrap());
orders.insert(ChunkOrder::new(1).unwrap());
assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(3).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
@ -620,36 +599,33 @@ mod tests {
// (1,1) is contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (2,2) and (3,2) are contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(2));
ids.insert(ChunkId::new_test(3));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(3).unwrap());
orders.insert(ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (1,1) and (2,2) are NOT contiguous because there is chunk (3, 2) with the same order 2
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
ids.insert(ChunkId::new_test(2));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(1).unwrap());
orders.insert(ChunkOrder::new(2).unwrap());
assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,2) and (1,1) are NOT contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(1));
let mut orders = BTreeSet::new();
orders.insert(ChunkOrder::new(2).unwrap());
orders.insert(ChunkOrder::new(1).unwrap());
assert!(!partition.contiguous_chunks(&ids, &orders).unwrap());
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
fn make_partitition_for_chunks_with_ids_orders(id_orders: &[(u128, u32)]) -> Partition {

View File

@ -2,8 +2,8 @@
use super::{
error::{
ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, EmptyChunks,
ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, ComparePartitionCheckpoint,
EmptyChunks, NoCheckpoint, ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
},
LockableCatalogChunk, LockableCatalogPartition, Result,
};
@ -36,7 +36,9 @@ use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner,
use schema::Schema;
use snafu::ResultExt;
use std::{
cmp::Ordering,
collections::{BTreeSet, HashSet},
ops::RangeInclusive,
sync::Arc,
};
use time::Time;
@ -188,9 +190,10 @@ fn mark_chunks_to_compact(
let mut input_rows = 0;
let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
let mut min_order = ChunkOrder::MAX;
let mut max_order = ChunkOrder::MIN;
let mut db_ckpts = vec![];
let mut part_ckpts = vec![];
let mut database_checkpoint: Option<DatabaseCheckpoint> = None;
let mut partition_checkpoint: Option<PartitionCheckpoint> = None;
let os_chunks = chunks
.into_iter()
@ -218,6 +221,7 @@ fn mark_chunks_to_compact(
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
max_order = max_order.max(chunk.order());
chunk_orders.insert(chunk.order());
chunk_ids.insert(chunk.id());
@ -230,8 +234,27 @@ fn mark_chunks_to_compact(
.read_iox_metadata()
.context(ParquetMetaRead)?;
db_ckpts.push(iox_metadata.database_checkpoint);
part_ckpts.push(iox_metadata.partition_checkpoint);
// fold all database_checkpoints into one for the compacting chunk
if let Some(db_ckpt) = &mut database_checkpoint {
db_ckpt.fold(&iox_metadata.database_checkpoint);
} else {
database_checkpoint = Some(iox_metadata.database_checkpoint);
}
// keep max partition_checkpoint for the compacting chunk
if let Some(part_ckpt) = &partition_checkpoint {
match part_ckpt.partial_cmp(&iox_metadata.partition_checkpoint) {
None => {
return ComparePartitionCheckpoint {}.fail();
}
Some(Ordering::Less) => {
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
}
_ => {}
}
} else {
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
}
} else {
return ChunksNotPersisted {}.fail();
}
@ -243,15 +266,18 @@ fn mark_chunks_to_compact(
})
.collect::<Result<Vec<_>>>()?;
if database_checkpoint.is_none() || partition_checkpoint.is_none() {
return NoCheckpoint {}.fail();
}
let database_checkpoint = database_checkpoint.unwrap();
let partition_checkpoint = partition_checkpoint.unwrap();
// Verify if all the provided chunks are contiguous
if !partition.contiguous_chunks(&chunk_ids, &chunk_orders)? {
let order_range = RangeInclusive::new(min_order, max_order);
if !partition.contiguous_chunks(&chunk_ids, &order_range)? {
return ChunksNotContiguous {}.fail();
}
// Compute checkpoints for the creating chunk
let (database_checkpoint, partition_checkpoint) =
compute_checkpoints(&partition_addr, &db_ckpts, &part_ckpts);
// drop partition lock
std::mem::drop(partition);
@ -281,26 +307,6 @@ struct CompactingOsChunks {
partition_checkpoint: PartitionCheckpoint,
}
/// Checkpoints of this newly created chunk will have a min range of None and a max range equal to the max of all the chunks' max ranges
// Todo: ask Marco and Raphael: How about sequence_number if there are many of them?
fn compute_checkpoints(
partition_addr: &PartitionAddr,
_db_ckpts: &[DatabaseCheckpoint],
_part_ckpts: &[PartitionCheckpoint],
) -> (DatabaseCheckpoint, PartitionCheckpoint) {
let database_checkpoint = DatabaseCheckpoint::new(Default::default());
let partition_checkpoint = PartitionCheckpoint::new(
Arc::clone(&partition_addr.table_name),
Arc::clone(&partition_addr.partition_key),
Default::default(),
Time::from_timestamp_nanos(0),
);
// todo
(database_checkpoint, partition_checkpoint)
}
/// Create query plan to compact the given DbChunks and return its output stream
/// Return:
/// . stream of output record batch of the scanned chunks Result<SendableRecordBatchStream>

View File

@ -84,6 +84,14 @@ pub enum Error {
ParquetMetaRead {
source: parquet_file::metadata::Error,
},
#[snafu(display(
"Cannot compact chunks because of error during computing max partition checkpoint"
))]
ComparePartitionCheckpoint {},
#[snafu(display("Cannot compact chunks because no checkpoint was computed"))]
NoCheckpoint {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;