Merge pull request #3185 from influxdata/ntran/compact_os_blocks

feat: compute max checkpoints from chunks' IOx Metadata and tests for contiguous chunks
kodiakhq[bot] 2021-11-23 22:06:30 +00:00 committed by GitHub
commit 84fab2951e
4 changed files with 245 additions and 43 deletions


@@ -732,6 +732,16 @@ impl CatalogChunk {
}
}
/// Return parquet chunk if it is persisted
pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
match &self.stage {
ChunkStage::Persisted {
meta: _, parquet, ..
} => Some(parquet),
_ => None,
}
}
/// Set the persisted chunk to be compacting
pub fn set_compacting_object_store(&mut self, registration: &TaskRegistration) -> Result<()> {
match &self.stage {
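The new `parquet_chunk()` accessor is what later lets the compaction path reach a persisted chunk's Parquet metadata. A minimal sketch of that usage, assuming a persisted `CatalogChunk` named `chunk` and the `ParquetMetaRead` error selector added later in this PR (the call chain mirrors the diff in compact_object_store.rs):

if let Some(parquet_chunk) = chunk.parquet_chunk() {
    // IoxParquetMetadata travels with the persisted Parquet file
    let iox_parquet_metadata = parquet_chunk.parquet_metadata();
    // decode it and pull out the IOx-specific part (checkpoints, chunk order, ...)
    let iox_metadata = iox_parquet_metadata
        .decode()
        .context(ParquetMetaRead)?
        .read_iox_metadata()
        .context(ParquetMetaRead)?;
    // iox_metadata.database_checkpoint and iox_metadata.partition_checkpoint
    // feed the checkpoint computation shown further down
}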


@@ -17,6 +17,7 @@ use snafu::{OptionExt, Snafu};
use std::{
collections::{BTreeMap, BTreeSet},
fmt::Display,
ops::RangeInclusive,
sync::Arc,
};
use time::{Time, TimeProvider};
@@ -39,6 +40,9 @@ pub enum Error {
#[snafu(display("creating new mutable buffer chunk failed: {}", source))]
CreateOpenChunk { source: ChunkError },
#[snafu(display("checking chunk contiguous fails"))]
ContiguousCheck {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -211,11 +215,20 @@ impl Partition {
&mut self,
chunk: mutable_buffer::MBChunk,
) -> &Arc<RwLock<CatalogChunk>> {
- assert_eq!(chunk.table_name().as_ref(), self.table_name());
let chunk_id = ChunkId::new();
let chunk_order = self.next_chunk_order();
self.create_open_chunk_with_specified_id_order(chunk, chunk_id, chunk_order)
}
fn create_open_chunk_with_specified_id_order(
&mut self,
chunk: mutable_buffer::MBChunk,
chunk_id: ChunkId,
chunk_order: ChunkOrder,
) -> &Arc<RwLock<CatalogChunk>> {
assert_eq!(chunk.table_name().as_ref(), self.table_name());
let addr = ChunkAddr::new(&self.addr, chunk_id);
let chunk = CatalogChunk::new_open(
@@ -374,31 +387,24 @@ impl Partition {
/// Return true if there are no other persisted chunks that are in the middle of
/// the provided chunk orders
- // NGA todo: There is test_compact_os_non_contiguous_chunks in
- // compact_object_store.rs to test this but I will add more unit tests right here
- // when PR #3167 ChunkGenerator is merged
- pub fn contiguous_object_store_chunks(&self, chunk_orders: &BTreeSet<ChunkOrder>) -> bool {
- // Last order in the chunk_orders for comparison
- let last_order_element = chunk_orders.iter().rev().next();
- let last_order = match last_order_element {
- Some(last_order) => last_order,
- None => {
- return true;
- } // provided chunk_orders is empty
- };
- let chunks = self.chunks();
- for chunk in chunks {
- let chunk = chunk.read();
- if chunk.is_persisted() {
- let order = chunk.order();
- // this chunk does not belong to chunk_orders but in the middle of them
- if !chunk_orders.contains(&order) && order < *last_order {
- return false;
- }
- }
- }
- true
- }
pub fn contiguous_chunks(
&self,
chunk_ids: &BTreeSet<ChunkId>,
chunk_orders: &RangeInclusive<ChunkOrder>,
) -> Result<bool> {
if chunk_ids.is_empty() {
return Ok(true);
}
let chunk_keys = self.keyed_chunks();
for (id, order, ..) in chunk_keys {
// this chunk's order is in the middle of the given orders but does
// not belong to their chunks
if chunk_orders.contains(&order) && !chunk_ids.contains(&id) {
return Ok(false);
}
}
Ok(true)
}
/// Return a PartitionSummary for this partition. If the partition
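In words: the selected chunks are contiguous when no other chunk in the partition has an order that falls inside the selection's order range. A minimal sketch of the call as the compaction path below uses it, where the range is built from the selected chunks' min and max orders (names as in mark_chunks_to_compact):

let order_range = RangeInclusive::new(min_order, max_order);
if !partition.contiguous_chunks(&chunk_ids, &order_range)? {
    // e.g. a selection with orders {1, 3} is rejected if some other chunk
    // with order 2 sits between them
    return ChunksNotContiguous {}.fail();
}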
@@ -511,6 +517,145 @@ mod tests {
assert_eq!(ids, expected_ids);
}
#[test]
fn test_contiguous_chunks_empty() {
// create a partition without chunks
let id_orders = vec![];
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
let ids = BTreeSet::new();
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
// contiguous
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
fn test_contiguous_chunks_one_chunk() {
// create a partition with one chunk
let id_orders = vec![(1, 1)];
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
let mut ids = BTreeSet::new();
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
// no chunks provided
// --> contiguous
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// provide itself
// --> contiguous
ids.insert(ChunkId::new_test(1));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// provide same order, different id
// --> not contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(2));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
fn test_contiguous_chunks_three_chunk() {
// create a partition with 3 chunks
let id_orders = vec![(1, 1), (2, 2), (3, 3)];
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
// (1,1) and (2,2) are contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
ids.insert(ChunkId::new_test(2));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,3) and (2,2) are contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(2));
let order_range =
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(3).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,3) and (1,1) are NOT contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(1));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(3).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
#[test]
fn test_contiguous_chunks_three_chunk_with_duplicated_orders() {
// create a partition with 3 chunks
let id_orders = vec![(1, 1), (2, 2), (3, 2)];
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
// (1,1) is contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (2,2) and (3,2) are contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(2));
ids.insert(ChunkId::new_test(3));
let order_range =
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(2).unwrap());
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
// (1,1) and (2,2) are NOT contiguous because there is chunk (3, 2) with the same order 2
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
ids.insert(ChunkId::new_test(2));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
// (3,2) and (1,1) are NOT contiguous
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(3));
ids.insert(ChunkId::new_test(1));
let order_range =
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
}
fn make_partitition_for_chunks_with_ids_orders(id_orders: &[(u128, u32)]) -> Partition {
let addr = PartitionAddr {
db_name: "d".into(),
table_name: "t".into(),
partition_key: "p".into(),
};
let registry = Arc::new(metric::Registry::new());
let catalog_metrics = Arc::new(CatalogMetrics::new(
Arc::clone(&addr.db_name),
Arc::clone(&registry),
));
let time_provider = Arc::new(time::SystemProvider::new());
let table_metrics = Arc::new(catalog_metrics.new_table_metrics("t"));
let partition_metrics = table_metrics.new_partition_metrics();
// make chunks for given id_orders
let mut partition = Partition::new(addr, partition_metrics, time_provider);
for &(id, order) in id_orders {
partition.create_open_chunk_with_specified_id_order(
make_mb_chunk("t"),
ChunkId::new_test(id),
ChunkOrder::new(order).unwrap(),
);
}
partition
}
fn make_mb_chunk(table_name: &str) -> MBChunk {
write_lp_to_new_chunk(&format!("{} bar=1 10", table_name))
}
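Following the pattern of the tests above, a small additional sketch of how the helper and contiguous_chunks combine; it also illustrates that a gap in the order sequence is fine as long as no other chunk occupies it (the chunk ids and orders here are illustrative, not from the PR):

// partition with chunks (id, order) = (1, 1) and (2, 3); order 2 is unused
let partition = make_partitition_for_chunks_with_ids_orders(&[(1, 1), (2, 3)]);
let mut ids = BTreeSet::new();
ids.insert(ChunkId::new_test(1));
ids.insert(ChunkId::new_test(2));
let order_range =
    RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(3).unwrap());
// contiguous: no other chunk sits inside the selected order range
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());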


@@ -2,8 +2,8 @@
use super::{
error::{
- ChunksNotContiguous, ChunksNotInPartition, EmptyChunks, ParquetChunkError,
- WritingToObjectStore,
ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, ComparePartitionCheckpoint,
EmptyChunks, NoCheckpoint, ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
},
LockableCatalogChunk, LockableCatalogPartition, Result,
};
@@ -34,9 +34,11 @@ use parquet_file::{
use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint};
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
use schema::Schema;
- use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use std::{
cmp::Ordering,
collections::{BTreeSet, HashSet},
ops::RangeInclusive,
sync::Arc,
};
use time::Time;
@@ -183,19 +185,14 @@ fn mark_chunks_to_compact(
// Mark and snapshot chunks, then drop locks
let mut time_of_first_write = Time::MAX;
let mut time_of_last_write = Time::MIN;
- let mut chunk_orders = BTreeSet::new();
let mut chunk_ids = BTreeSet::new();
let mut input_rows = 0;
let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
let mut min_order = ChunkOrder::MAX;
let mut max_order = ChunkOrder::MIN;
- // initialize checkpoints
- let database_checkpoint = DatabaseCheckpoint::new(Default::default());
- let partition_checkpoint = PartitionCheckpoint::new(
- Arc::clone(&partition_addr.table_name),
- Arc::clone(&partition_addr.partition_key),
- Default::default(),
- Time::from_timestamp_nanos(0),
- );
let mut database_checkpoint = DatabaseCheckpoint::new(Default::default());
let mut partition_checkpoint: Option<PartitionCheckpoint> = None;
let os_chunks = chunks
.into_iter()
@@ -223,19 +220,51 @@
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
min_order = min_order.min(chunk.order());
- chunk_orders.insert(chunk.order());
max_order = max_order.max(chunk.order());
chunk_ids.insert(chunk.id());
- // Todo:get chunk's datatbase_checkpoint and partition_checkpoint of the chunk and keep max
// read IoxMetadata from the parquet chunk's metadata
if let Some(parquet_chunk) = chunk.parquet_chunk() {
let iox_parquet_metadata = parquet_chunk.parquet_metadata();
let iox_metadata = iox_parquet_metadata
.decode()
.context(ParquetMetaRead)?
.read_iox_metadata()
.context(ParquetMetaRead)?;
// fold all database_checkpoints into one for the compacting chunk
database_checkpoint.fold(&iox_metadata.database_checkpoint);
// keep max partition_checkpoint for the compacting chunk
if let Some(part_ckpt) = &partition_checkpoint {
let ordering = part_ckpt
.partial_cmp(&iox_metadata.partition_checkpoint)
.context(ComparePartitionCheckpoint)?;
if ordering == Ordering::Less {
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
}
} else {
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
}
} else {
return ChunksNotPersisted {}.fail();
}
// Set chunk in the right action which is compacting object store
// This function will also error out if the chunk is not yet persisted
chunk.set_compacting_object_store(registration)?;
- Ok(DbChunk::snapshot(&*chunk))
Ok(DbChunk::parquet_file_snapshot(&*chunk))
})
.collect::<Result<Vec<_>>>()?;
if partition_checkpoint.is_none() {
return NoCheckpoint {}.fail();
}
let partition_checkpoint = partition_checkpoint.unwrap();
// Verify if all the provided chunks are contiguous
- if !partition.contiguous_object_store_chunks(&chunk_orders) {
let order_range = RangeInclusive::new(min_order, max_order);
if !partition.contiguous_chunks(&chunk_ids, &order_range)? {
return ChunksNotContiguous {}.fail();
}
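The checkpoint handling above can be read in isolation: the database checkpoints of all input chunks are folded together, while for the partition checkpoint only the maximum under its partial order is kept, and incomparable checkpoints are reported as an error. A hedged sketch of that "keep the max" step, assuming the surrounding module's Result alias, the ComparePartitionCheckpoint selector, and snafu::OptionExt (all part of this PR); the helper name is hypothetical:

// Sketch only: keep the larger of two partially ordered partition checkpoints.
fn max_partition_checkpoint(
    current: Option<PartitionCheckpoint>,
    candidate: PartitionCheckpoint,
) -> Result<PartitionCheckpoint> {
    match current {
        // the first checkpoint seen wins by default
        None => Ok(candidate),
        Some(cur) => {
            // partial_cmp returns None for incomparable checkpoints -> error
            let ordering = cur
                .partial_cmp(&candidate)
                .context(ComparePartitionCheckpoint)?;
            if ordering == std::cmp::Ordering::Less {
                Ok(candidate)
            } else {
                Ok(cur)
            }
        }
    }
}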
@@ -405,8 +434,9 @@ mod tests {
mark_chunks_to_compact(partition, vec![chunk], &registration);
let err = compact_non_persisted_chunks.unwrap_err();
assert!(
- err.to_string().contains("Expected Persisted, got Open"),
- "Expected Persisted, got Open"
err.to_string()
.contains("Cannot compact chunks because at least one is not yet persisted"),
"Cannot compact chunks because at least one is not yet persisted"
);
}


@@ -71,8 +71,25 @@
))]
ChunksNotInPartition {},
#[snafu(display("Cannot compact chunks because at least one is not yet persisted"))]
ChunksNotPersisted {},
#[snafu(display("Cannot compact the provided persisted chunks. They are not contiguous"))] #[snafu(display("Cannot compact the provided persisted chunks. They are not contiguous"))]
ChunksNotContiguous {}, ChunksNotContiguous {},
#[snafu(display(
"Error reading IOx Metadata from Parquet IoxParquetMetadata: {}",
source
))]
ParquetMetaRead {
source: parquet_file::metadata::Error,
},
#[snafu(display("Cannot compact chunks because of error computing max partition checkpoint"))]
ComparePartitionCheckpoint {},
#[snafu(display("Cannot compact chunks because no checkpoint was computed"))]
NoCheckpoint {},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
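The display strings on these variants are also what the updated test in compact_object_store.rs asserts against. A minimal sketch of that relationship, assuming snafu's generated Display impl and the context selector's build() method:

// Sketch only: snafu derives Display from the #[snafu(display(...))] attribute,
// so tests can match on the message without naming the variant.
let err: Error = ChunksNotPersisted {}.build();
assert!(err
    .to_string()
    .contains("Cannot compact chunks because at least one is not yet persisted"));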