chore: more readability improvements to sort keys (#5366)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
07729bf153
commit
b834bc630c
|
@ -67,7 +67,7 @@ impl ReorgPlanner {
|
|||
///
|
||||
/// The plan looks like:
|
||||
///
|
||||
/// (Sort on output_sort)
|
||||
/// (Sort on output_sort_key)
|
||||
/// (Scan chunks) <-- any needed deduplication happens here
|
||||
pub fn compact_plan<I>(
|
||||
&self,
|
||||
|
|
|
@ -463,17 +463,22 @@ impl Deduplicater {
|
|||
debug!("output_sort_key is not provided for building deduplicate plan");
|
||||
}
|
||||
|
||||
// This sort key is only used when the chunks are not sorted and output_sort_key is not provided
|
||||
// This sort key is only used when the chunks are not
|
||||
// sorted and output_sort_key is not provided
|
||||
let dedup_sort_key_for_unsorted_chunks =
|
||||
compute_sort_key_for_chunks(&pk_schema, chunks.as_ref());
|
||||
|
||||
// Go over overlapped set, build deduplicate plan for each vector of overlapped chunks
|
||||
// Build a plan for each overlapped set of chunks which may have
|
||||
// duplicated keys in any of the chunks of the set
|
||||
for overlapped_chunks in self.overlapped_chunks_set.iter().cloned() {
|
||||
let chunks_dedup_sort_key = Self::chunks_dedup_sort_key(
|
||||
&output_sort_key,
|
||||
&overlapped_chunks,
|
||||
&dedup_sort_key_for_unsorted_chunks,
|
||||
);
|
||||
// Find a common sort key to use to deduplicate this overlapped set
|
||||
let chunks_dedup_sort_key =
|
||||
output_sort_key.as_ref().cloned().unwrap_or_else(|| {
|
||||
Self::chunks_dedup_sort_key(
|
||||
&overlapped_chunks,
|
||||
&dedup_sort_key_for_unsorted_chunks,
|
||||
)
|
||||
});
|
||||
|
||||
debug!(
|
||||
?chunks_dedup_sort_key,
|
||||
|
@ -493,14 +498,16 @@ impl Deduplicater {
|
|||
)?);
|
||||
}
|
||||
|
||||
// Go over each in_chunk_duplicates_chunks, build deduplicate plan for each
|
||||
// Build a plan for each chunk which may have duplicates,
|
||||
// but only within itself (not with any other chunk)
|
||||
for chunk_with_duplicates in self.in_chunk_duplicates_chunks.iter().cloned() {
|
||||
// Set dedup sort key
|
||||
let chunk_dedup_sort_key = Self::chunks_dedup_sort_key(
|
||||
&output_sort_key,
|
||||
&vec![Arc::clone(&chunk_with_duplicates)],
|
||||
&dedup_sort_key_for_unsorted_chunks,
|
||||
);
|
||||
// Find the sort key to use to deduplicate this chunk
|
||||
let chunk_dedup_sort_key = output_sort_key.as_ref().cloned().unwrap_or_else(|| {
|
||||
Self::chunks_dedup_sort_key(
|
||||
&vec![Arc::clone(&chunk_with_duplicates)],
|
||||
&dedup_sort_key_for_unsorted_chunks,
|
||||
)
|
||||
});
|
||||
|
||||
debug!(
|
||||
?chunk_dedup_sort_key,
|
||||
|
@ -520,7 +527,8 @@ impl Deduplicater {
|
|||
)?);
|
||||
}
|
||||
|
||||
// Go over non_duplicates_chunks, build a plan for it
|
||||
// Build a plan for each chunk that has no duplicates:
|
||||
// neither with any other chunk or within itself
|
||||
if !self.no_duplicates_chunks.is_empty() {
|
||||
debug!(
|
||||
?output_sort_key,
|
||||
|
@ -565,40 +573,41 @@ impl Deduplicater {
|
|||
Ok(plan)
|
||||
}
|
||||
|
||||
// Return sort key for overlapped chunks
|
||||
/// Return a single sort key that can be used to deduplicate the values within all `chunks`.
|
||||
///
|
||||
/// The returned sort key will cover the primary key of all
|
||||
/// `chunks`.
|
||||
///
|
||||
/// In addition, the sort key is chosen so that is "compatible"
|
||||
/// with each chunk's existing sort key if posible. Comatible
|
||||
/// means that the chunks can be deduplicated without
|
||||
/// re-sorting. If this is not possible., `dedup_sort_key` is
|
||||
/// returned.
|
||||
fn chunks_dedup_sort_key(
|
||||
output_sort_key: &Option<SortKey>,
|
||||
chunks: &Vec<Arc<dyn QueryChunk>>,
|
||||
dedup_sort_key_for_unsorted_chunks: &SortKey,
|
||||
) -> SortKey {
|
||||
let chunks_dedup_sort_key = match &output_sort_key {
|
||||
// Use output _sort_key if provided
|
||||
Some(sort_key) => sort_key.clone(),
|
||||
None => {
|
||||
// Use the chunk's partition key if they were persisted chunks
|
||||
if let Some(sort_key) = if chunks.len() == 1 {
|
||||
chunks[0].sort_key()
|
||||
} else {
|
||||
Self::sort_key_of_overlapped_chunks(chunks)
|
||||
} {
|
||||
sort_key.clone()
|
||||
} else {
|
||||
// This happens either:
|
||||
// . In the Ingester to compact ingesting data that is not sorted and not
|
||||
// deduplicated yet
|
||||
// . In the Querier that also includes data sent from Ingester that is also
|
||||
// not yet sorted.
|
||||
// Note: Data sent from Ingester is already deduplicated but if it
|
||||
// overlaps with other chunks, it may include duplicated data with those
|
||||
// chunks
|
||||
debug!(
|
||||
"Sort key is computed during planning for deduplicating overlapped chunks."
|
||||
);
|
||||
dedup_sort_key_for_unsorted_chunks.clone()
|
||||
}
|
||||
}
|
||||
// Use the chunk's sort key if they were persisted chunks
|
||||
let sort_key = if chunks.len() == 1 {
|
||||
chunks[0].sort_key()
|
||||
} else {
|
||||
Self::sort_key_of_overlapped_chunks(chunks)
|
||||
};
|
||||
chunks_dedup_sort_key
|
||||
|
||||
if let Some(sort_key) = sort_key {
|
||||
sort_key.clone()
|
||||
} else {
|
||||
// This happens either:
|
||||
// . In the Ingester to compact ingesting data that is not sorted and not
|
||||
// deduplicated yet
|
||||
// . In the Querier that also includes data sent from Ingester that is also
|
||||
// not yet sorted.
|
||||
// Note: Data sent from Ingester is already deduplicated but if it
|
||||
// overlaps with other chunks, it may include duplicated data with those
|
||||
// chunks
|
||||
debug!("Sort key is computed during planning for deduplicating overlapped chunks.");
|
||||
dedup_sort_key_for_unsorted_chunks.clone()
|
||||
}
|
||||
}
|
||||
|
||||
// Return sort key of overlapped chunks
|
||||
|
|
|
@ -1017,7 +1017,6 @@ impl QueryChunkMeta for IngesterChunk {
|
|||
}
|
||||
|
||||
fn sort_key(&self) -> Option<&SortKey> {
|
||||
//Some(&self.sort_key)
|
||||
// Data is not sorted
|
||||
None
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue