Merge branch 'main' into dom/timestamps

pull/24376/head
Dom 2022-06-17 14:45:27 +01:00 committed by GitHub
commit a8c9638e89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 12 additions and 24 deletions

View File

@ -257,18 +257,11 @@ impl QueryChunk for QueryableParquetChunk {
// Order of the chunk so they can be deduplicate correctly
fn order(&self) -> ChunkOrder {
let seq_num = self.min_sequence_number.get();
let seq_num_u32 = u32::try_from(seq_num).unwrap_or_else(|_| {
ChunkOrder::new(seq_num).unwrap_or_else(|| {
panic!(
"Error converting i64 sequence number {} to u32 for partition {}",
"Error converting sequence number {} to ChunkOrder for partition {}",
seq_num, self.partition_id
);
});
ChunkOrder::new(seq_num_u32).unwrap_or_else(|| {
panic!(
"Error converting u32 sequence number {} to ChunkOrder for partition {}",
seq_num_u32, self.partition_id
);
})
}
}

View File

@ -24,7 +24,7 @@ use std::{
convert::TryFrom,
fmt::{Display, Write},
mem::{self, size_of_val},
num::{FpCategory, NonZeroU32, NonZeroU64},
num::{FpCategory, NonZeroU64},
ops::{Add, Deref, RangeInclusive, Sub},
sync::Arc,
};
@ -1077,16 +1077,15 @@ impl From<Uuid> for ChunkId {
/// 1. **upsert order:** chunks with higher order overwrite data in chunks with lower order
/// 2. **locking order:** chunks must be locked in consistent (ascending) order
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkOrder(NonZeroU32);
pub struct ChunkOrder(i64);
impl ChunkOrder {
/// The minimum ordering value a chunk could have. Currently only used in testing.
// TODO: remove `unsafe` once https://github.com/rust-lang/rust/issues/51999 is fixed
pub const MIN: Self = Self(unsafe { NonZeroU32::new_unchecked(1) });
pub const MIN: Self = Self(0);
/// Create a ChunkOrder from the given value.
pub fn new(order: u32) -> Option<Self> {
NonZeroU32::new(order).map(Self)
pub fn new(order: i64) -> Option<Self> {
Some(Self(order))
}
}

View File

@ -353,10 +353,8 @@ impl ChunkAdapter {
let chunk_id = ChunkId::from(Uuid::from_u128(parquet_file_id.get() as _));
let table_name = self.catalog_cache.table().name(table_id).await?;
// Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
// hope it doesn't overflow u32. Order is non-zero, se we need to add 1.
let order = ChunkOrder::new(1 + decoded_parquet_file.min_sequence_number().get() as u32)
.expect("cannot be zero");
let order = ChunkOrder::new(decoded_parquet_file.min_sequence_number().get())
.expect("Error converting min sequence number to chunk order");
// Read partition sort key
let partition_sort_key = self
@ -408,10 +406,8 @@ impl ChunkAdapter {
)
.await;
// Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
// hope it doesn't overflow u32. Order is non-zero, se we need to add 1.
let order = ChunkOrder::new(1 + decoded_parquet_file.min_sequence_number().get() as u32)
.expect("cannot be zero");
let order = ChunkOrder::new(decoded_parquet_file.min_sequence_number().get())
.expect("Error converting min sequence number to chunk order");
// Read partition sort key
let partition_sort_key = self

View File

@ -778,7 +778,7 @@ impl QueryChunk for IngesterPartition {
fn order(&self) -> ChunkOrder {
// since this is always the 'most recent' chunk for this
// partition, put it at the end
ChunkOrder::new(u32::MAX).unwrap()
ChunkOrder::new(i64::MAX).unwrap()
}
}