fix: make sure the QueryableParquetChunks are always sorted correctly (#4163)

* fix: make sure the chunks are always sorted correctly

* fix: output

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* refactor: make new function for new chunk id

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Nga Tran 2022-03-29 17:36:45 -04:00 committed by GitHub
parent 2b76c31157
commit bfd5568acf
3 changed files with 93 additions and 9 deletions
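The problem in a nutshell: parquet files produced by splitting compacted data share the same min_sequence_number, so their QueryableParquetChunks compare equal on order(), and the old constant id() of 0 left their relative order up to the sorter. A minimal, self-contained sketch (illustrative only, not code from this commit) of how a composite (order, id) sort key breaks such ties:

// Sketch: sorting by (order, id) disambiguates chunks that share
// the same order (min_sequence_number).
fn main() {
    // (order, id) pairs; both chunks have order 5, ids stand in for min_time.
    let mut chunks = vec![(5u32, 28000u128), (5u32, 8000u128)];
    // With a constant second key (e.g. always 0) the relative order of the
    // two chunks would be unspecified; with distinct ids it is determined.
    chunks.sort_unstable_by_key(|&(order, id)| (order, id));
    assert_eq!(chunks, vec![(5, 8000), (5, 28000)]);
}

This is exactly the tie-break the min_time-based ids introduced below provide.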


@@ -866,7 +866,7 @@ impl Compactor {
mod tests {
use super::*;
use arrow_util::assert_batches_sorted_eq;
-use data_types2::{KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber};
+use data_types2::{ChunkId, KafkaPartition, NamespaceId, ParquetFileParams, SequenceNumber};
use iox_tests::util::TestCatalog;
use object_store::ObjectStoreTestConvenience;
use querier::{
@@ -1516,6 +1516,77 @@ mod tests {
}
}
#[tokio::test]
async fn test_sort_queryable_parquet_chunk() {
let catalog = TestCatalog::new();
let lp1 = vec![
"table,tag1=WA field_int=1000 8000",
"table,tag1=VT field_int=10 10000",
"table,tag1=UT field_int=70 20000",
]
.join("\n");
let lp2 = vec![
"table,tag1=WA field_int=1500 28000",
"table,tag1=UT field_int=270 35000",
]
.join("\n");
let ns = catalog.create_namespace("ns").await;
let sequencer = ns.create_sequencer(1).await;
let table = ns.create_table("table").await;
let partition = table
.with_sequencer(&sequencer)
.create_partition("part")
.await;
// 2 files with the same min_sequence_number
let pf1 = partition
.create_parquet_file_with_min_max(&lp1, 1, 5, 8000, 20000)
.await
.parquet_file
.clone();
let pf2 = partition
.create_parquet_file_with_min_max(&lp2, 1, 5, 28000, 35000)
.await
.parquet_file
.clone();
// Build 2 QueryableParquetChunks
let pt1 = ParquetFileWithTombstone {
data: Arc::new(pf1),
tombstones: vec![],
};
let pt2 = ParquetFileWithTombstone {
data: Arc::new(pf2),
tombstones: vec![],
};
let pc1 = pt1.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
table.table.name.clone(),
partition.partition.partition_key.clone(),
);
let pc2 = pt2.to_queryable_parquet_chunk(
Arc::clone(&catalog.object_store),
table.table.name.clone(),
partition.partition.partition_key.clone(),
);
// Vector of chunks
let mut chunks = vec![pc2, pc1];
// must have the same order (min_sequence_number)
assert_eq!(chunks[0].order(), chunks[1].order());
// but different ids (min_time)
assert_eq!(chunks[0].id(), ChunkId::new_test(28000));
assert_eq!(chunks[1].id(), ChunkId::new_test(8000));
// Sort the chunks by order (min_sequence_number), then by id (min_time)
chunks.sort_unstable_by_key(|c| (c.order(), c.id()));
// now the chunks' positions in the vector are reversed
assert_eq!(chunks[0].id(), ChunkId::new_test(8000));
assert_eq!(chunks[1].id(), ChunkId::new_test(28000));
}
#[test]
fn test_level_upgradable() {
let time = Arc::new(SystemProvider::new());


@@ -110,15 +110,22 @@ impl QueryChunkMeta for QueryableParquetChunk {
}
impl QueryChunk for QueryableParquetChunk {
-// Todo: This function should not be used in this NG chunk context
-// For now, since we also use scan for both OG and NG, the chunk id
-// is used as second key in build_deduplicate_plan_for_overlapped_chunks
-// to sort the chunk to deduplicate them correctly.
-// Since we make the first key, order, always different, it is fine
-// to have the second key the sames and always 0
+// In NG, this function is needed to distinguish ParquetChunks further if they happen to have the same creation order.
+// Ref: chunks.sort_unstable_by_key(|c| (c.order(), c.id())); in provider.rs
+// Note: The order of this QueryableParquetChunk is the parquet file's min_sequence_number, which
+// will be the same for parquet files of split compacted data.
+//
+// This function returns the parquet file's min_time, which will always be different for parquet files
+// with the same order/min_sequence_number, and is therefore suitable for ordering the parquet files.
+//
+// Note: parquet_file's id is a uuid, which is also the datatype of ChunkId. However,
+// it is not safe to use it for sorting chunks.
fn id(&self) -> ChunkId {
-// always return id 0 for debugging mode and with reason above
-ChunkId::new_test(0)
+let timestamp_nano = self.iox_metadata.time_of_first_write.timestamp_nanos();
+let timestamp_nano_u128 =
+u128::try_from(timestamp_nano).expect("Cannot convert timestamp nano to u128 ");
+ChunkId::new_id_for_ng(timestamp_nano_u128)
}
// This function should not be used in this context
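For clarity, a self-contained sketch of the conversion above (id_from_nanos is a hypothetical helper, not IOx code; it assumes the uuid crate): the i64 nanosecond timestamp is widened to u128 and used as the raw value of the id, so a later min_time always yields a larger id.

use uuid::Uuid;

// Sketch: derive a sortable 128-bit id from a nanosecond timestamp,
// mirroring what id() does with time_of_first_write.
fn id_from_nanos(timestamp_nano: i64) -> Uuid {
    // try_from fails for pre-epoch (negative) timestamps, hence the expect.
    let timestamp_nano_u128 =
        u128::try_from(timestamp_nano).expect("Cannot convert timestamp nano to u128");
    Uuid::from_u128(timestamp_nano_u128)
}

fn main() {
    assert!(id_from_nanos(8000) < id_from_nanos(28000));
}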


@@ -238,6 +238,12 @@ impl ChunkId {
Self(Uuid::from_u128(id))
}
+/// NG's chunk id only takes effect when the chunk's order is the same
+/// as another chunk's. Hence collisions are safe in that context.
+pub fn new_id_for_ng(id: u128) -> Self {
+Self(Uuid::from_u128(id))
+}
/// Get inner UUID.
pub fn get(&self) -> Uuid {
self.0
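The sort-key property this relies on (my reading, not stated in the commit): Uuid::from_u128 encodes the integer big-endian and Uuid compares byte-wise, so the ordering of the resulting ChunkIds matches the numeric ordering of the source values. A quick sketch using the uuid crate directly:

use uuid::Uuid;

fn main() {
    // from_u128 stores the integer big-endian, so Uuid's byte-wise Ord
    // agrees with numeric order of the inputs; chunk ids built from
    // min_time therefore sort chunks by min_time.
    let a = Uuid::from_u128(8000);
    let b = Uuid::from_u128(28000);
    assert!(a < b);
    assert_eq!(u128::from_be_bytes(*b.as_bytes()), 28000);
}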