Merge branch 'main' into er/feat/grpc_binary_tags
commit
976bccfafe
|
@ -22,8 +22,6 @@ use crate::{
|
||||||
fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder,
|
fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use chrono::{DateTime, Utc};
|
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tonic::Code;
|
use tonic::Code;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -1455,74 +1453,6 @@ async fn test_unload_read_buffer() {
|
||||||
assert_eq!(chunks[0].storage, storage);
|
assert_eq!(chunks[0].storage, storage);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_chunk_access_time() {
|
|
||||||
let fixture = ServerFixture::create_shared(ServerType::Database).await;
|
|
||||||
let mut write_client = fixture.write_client();
|
|
||||||
let mut management_client = fixture.management_client();
|
|
||||||
let mut flight_client = fixture.flight_client();
|
|
||||||
|
|
||||||
let db_name = rand_name();
|
|
||||||
DatabaseBuilder::new(db_name.clone())
|
|
||||||
.build(fixture.grpc_channel())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
write_client
|
|
||||||
.write_lp(&db_name, "cpu foo=1 10", 0)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let to_datetime = |a: Option<&generated_types::google::protobuf::Timestamp>| -> DateTime<Utc> {
|
|
||||||
a.unwrap().clone().try_into().unwrap()
|
|
||||||
};
|
|
||||||
|
|
||||||
let chunks = management_client.list_chunks(&db_name).await.unwrap();
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let t0 = to_datetime(chunks[0].time_of_last_access.as_ref());
|
|
||||||
|
|
||||||
flight_client
|
|
||||||
.perform_query(&db_name, "select * from cpu;")
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client.list_chunks(&db_name).await.unwrap();
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let t1 = to_datetime(chunks[0].time_of_last_access.as_ref());
|
|
||||||
|
|
||||||
flight_client
|
|
||||||
.perform_query(&db_name, "select * from cpu;")
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client.list_chunks(&db_name).await.unwrap();
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let t2 = to_datetime(chunks[0].time_of_last_access.as_ref());
|
|
||||||
|
|
||||||
write_client
|
|
||||||
.write_lp(&db_name, "cpu foo=1 20", 0)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client.list_chunks(&db_name).await.unwrap();
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let t3 = to_datetime(chunks[0].time_of_last_access.as_ref());
|
|
||||||
|
|
||||||
// This chunk should be pruned out and therefore not accessed by the query
|
|
||||||
flight_client
|
|
||||||
.perform_query(&db_name, "select * from cpu where foo = 2;")
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let chunks = management_client.list_chunks(&db_name).await.unwrap();
|
|
||||||
assert_eq!(chunks.len(), 1);
|
|
||||||
let t4 = to_datetime(chunks[0].time_of_last_access.as_ref());
|
|
||||||
|
|
||||||
assert!(t0 < t1, "{} {}", t0, t1);
|
|
||||||
assert!(t1 < t2, "{} {}", t1, t2);
|
|
||||||
assert!(t2 < t3, "{} {}", t2, t3);
|
|
||||||
assert_eq!(t3, t4)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_drop_partition() {
|
async fn test_drop_partition() {
|
||||||
use data_types::chunk_metadata::ChunkStorage;
|
use data_types::chunk_metadata::ChunkStorage;
|
||||||
|
|
|
@ -127,10 +127,10 @@ where
|
||||||
loop {
|
loop {
|
||||||
let buffer_size = self.db.buffer_size();
|
let buffer_size = self.db.buffer_size();
|
||||||
if buffer_size < soft_limit {
|
if buffer_size < soft_limit {
|
||||||
info!(%db_name, buffer_size, %soft_limit, "memory use under soft limit");
|
trace!(%db_name, buffer_size, %soft_limit, "memory use under soft limit");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
info!(%db_name, buffer_size, %soft_limit, "memory use over soft limit");
|
trace!(%db_name, buffer_size, %soft_limit, "memory use over soft limit");
|
||||||
|
|
||||||
match candidates.next() {
|
match candidates.next() {
|
||||||
Some(candidate) => {
|
Some(candidate) => {
|
||||||
|
@ -139,7 +139,7 @@ where
|
||||||
Some(chunk) => {
|
Some(chunk) => {
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
if chunk.lifecycle_action().is_some() {
|
if chunk.lifecycle_action().is_some() {
|
||||||
info!(
|
debug!(
|
||||||
%db_name,
|
%db_name,
|
||||||
chunk_id=%candidate.chunk_id.get(),
|
chunk_id=%candidate.chunk_id.get(),
|
||||||
%partition,
|
%partition,
|
||||||
|
@ -183,7 +183,7 @@ where
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => info!(
|
None => debug!(
|
||||||
%db_name,
|
%db_name,
|
||||||
chunk_id=%candidate.chunk_id.get(),
|
chunk_id=%candidate.chunk_id.get(),
|
||||||
%partition,
|
%partition,
|
||||||
|
@ -192,7 +192,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
warn!(%db_name, soft_limit, buffer_size,
|
debug!(%db_name, soft_limit, buffer_size,
|
||||||
"soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules");
|
"soft limited exceeded, but no chunks found that can be evicted. Check lifecycle rules");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -363,9 +363,9 @@ where
|
||||||
"considering for persistence");
|
"considering for persistence");
|
||||||
|
|
||||||
if persistable_row_count >= rules.persist_row_threshold.get() {
|
if persistable_row_count >= rules.persist_row_threshold.get() {
|
||||||
info!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
|
debug!(%db_name, %partition, persistable_row_count, "persisting partition as exceeds row threshold");
|
||||||
} else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() as u64 {
|
} else if persistable_age_seconds >= rules.persist_age_threshold_seconds.get() as u64 {
|
||||||
info!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold");
|
debug!(%db_name, %partition, persistable_age_seconds, "persisting partition as exceeds age threshold");
|
||||||
} else {
|
} else {
|
||||||
trace!(%db_name, %partition, persistable_row_count, "partition not eligible for persist");
|
trace!(%db_name, %partition, persistable_row_count, "partition not eligible for persist");
|
||||||
return false;
|
return false;
|
||||||
|
@ -659,7 +659,7 @@ where
|
||||||
// see if we should stall subsequent pull it is
|
// see if we should stall subsequent pull it is
|
||||||
// preventing us from persisting
|
// preventing us from persisting
|
||||||
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
|
let stall = action.metadata() == &ChunkLifecycleAction::Compacting;
|
||||||
info!(?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
|
debug!(?action, chunk=%chunk.addr(), "Chunk to persist has outstanding action");
|
||||||
|
|
||||||
// NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
|
// NOTE: This early exit also ensures that we are not "jumping" over chunks sorted by `order`.
|
||||||
return Err(stall);
|
return Err(stall);
|
||||||
|
|
|
@ -3697,6 +3697,56 @@ mod tests {
|
||||||
assert_batches_sorted_eq!(&expected, &batches);
|
assert_batches_sorted_eq!(&expected, &batches);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn chunk_times() {
|
||||||
|
let t0 = Time::from_timestamp(11, 22);
|
||||||
|
let time = Arc::new(time::MockProvider::new(t0));
|
||||||
|
let db = TestDb::builder()
|
||||||
|
.time_provider(Arc::<time::MockProvider>::clone(&time))
|
||||||
|
.build()
|
||||||
|
.await
|
||||||
|
.db;
|
||||||
|
|
||||||
|
write_lp(db.as_ref(), "cpu foo=1 10").await;
|
||||||
|
|
||||||
|
let chunks = db.chunk_summaries().unwrap();
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].time_of_first_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_access.unwrap(), t0);
|
||||||
|
|
||||||
|
let t1 = time.inc(Duration::from_secs(1));
|
||||||
|
|
||||||
|
run_query(Arc::clone(&db), "select * from cpu").await;
|
||||||
|
|
||||||
|
let chunks = db.chunk_summaries().unwrap();
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].time_of_first_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_access.unwrap(), t1);
|
||||||
|
|
||||||
|
let t2 = time.inc(Duration::from_secs(1));
|
||||||
|
|
||||||
|
write_lp(db.as_ref(), "cpu foo=1 20").await;
|
||||||
|
|
||||||
|
let chunks = db.chunk_summaries().unwrap();
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].time_of_first_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_write, t2);
|
||||||
|
assert_eq!(chunks[0].time_of_last_access.unwrap(), t2);
|
||||||
|
|
||||||
|
time.inc(Duration::from_secs(1));
|
||||||
|
|
||||||
|
// This chunk should be pruned out and therefore not accessed by the query
|
||||||
|
run_query(Arc::clone(&db), "select * from cpu where foo = 2;").await;
|
||||||
|
|
||||||
|
let chunks = db.chunk_summaries().unwrap();
|
||||||
|
assert_eq!(chunks.len(), 1);
|
||||||
|
assert_eq!(chunks[0].time_of_first_write, t0);
|
||||||
|
assert_eq!(chunks[0].time_of_last_write, t2);
|
||||||
|
assert_eq!(chunks[0].time_of_last_access.unwrap(), t2);
|
||||||
|
}
|
||||||
|
|
||||||
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, ChunkId) {
|
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, ChunkId) {
|
||||||
write_lp(db, "cpu bar=1 10").await;
|
write_lp(db, "cpu bar=1 10").await;
|
||||||
let partition_key = "1970-01-01T00";
|
let partition_key = "1970-01-01T00";
|
||||||
|
|
|
@ -732,6 +732,16 @@ impl CatalogChunk {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return parquet chunk if it is persisted
|
||||||
|
pub fn parquet_chunk(&self) -> Option<&Arc<ParquetChunk>> {
|
||||||
|
match &self.stage {
|
||||||
|
ChunkStage::Persisted {
|
||||||
|
meta: _, parquet, ..
|
||||||
|
} => Some(parquet),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Set the persisted chunk to be compacting
|
/// Set the persisted chunk to be compacting
|
||||||
pub fn set_compacting_object_store(&mut self, registration: &TaskRegistration) -> Result<()> {
|
pub fn set_compacting_object_store(&mut self, registration: &TaskRegistration) -> Result<()> {
|
||||||
match &self.stage {
|
match &self.stage {
|
||||||
|
|
|
@ -17,6 +17,7 @@ use snafu::{OptionExt, Snafu};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{BTreeMap, BTreeSet},
|
collections::{BTreeMap, BTreeSet},
|
||||||
fmt::Display,
|
fmt::Display,
|
||||||
|
ops::RangeInclusive,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use time::{Time, TimeProvider};
|
use time::{Time, TimeProvider};
|
||||||
|
@ -39,6 +40,9 @@ pub enum Error {
|
||||||
|
|
||||||
#[snafu(display("creating new mutable buffer chunk failed: {}", source))]
|
#[snafu(display("creating new mutable buffer chunk failed: {}", source))]
|
||||||
CreateOpenChunk { source: ChunkError },
|
CreateOpenChunk { source: ChunkError },
|
||||||
|
|
||||||
|
#[snafu(display("checking chunk contiguous fails"))]
|
||||||
|
ContiguousCheck {},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
@ -211,11 +215,20 @@ impl Partition {
|
||||||
&mut self,
|
&mut self,
|
||||||
chunk: mutable_buffer::MBChunk,
|
chunk: mutable_buffer::MBChunk,
|
||||||
) -> &Arc<RwLock<CatalogChunk>> {
|
) -> &Arc<RwLock<CatalogChunk>> {
|
||||||
assert_eq!(chunk.table_name().as_ref(), self.table_name());
|
|
||||||
|
|
||||||
let chunk_id = ChunkId::new();
|
let chunk_id = ChunkId::new();
|
||||||
let chunk_order = self.next_chunk_order();
|
let chunk_order = self.next_chunk_order();
|
||||||
|
|
||||||
|
self.create_open_chunk_with_specified_id_order(chunk, chunk_id, chunk_order)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_open_chunk_with_specified_id_order(
|
||||||
|
&mut self,
|
||||||
|
chunk: mutable_buffer::MBChunk,
|
||||||
|
chunk_id: ChunkId,
|
||||||
|
chunk_order: ChunkOrder,
|
||||||
|
) -> &Arc<RwLock<CatalogChunk>> {
|
||||||
|
assert_eq!(chunk.table_name().as_ref(), self.table_name());
|
||||||
|
|
||||||
let addr = ChunkAddr::new(&self.addr, chunk_id);
|
let addr = ChunkAddr::new(&self.addr, chunk_id);
|
||||||
|
|
||||||
let chunk = CatalogChunk::new_open(
|
let chunk = CatalogChunk::new_open(
|
||||||
|
@ -374,31 +387,24 @@ impl Partition {
|
||||||
|
|
||||||
/// Return true if there are no other persisted chunks that are in the middle of
|
/// Return true if there are no other persisted chunks that are in the middle of
|
||||||
/// the provided chunk orders
|
/// the provided chunk orders
|
||||||
// NGA todo: There is test_compact_os_non_contiguous_chunks in
|
pub fn contiguous_chunks(
|
||||||
// compact_object_store.rs to test this but I will add more unit tests right here
|
&self,
|
||||||
// when PR #3167 ChunkGenerator is merged
|
chunk_ids: &BTreeSet<ChunkId>,
|
||||||
pub fn contiguous_object_store_chunks(&self, chunk_orders: &BTreeSet<ChunkOrder>) -> bool {
|
chunk_orders: &RangeInclusive<ChunkOrder>,
|
||||||
// Last order in the chunk_orders for comparison
|
) -> Result<bool> {
|
||||||
let last_order_element = chunk_orders.iter().rev().next();
|
if chunk_ids.is_empty() {
|
||||||
let last_order = match last_order_element {
|
return Ok(true);
|
||||||
Some(last_order) => last_order,
|
}
|
||||||
None => {
|
let chunk_keys = self.keyed_chunks();
|
||||||
return true;
|
for (id, order, ..) in chunk_keys {
|
||||||
} // provided chunk_orders is empty
|
// this chunk's order is in the middle of the given orders but does
|
||||||
};
|
// not belong to their chunks
|
||||||
|
if chunk_orders.contains(&order) && !chunk_ids.contains(&id) {
|
||||||
let chunks = self.chunks();
|
return Ok(false);
|
||||||
for chunk in chunks {
|
|
||||||
let chunk = chunk.read();
|
|
||||||
if chunk.is_persisted() {
|
|
||||||
let order = chunk.order();
|
|
||||||
// this chunk does not belong to chunk_orders but in the middle of them
|
|
||||||
if !chunk_orders.contains(&order) && order < *last_order {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
true
|
|
||||||
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a PartitionSummary for this partition. If the partition
|
/// Return a PartitionSummary for this partition. If the partition
|
||||||
|
@ -511,6 +517,145 @@ mod tests {
|
||||||
assert_eq!(ids, expected_ids);
|
assert_eq!(ids, expected_ids);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_contiguous_chunks_empty() {
|
||||||
|
// create a partition without chunks
|
||||||
|
let id_orders = vec![];
|
||||||
|
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
|
||||||
|
|
||||||
|
let ids = BTreeSet::new();
|
||||||
|
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
|
||||||
|
|
||||||
|
// contiguous
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_contiguous_chunks_one_chunk() {
|
||||||
|
// create a partition with one chunk
|
||||||
|
let id_orders = vec![(1, 1)];
|
||||||
|
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
|
||||||
|
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
let order_range = RangeInclusive::new(ChunkOrder::MIN, ChunkOrder::MIN);
|
||||||
|
|
||||||
|
// no chunks provided
|
||||||
|
// --> contiguous
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// provide itself
|
||||||
|
// --> contiguous
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// provide same order, different id
|
||||||
|
// --> not contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(2));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
|
||||||
|
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_contiguous_chunks_three_chunk() {
|
||||||
|
// create a partition with 3 chunks
|
||||||
|
let id_orders = vec![(1, 1), (2, 2), (3, 3)];
|
||||||
|
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
|
||||||
|
|
||||||
|
// (1,1) and (2,2) are contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
ids.insert(ChunkId::new_test(2));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// (3,3) and (2,2) are contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(3));
|
||||||
|
ids.insert(ChunkId::new_test(2));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(3).unwrap());
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// (3,3) and (1,1) are NOT contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(3));
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(3).unwrap());
|
||||||
|
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_contiguous_chunks_three_chunk_with_duplicated_orders() {
|
||||||
|
// create a partition with 3 chunks
|
||||||
|
let id_orders = vec![(1, 1), (2, 2), (3, 2)];
|
||||||
|
let partition = make_partitition_for_chunks_with_ids_orders(&id_orders);
|
||||||
|
|
||||||
|
// (1,1) is contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(1).unwrap());
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// (2,2) and (3,2) are contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(2));
|
||||||
|
ids.insert(ChunkId::new_test(3));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(2).unwrap(), ChunkOrder::new(2).unwrap());
|
||||||
|
assert!(partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// (1,1) and (2,2) are NOT contiguous because there is chunk (3, 2) with the same order 2
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
ids.insert(ChunkId::new_test(2));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
|
||||||
|
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
|
||||||
|
// (3,2) and (1,1) are NOT contiguous
|
||||||
|
let mut ids = BTreeSet::new();
|
||||||
|
ids.insert(ChunkId::new_test(3));
|
||||||
|
ids.insert(ChunkId::new_test(1));
|
||||||
|
let order_range =
|
||||||
|
RangeInclusive::new(ChunkOrder::new(1).unwrap(), ChunkOrder::new(2).unwrap());
|
||||||
|
assert!(!partition.contiguous_chunks(&ids, &order_range).unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_partitition_for_chunks_with_ids_orders(id_orders: &[(u128, u32)]) -> Partition {
|
||||||
|
let addr = PartitionAddr {
|
||||||
|
db_name: "d".into(),
|
||||||
|
table_name: "t".into(),
|
||||||
|
partition_key: "p".into(),
|
||||||
|
};
|
||||||
|
let registry = Arc::new(metric::Registry::new());
|
||||||
|
let catalog_metrics = Arc::new(CatalogMetrics::new(
|
||||||
|
Arc::clone(&addr.db_name),
|
||||||
|
Arc::clone(®istry),
|
||||||
|
));
|
||||||
|
let time_provider = Arc::new(time::SystemProvider::new());
|
||||||
|
let table_metrics = Arc::new(catalog_metrics.new_table_metrics("t"));
|
||||||
|
let partition_metrics = table_metrics.new_partition_metrics();
|
||||||
|
|
||||||
|
// make chunks for given id_orders
|
||||||
|
let mut partition = Partition::new(addr, partition_metrics, time_provider);
|
||||||
|
for &(id, order) in id_orders {
|
||||||
|
partition.create_open_chunk_with_specified_id_order(
|
||||||
|
make_mb_chunk("t"),
|
||||||
|
ChunkId::new_test(id),
|
||||||
|
ChunkOrder::new(order).unwrap(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
partition
|
||||||
|
}
|
||||||
|
|
||||||
fn make_mb_chunk(table_name: &str) -> MBChunk {
|
fn make_mb_chunk(table_name: &str) -> MBChunk {
|
||||||
write_lp_to_new_chunk(&format!("{} bar=1 10", table_name))
|
write_lp_to_new_chunk(&format!("{} bar=1 10", table_name))
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
error::{
|
error::{
|
||||||
ChunksNotContiguous, ChunksNotInPartition, EmptyChunks, ParquetChunkError,
|
ChunksNotContiguous, ChunksNotInPartition, ChunksNotPersisted, ComparePartitionCheckpoint,
|
||||||
WritingToObjectStore,
|
EmptyChunks, NoCheckpoint, ParquetChunkError, ParquetMetaRead, WritingToObjectStore,
|
||||||
},
|
},
|
||||||
LockableCatalogChunk, LockableCatalogPartition, Result,
|
LockableCatalogChunk, LockableCatalogPartition, Result,
|
||||||
};
|
};
|
||||||
|
@ -34,9 +34,11 @@ use parquet_file::{
|
||||||
use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint};
|
use persistence_windows::checkpoint::{DatabaseCheckpoint, PartitionCheckpoint};
|
||||||
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
|
use query::{compute_sort_key, exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
|
||||||
use schema::Schema;
|
use schema::Schema;
|
||||||
use snafu::ResultExt;
|
use snafu::{OptionExt, ResultExt};
|
||||||
use std::{
|
use std::{
|
||||||
|
cmp::Ordering,
|
||||||
collections::{BTreeSet, HashSet},
|
collections::{BTreeSet, HashSet},
|
||||||
|
ops::RangeInclusive,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use time::Time;
|
use time::Time;
|
||||||
|
@ -183,19 +185,14 @@ fn mark_chunks_to_compact(
|
||||||
// Mark and snapshot chunks, then drop locks
|
// Mark and snapshot chunks, then drop locks
|
||||||
let mut time_of_first_write = Time::MAX;
|
let mut time_of_first_write = Time::MAX;
|
||||||
let mut time_of_last_write = Time::MIN;
|
let mut time_of_last_write = Time::MIN;
|
||||||
let mut chunk_orders = BTreeSet::new();
|
let mut chunk_ids = BTreeSet::new();
|
||||||
let mut input_rows = 0;
|
let mut input_rows = 0;
|
||||||
let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
|
let mut delete_predicates: HashSet<Arc<DeletePredicate>> = HashSet::new();
|
||||||
let mut min_order = ChunkOrder::MAX;
|
let mut min_order = ChunkOrder::MAX;
|
||||||
|
let mut max_order = ChunkOrder::MIN;
|
||||||
|
|
||||||
// initialize checkpoints
|
let mut database_checkpoint = DatabaseCheckpoint::new(Default::default());
|
||||||
let database_checkpoint = DatabaseCheckpoint::new(Default::default());
|
let mut partition_checkpoint: Option<PartitionCheckpoint> = None;
|
||||||
let partition_checkpoint = PartitionCheckpoint::new(
|
|
||||||
Arc::clone(&partition_addr.table_name),
|
|
||||||
Arc::clone(&partition_addr.partition_key),
|
|
||||||
Default::default(),
|
|
||||||
Time::from_timestamp_nanos(0),
|
|
||||||
);
|
|
||||||
|
|
||||||
let os_chunks = chunks
|
let os_chunks = chunks
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -223,19 +220,51 @@ fn mark_chunks_to_compact(
|
||||||
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
|
delete_predicates.extend(chunk.delete_predicates().iter().cloned());
|
||||||
|
|
||||||
min_order = min_order.min(chunk.order());
|
min_order = min_order.min(chunk.order());
|
||||||
chunk_orders.insert(chunk.order());
|
max_order = max_order.max(chunk.order());
|
||||||
|
chunk_ids.insert(chunk.id());
|
||||||
|
|
||||||
// Todo:get chunk's datatbase_checkpoint and partition_checkpoint of the chunk and keep max
|
// read IoxMetadata from the parquet chunk's metadata
|
||||||
|
if let Some(parquet_chunk) = chunk.parquet_chunk() {
|
||||||
|
let iox_parquet_metadata = parquet_chunk.parquet_metadata();
|
||||||
|
let iox_metadata = iox_parquet_metadata
|
||||||
|
.decode()
|
||||||
|
.context(ParquetMetaRead)?
|
||||||
|
.read_iox_metadata()
|
||||||
|
.context(ParquetMetaRead)?;
|
||||||
|
|
||||||
|
// fold all database_checkpoints into one for the compacting chunk
|
||||||
|
database_checkpoint.fold(&iox_metadata.database_checkpoint);
|
||||||
|
|
||||||
|
// keep max partition_checkpoint for the compacting chunk
|
||||||
|
if let Some(part_ckpt) = &partition_checkpoint {
|
||||||
|
let ordering = part_ckpt
|
||||||
|
.partial_cmp(&iox_metadata.partition_checkpoint)
|
||||||
|
.context(ComparePartitionCheckpoint)?;
|
||||||
|
if ordering == Ordering::Less {
|
||||||
|
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
partition_checkpoint = Some(iox_metadata.partition_checkpoint);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return ChunksNotPersisted {}.fail();
|
||||||
|
}
|
||||||
|
|
||||||
// Set chunk in the right action which is compacting object store
|
// Set chunk in the right action which is compacting object store
|
||||||
// This function will also error out if the chunk is not yet persisted
|
// This function will also error out if the chunk is not yet persisted
|
||||||
chunk.set_compacting_object_store(registration)?;
|
chunk.set_compacting_object_store(registration)?;
|
||||||
Ok(DbChunk::snapshot(&*chunk))
|
Ok(DbChunk::parquet_file_snapshot(&*chunk))
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>>>()?;
|
.collect::<Result<Vec<_>>>()?;
|
||||||
|
|
||||||
|
if partition_checkpoint.is_none() {
|
||||||
|
return NoCheckpoint {}.fail();
|
||||||
|
}
|
||||||
|
let partition_checkpoint = partition_checkpoint.unwrap();
|
||||||
|
|
||||||
// Verify if all the provided chunks are contiguous
|
// Verify if all the provided chunks are contiguous
|
||||||
if !partition.contiguous_object_store_chunks(&chunk_orders) {
|
let order_range = RangeInclusive::new(min_order, max_order);
|
||||||
|
if !partition.contiguous_chunks(&chunk_ids, &order_range)? {
|
||||||
return ChunksNotContiguous {}.fail();
|
return ChunksNotContiguous {}.fail();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,8 +434,9 @@ mod tests {
|
||||||
mark_chunks_to_compact(partition, vec![chunk], ®istration);
|
mark_chunks_to_compact(partition, vec![chunk], ®istration);
|
||||||
let err = compact_non_persisted_chunks.unwrap_err();
|
let err = compact_non_persisted_chunks.unwrap_err();
|
||||||
assert!(
|
assert!(
|
||||||
err.to_string().contains("Expected Persisted, got Open"),
|
err.to_string()
|
||||||
"Expected Persisted, got Open"
|
.contains("Cannot compact chunks because at least one is not yet persisted"),
|
||||||
|
"Cannot compact chunks because at least one is not yet persisted"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,8 +71,25 @@ pub enum Error {
|
||||||
))]
|
))]
|
||||||
ChunksNotInPartition {},
|
ChunksNotInPartition {},
|
||||||
|
|
||||||
|
#[snafu(display("Cannot compact chunks because at least one is not yet persisted"))]
|
||||||
|
ChunksNotPersisted {},
|
||||||
|
|
||||||
#[snafu(display("Cannot compact the provided persisted chunks. They are not contiguous"))]
|
#[snafu(display("Cannot compact the provided persisted chunks. They are not contiguous"))]
|
||||||
ChunksNotContiguous {},
|
ChunksNotContiguous {},
|
||||||
|
|
||||||
|
#[snafu(display(
|
||||||
|
"Error reading IOx Metadata from Parquet IoxParquetMetadata: {}",
|
||||||
|
source
|
||||||
|
))]
|
||||||
|
ParquetMetaRead {
|
||||||
|
source: parquet_file::metadata::Error,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Cannot compact chunks because of error computing max partition checkpoint"))]
|
||||||
|
ComparePartitionCheckpoint {},
|
||||||
|
|
||||||
|
#[snafu(display("Cannot compact chunks because no checkpoint was computed"))]
|
||||||
|
NoCheckpoint {},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
Loading…
Reference in New Issue