refactor: grouping overlaps now uses the same overlap function in both compactor and deduplication (#4420)

* refactor: grouping overlaps now uses the same overlap function in both compactor and deduplication

* chore: commit missing file

* chore: address review comments

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2022-04-25 16:32:51 -04:00 committed by GitHub
parent e3caf24954
commit 0a440bb638
4 changed files with 112 additions and 58 deletions


@@ -20,6 +20,7 @@ use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64Histogra
 use object_store::DynObjectStore;
 use observability_deps::tracing::{debug, info, warn};
 use parquet_file::metadata::{IoxMetadata, IoxParquetMetaData};
+use query::provider::overlap::group_potential_duplicates;
 use query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
@@ -868,60 +869,30 @@
     fn overlapped_groups(
         parquet_files: Vec<ParquetFileWithMetadata>,
     ) -> Vec<GroupWithMinTimeAndSize> {
-        let num_files = parquet_files.len();
-        let mut grouper = Vec::with_capacity(num_files * 2);
+        // group overlap files
+        let overlapped_groups =
+            group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks");
-        enum StartEnd<I> {
-            Start,
-            End(I),
-        }
-        struct GrouperRecord<I, V: PartialOrd> {
-            value: V,
-            start_end: StartEnd<I>,
-        }
-        for file in parquet_files {
-            grouper.push(GrouperRecord {
-                value: file.min_time,
-                start_end: StartEnd::Start,
-            });
-            grouper.push(GrouperRecord {
-                value: file.max_time,
-                start_end: StartEnd::End(file),
-            });
-        }
-        grouper.sort_by_key(|gr| gr.value);
-        let mut cumulative_sum = 0;
-        let mut groups: Vec<GroupWithMinTimeAndSize> = Vec::with_capacity(num_files);
-        for gr in grouper {
-            cumulative_sum += match gr.start_end {
-                StartEnd::Start => 1,
-                StartEnd::End(_) => -1,
-            };
+        // Compute min time and total size for each overlapped group
+        let mut groups_with_min_time_and_size = Vec::with_capacity(overlapped_groups.len());
+        for group in overlapped_groups {
+            let mut group_with_min_time_and_size = GroupWithMinTimeAndSize {
+                parquet_files: Vec::with_capacity(group.len()),
+                min_time: Timestamp::new(i64::MAX),
+                total_file_size_bytes: 0,
+            };
-            if matches!(gr.start_end, StartEnd::Start) && cumulative_sum == 1 {
-                let group = GroupWithMinTimeAndSize {
-                    parquet_files: Vec::with_capacity(num_files),
-                    min_time: Timestamp::new(i64::MAX),
-                    total_file_size_bytes: 0,
-                };
-                groups.push(group);
-            }
+            for file in group {
+                group_with_min_time_and_size.min_time =
+                    group_with_min_time_and_size.min_time.min(file.min_time);
+                group_with_min_time_and_size.total_file_size_bytes += file.file_size_bytes;
+                group_with_min_time_and_size.parquet_files.push(file);
+            }
-            if let StartEnd::End(item) = gr.start_end {
-                let group = groups
-                    .last_mut()
-                    .expect("a start should have pushed at least one empty group");
-                group.min_time = group.min_time.min(item.min_time);
-                group.total_file_size_bytes += item.file_size_bytes;
-                group.parquet_files.push(item);
-            }
+            groups_with_min_time_and_size.push(group_with_min_time_and_size);
         }
-        groups
+        groups_with_min_time_and_size
     }

     // Compute time to split data
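Note: `group_potential_duplicates` clusters files whose time ranges transitively overlap, which is what the deleted sweep-line code computed by hand. A rough, self-contained sketch of that grouping idea (hypothetical `Range`/`group_overlaps` names, not the IOx implementation): sort by start time, then start a new group whenever a range begins after every range seen so far has ended.

// Standalone sketch of grouping by transitive time-range overlap.
// `Range` stands in for anything exposing a (min, max) timestamp pair.
#[derive(Debug, Clone, Copy)]
struct Range {
    min: i64,
    max: i64,
}

fn group_overlaps(mut ranges: Vec<Range>) -> Vec<Vec<Range>> {
    // Sort by start time so overlap chains become contiguous.
    ranges.sort_by_key(|r| r.min);
    let mut groups: Vec<Vec<Range>> = vec![];
    let mut current_max = i64::MIN;
    for r in ranges {
        // A new group starts when this range begins strictly after
        // every range seen so far has ended.
        if groups.is_empty() || r.min > current_max {
            groups.push(vec![]);
        }
        current_max = current_max.max(r.max);
        groups.last_mut().expect("group exists").push(r);
    }
    groups
}

fn main() {
    let got = group_overlaps(vec![
        Range { min: 1, max: 10 },
        Range { min: 15, max: 20 },
        Range { min: 8, max: 16 }, // chains the other two together
    ]);
    assert_eq!(got.len(), 1);
    println!("{got:?}");
}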


@@ -286,6 +286,35 @@ where
     }
 }

+/// Implement QueryChunkMeta for Arc<dyn QueryChunk>
+impl QueryChunkMeta for Arc<dyn QueryChunk> {
+    fn summary(&self) -> Option<&TableSummary> {
+        self.as_ref().summary()
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.as_ref().schema()
+    }
+
+    fn partition_id(&self) -> Option<PartitionId> {
+        self.as_ref().partition_id()
+    }
+
+    fn sort_key(&self) -> Option<&SortKey> {
+        self.as_ref().sort_key()
+    }
+
+    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
+        let pred = self.as_ref().delete_predicates();
+        debug!(?pred, "Delete predicate in QueryChunkMeta");
+        pred
+    }
+
+    fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
+        self.as_ref().timestamp_min_max()
+    }
+}
+
 /// return true if all the chunks include statistics
 pub fn chunks_have_stats(chunks: &[Arc<dyn QueryChunk>]) -> bool {
     // If at least one of the provided chunks cannot provide stats,
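This impl is what lets existing `Vec<Arc<dyn QueryChunk>>` call sites satisfy the new generic bound: the trait object simply forwards every method to the inner chunk. A minimal sketch of the same delegation pattern (hypothetical `Meta` trait and `Chunk` type, not the IOx types):

use std::sync::Arc;

trait Meta {
    fn id(&self) -> i64;
}

// Once the trait is implemented for Arc<dyn Meta>, a generic fn with
// `T: Meta` accepts trait objects as well as concrete types.
impl Meta for Arc<dyn Meta> {
    fn id(&self) -> i64 {
        // Forward to the inner trait object.
        self.as_ref().id()
    }
}

struct Chunk(i64);

impl Meta for Chunk {
    fn id(&self) -> i64 {
        self.0
    }
}

fn first_id<T: Meta>(items: &[T]) -> Option<i64> {
    items.first().map(|item| item.id())
}

fn main() {
    let dynamic: Vec<Arc<dyn Meta>> = vec![Arc::new(Chunk(7))];
    let concrete = vec![Chunk(1)];
    assert_eq!(first_id(&dynamic), Some(7));
    assert_eq!(first_id(&concrete), Some(1));
}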


@@ -34,7 +34,7 @@ use snafu::{ResultExt, Snafu};
 mod adapter;
 mod deduplicate;
-mod overlap;
+pub mod overlap;
 mod physical;
 use self::overlap::{group_potential_duplicates, group_potential_duplicates_og};
 pub(crate) use deduplicate::DeduplicateExec;


@@ -4,12 +4,16 @@
 //! DataModel have been written in via multiple distinct line protocol
 //! writes (and thus are stored in separate rows)

-use data_types::partition_metadata::{ColumnSummary, StatOverlap, Statistics};
-use schema::TIME_COLUMN_NAME;
+use data_types::{
+    partition_metadata::{ColumnSummary, StatOverlap, Statistics},
+    timestamp::TimestampMinMax,
+};
+use data_types2::{DeletePredicate, ParquetFileWithMetadata, PartitionId, TableSummary};
+use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
 use snafu::Snafu;
 use std::{cmp::Ordering, sync::Arc};

-use crate::QueryChunk;
+use crate::{QueryChunk, QueryChunkMeta};

 #[derive(Debug, Snafu)]
 pub enum Error {
@@ -107,11 +111,45 @@ pub fn group_potential_duplicates_og(
     Ok(groups)
 }

-/// Groups [`QueryChunk`] objects into disjoint sets of overlapped time range.
+// Implement QueryChunkMeta for ParquetFileWithMetadata so that group_potential_duplicates
+// works on ParquetFileWithMetadata. Since group_potential_duplicates only needs two functions,
+// partition_id and timestamp_min_max, the other functions are left `unimplemented!` on purpose.
+impl QueryChunkMeta for ParquetFileWithMetadata {
+    fn summary(&self) -> Option<&TableSummary> {
+        unimplemented!()
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        unimplemented!()
+    }
+
+    fn partition_id(&self) -> Option<PartitionId> {
+        Some(self.partition_id)
+    }
+
+    fn sort_key(&self) -> Option<&SortKey> {
+        unimplemented!()
+    }
+
+    fn timestamp_min_max(&self) -> Option<TimestampMinMax> {
+        Some(TimestampMinMax {
+            min: self.min_time.get(),
+            max: self.max_time.get(),
+        })
+    }
+
+    /// return a reference to delete predicates of the chunk
+    fn delete_predicates(&self) -> &[Arc<DeletePredicate>] {
+        unimplemented!()
+    }
+}
+
+/// Groups [`QueryChunkMeta`] objects into disjoint sets of overlapped time range.
 /// Does not preserve or guarantee any ordering.
-pub fn group_potential_duplicates(
-    chunks: Vec<Arc<dyn QueryChunk>>,
-) -> Result<Vec<Vec<Arc<dyn QueryChunk>>>> {
+pub fn group_potential_duplicates<T>(chunks: Vec<T>) -> Result<Vec<Vec<T>>>
+where
+    T: QueryChunkMeta,
+{
     // If at least one of the chunks has no time range,
     // all chunks are considered to overlap with each other.
     if chunks.iter().any(|c| c.timestamp_min_max().is_none()) {
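The comment above the impl explains why leaving `summary`, `schema`, `sort_key`, and `delete_predicates` as `unimplemented!()` is acceptable here: the generic grouping path only ever calls `partition_id` and `timestamp_min_max`. A contrived sketch of that trade-off (hypothetical `ChunkMeta`/`FileMeta` names, not the IOx code):

// The generic fn below only calls `timestamp_min_max`, so the other
// trait method can panic with `unimplemented!()` without being hit.
trait ChunkMeta {
    fn timestamp_min_max(&self) -> Option<(i64, i64)>;
    fn sort_key(&self) -> Option<String>;
}

struct FileMeta {
    min_time: i64,
    max_time: i64,
}

impl ChunkMeta for FileMeta {
    fn timestamp_min_max(&self) -> Option<(i64, i64)> {
        Some((self.min_time, self.max_time))
    }

    // Never called on the grouping code path; panics if it ever is.
    fn sort_key(&self) -> Option<String> {
        unimplemented!("not needed for overlap grouping")
    }
}

fn overlaps<T: ChunkMeta>(a: &T, b: &T) -> bool {
    match (a.timestamp_min_max(), b.timestamp_min_max()) {
        (Some((amin, amax)), Some((bmin, bmax))) => amin <= bmax && bmin <= amax,
        // Missing stats: conservatively assume overlap.
        _ => true,
    }
}

fn main() {
    let a = FileMeta { min_time: 1, max_time: 10 };
    let b = FileMeta { min_time: 5, max_time: 20 };
    assert!(overlaps(&a, &b));
}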
@@ -415,6 +453,7 @@ mod test {
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1, chunk3, chunk2, chunk4]"];
         assert_groups_eq!(expected, groups);
@@ -429,6 +468,7 @@
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1, chunk2, chunk3, chunk4]"];
         assert_groups_eq!(expected, groups);
@@ -443,6 +483,7 @@
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1, chunk2]", "Group 1: [chunk3, chunk4]"];
         assert_groups_eq!(expected, groups);
@@ -457,6 +498,7 @@
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c4, c3, c2]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec![
             "Group 0: [chunk1, chunk2]",
@@ -472,6 +514,7 @@
         let c1 = Arc::new(TestChunk::new("chunk1").with_timestamp_min_max(1, 10));

         let groups = group_potential_duplicates(vec![c1]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1]"];
         assert_groups_eq!(expected, groups);
@ -479,7 +522,8 @@ mod test {
#[test]
fn overlap_no_groups() {
let groups = group_potential_duplicates(vec![]).expect("grouping succeeded");
let groups: Vec<Vec<TestChunk>> =
group_potential_duplicates(vec![]).expect("grouping succeeded");
assert!(groups.is_empty());
}
@@ -501,6 +545,7 @@
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c2, c3, c4]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1, chunk3, chunk2, chunk4]"];
         assert_groups_eq!(expected, groups);
@@ -516,6 +561,7 @@
         );

         let groups = group_potential_duplicates(vec![c1]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec!["Group 0: [chunk1]"];
         assert_groups_eq!(expected, groups);
@@ -538,6 +584,7 @@
         let c4 = Arc::new(TestChunk::new("chunk4").with_timestamp_min_max(25, 35));

         let groups = group_potential_duplicates(vec![c1, c4, c3, c2]).expect("grouping succeeded");
+        let groups = to_group_query_chunks(groups);

         let expected = vec![
             "Group 0: [chunk1, chunk2]",
@@ -959,7 +1006,6 @@
     }
-
     // --- Test infrastructure --
     fn to_string(groups: Vec<Vec<Arc<dyn QueryChunk>>>) -> Vec<String> {
         let mut s = vec![];
         for (idx, group) in groups.iter().enumerate() {
@@ -968,4 +1014,12 @@
         }
         s
     }
+
+    // convert from Vec<Vec<Arc<TestChunk>>> to Vec<Vec<Arc<dyn QueryChunk>>>
+    fn to_group_query_chunks(groups: Vec<Vec<Arc<TestChunk>>>) -> Vec<Vec<Arc<dyn QueryChunk>>> {
+        groups
+            .into_iter()
+            .map(|chunks| chunks.into_iter().map(|c| c as _).collect::<Vec<_>>())
+            .collect::<Vec<_>>()
+    }
 }
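One small point about this helper: `c as _` relies on Rust's unsized coercion from `Arc<TestChunk>` to `Arc<dyn QueryChunk>`, with the cast target inferred from the function's return type. A minimal sketch of the same coercion (hypothetical `Chunk` trait, not the IOx types):

use std::sync::Arc;

trait Chunk {}

struct TestChunk;

impl Chunk for TestChunk {}

fn main() {
    let concrete = Arc::new(TestChunk);
    // `as _` unsizes Arc<TestChunk> into Arc<dyn Chunk>; the target type
    // is inferred from the annotation, as in to_group_query_chunks above.
    let erased: Arc<dyn Chunk> = concrete as _;
    let _group: Vec<Arc<dyn Chunk>> = vec![erased];
}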