Merge pull request #8069 from influxdata/savage/use-u64-for-sequence-number
refactor(ingester): Use unsigned sequence number, remove its `Sqlx::Type`pull/24376/head
commit
5815df5e6d
|
@ -160,33 +160,32 @@ impl std::fmt::Display for TableId {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A sequence number from a `router::Shard` (kafka partition)
|
/// A sequence number from an ingester
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||||
#[sqlx(transparent)]
|
pub struct SequenceNumber(u64);
|
||||||
pub struct SequenceNumber(i64);
|
|
||||||
|
|
||||||
#[allow(missing_docs)]
|
#[allow(missing_docs)]
|
||||||
impl SequenceNumber {
|
impl SequenceNumber {
|
||||||
pub fn new(v: i64) -> Self {
|
pub fn new(v: u64) -> Self {
|
||||||
Self(v)
|
Self(v)
|
||||||
}
|
}
|
||||||
pub fn get(&self) -> i64 {
|
pub fn get(&self) -> u64 {
|
||||||
self.0
|
self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Add<i64> for SequenceNumber {
|
impl Add<u64> for SequenceNumber {
|
||||||
type Output = Self;
|
type Output = Self;
|
||||||
|
|
||||||
fn add(self, other: i64) -> Self {
|
fn add(self, other: u64) -> Self {
|
||||||
Self(self.0 + other)
|
Self(self.0 + other)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Sub<i64> for SequenceNumber {
|
impl Sub<u64> for SequenceNumber {
|
||||||
type Output = Self;
|
type Output = Self;
|
||||||
|
|
||||||
fn sub(self, other: i64) -> Self {
|
fn sub(self, other: u64) -> Self {
|
||||||
Self(self.0 - other)
|
Self(self.0 - other)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -207,18 +207,18 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_intersect() {
|
fn test_intersect() {
|
||||||
let a = [0, i64::MAX, 40, 41, 42, 43, 44, 45]
|
let a = [0, u64::MAX, 40, 41, 42, 43, 44, 45]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(SequenceNumber::new)
|
.map(SequenceNumber::new)
|
||||||
.collect::<SequenceNumberSet>();
|
.collect::<SequenceNumberSet>();
|
||||||
|
|
||||||
let b = [1, 5, i64::MAX, 42]
|
let b = [1, 5, u64::MAX, 42]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(SequenceNumber::new)
|
.map(SequenceNumber::new)
|
||||||
.collect::<SequenceNumberSet>();
|
.collect::<SequenceNumberSet>();
|
||||||
|
|
||||||
let intersection = intersect(&a, &b);
|
let intersection = intersect(&a, &b);
|
||||||
let want = [i64::MAX, 42]
|
let want = [u64::MAX, 42]
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(SequenceNumber::new)
|
.map(SequenceNumber::new)
|
||||||
.collect::<SequenceNumberSet>();
|
.collect::<SequenceNumberSet>();
|
||||||
|
@ -226,21 +226,17 @@ mod tests {
|
||||||
assert_eq!(intersection, want);
|
assert_eq!(intersection, want);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Yield vec's of [`SequenceNumber`] derived from u64 values and cast to
|
/// Yield vec's of [`SequenceNumber`] derived from u64 values.
|
||||||
/// i64.
|
|
||||||
///
|
///
|
||||||
/// This matches how the ingester allocates [`SequenceNumber`] - from a u64
|
/// This matches how the ingester allocates [`SequenceNumber`] - from a u64
|
||||||
/// source.
|
/// source.
|
||||||
fn sequence_number_vec() -> impl Strategy<Value = Vec<SequenceNumber>> {
|
fn sequence_number_vec() -> impl Strategy<Value = Vec<SequenceNumber>> {
|
||||||
prop::collection::vec(0..u64::MAX, 0..1024).prop_map(|vec| {
|
prop::collection::vec(0..u64::MAX, 0..1024)
|
||||||
vec.into_iter()
|
.prop_map(|vec| vec.into_iter().map(SequenceNumber::new).collect())
|
||||||
.map(|v| SequenceNumber::new(v as i64))
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following tests compare to an order-independent HashSet, as the
|
// The following tests compare to an order-independent HashSet, as the
|
||||||
// SequenceNumber uses the PartialOrd impl of the inner i64 for ordering,
|
// SequenceNumber uses the PartialOrd impl of the inner u64 for ordering,
|
||||||
// resulting in incorrect output when compared to an ordered set of cast as
|
// resulting in incorrect output when compared to an ordered set of cast as
|
||||||
// u64.
|
// u64.
|
||||||
//
|
//
|
||||||
|
|
|
@ -377,9 +377,7 @@ where
|
||||||
// ingester, but they are only used for internal ordering of operations at
|
// ingester, but they are only used for internal ordering of operations at
|
||||||
// runtime.
|
// runtime.
|
||||||
let timestamp = Arc::new(TimestampOracle::new(
|
let timestamp = Arc::new(TimestampOracle::new(
|
||||||
max_sequence_number
|
max_sequence_number.map(|v| v.get()).unwrap_or(0),
|
||||||
.map(|v| u64::try_from(v.get()).expect("sequence number overflow"))
|
|
||||||
.unwrap_or(0),
|
|
||||||
));
|
));
|
||||||
|
|
||||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||||
|
|
|
@ -199,9 +199,7 @@ where
|
||||||
op,
|
op,
|
||||||
} = op;
|
} = op;
|
||||||
|
|
||||||
let sequence_number = SequenceNumber::new(
|
let sequence_number = SequenceNumber::new(sequence_number);
|
||||||
i64::try_from(sequence_number).expect("sequence number overflow"),
|
|
||||||
);
|
|
||||||
|
|
||||||
max_sequence = max_sequence.max(Some(sequence_number));
|
max_sequence = max_sequence.max(Some(sequence_number));
|
||||||
|
|
||||||
|
|
|
@ -280,7 +280,7 @@ pub(crate) fn make_write_op(
|
||||||
namespace_id: NamespaceId,
|
namespace_id: NamespaceId,
|
||||||
table_name: &str,
|
table_name: &str,
|
||||||
table_id: TableId,
|
table_id: TableId,
|
||||||
sequence_number: i64,
|
sequence_number: u64,
|
||||||
lines: &str,
|
lines: &str,
|
||||||
span_ctx: Option<SpanContext>,
|
span_ctx: Option<SpanContext>,
|
||||||
) -> WriteOperation {
|
) -> WriteOperation {
|
||||||
|
|
|
@ -32,7 +32,7 @@ impl TimestampOracle {
|
||||||
// or diverge between threads.
|
// or diverge between threads.
|
||||||
let v = self.0.fetch_add(1, Ordering::Relaxed);
|
let v = self.0.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
SequenceNumber::new(v as i64)
|
SequenceNumber::new(v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,6 +106,6 @@ mod tests {
|
||||||
timestamps
|
timestamps
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(expected)
|
.zip(expected)
|
||||||
.for_each(|(got, want)| assert_eq!(got, want as i64));
|
.for_each(|(got, want)| assert_eq!(got, want as u64));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,7 +248,7 @@ mod tests {
|
||||||
/// Return a [`SequenceNumberSet`] containing `vals`.
|
/// Return a [`SequenceNumberSet`] containing `vals`.
|
||||||
fn new_set<T>(vals: T) -> SequenceNumberSet
|
fn new_set<T>(vals: T) -> SequenceNumberSet
|
||||||
where
|
where
|
||||||
T: IntoIterator<Item = i64>,
|
T: IntoIterator<Item = u64>,
|
||||||
{
|
{
|
||||||
vals.into_iter().map(SequenceNumber::new).collect()
|
vals.into_iter().map(SequenceNumber::new).collect()
|
||||||
}
|
}
|
||||||
|
@ -257,7 +257,7 @@ mod tests {
|
||||||
/// [`SequenceNumberSet`] values.
|
/// [`SequenceNumberSet`] values.
|
||||||
fn new_note<T>(vals: T) -> Arc<CompletedPersist>
|
fn new_note<T>(vals: T) -> Arc<CompletedPersist>
|
||||||
where
|
where
|
||||||
T: IntoIterator<Item = i64>,
|
T: IntoIterator<Item = u64>,
|
||||||
{
|
{
|
||||||
Arc::new(CompletedPersist::new(
|
Arc::new(CompletedPersist::new(
|
||||||
ParquetFileParams {
|
ParquetFileParams {
|
||||||
|
|
|
@ -105,10 +105,7 @@ impl WalAppender for Arc<wal::Wal> {
|
||||||
let partition_sequence_numbers = w
|
let partition_sequence_numbers = w
|
||||||
.tables()
|
.tables()
|
||||||
.map(|(table_id, data)| {
|
.map(|(table_id, data)| {
|
||||||
(
|
(*table_id, data.partitioned_data().sequence_number().get())
|
||||||
*table_id,
|
|
||||||
data.partitioned_data().sequence_number().get() as u64,
|
|
||||||
)
|
|
||||||
})
|
})
|
||||||
.collect::<HashMap<TableId, u64>>();
|
.collect::<HashMap<TableId, u64>>();
|
||||||
(
|
(
|
||||||
|
|
|
@ -242,7 +242,7 @@ where
|
||||||
namespace: &str,
|
namespace: &str,
|
||||||
lp: &str,
|
lp: &str,
|
||||||
partition_key: PartitionKey,
|
partition_key: PartitionKey,
|
||||||
sequence_number: i64,
|
sequence_number: u64,
|
||||||
) {
|
) {
|
||||||
// Resolve the namespace ID needed to construct the DML op
|
// Resolve the namespace ID needed to construct the DML op
|
||||||
let namespace_id = self.namespace_id(namespace).await;
|
let namespace_id = self.namespace_id(namespace).await;
|
||||||
|
|
Loading…
Reference in New Issue