fix: correct dedupe of strftime values
This fixes the root cause of influxdata/idpe#17765; the code was performing a "is this the last value you saw" check by comparing it to the last generated partition key which is not the same thing - a cache hit would not generate a new key, and therefore would not return the correct answer after. The end result is that for a subset of writes with a problematic sequence of timestamps would cause the wrong partition key to be assigned. Because all users are using the default YYYY-MM-DD partitioning scheme, the impact was relatively low, as most of the time that partition key had the same YYYY-MM-DD representation as the last.pull/24376/head
parent
df88b542f1
commit
2acbaefa18
|
@ -347,6 +347,39 @@ mod tests {
|
|||
StdRng::seed_from_u64(seed)
|
||||
}
|
||||
|
||||
/// Reproducer for https://github.com/influxdata/idpe/issues/17765
|
||||
#[test]
|
||||
fn test_equals_last() {
|
||||
let ts = [
|
||||
1686756903736785920, // last_eq=false, render, set last_ptr
|
||||
42, // last_eq=false, render, set last_ptr
|
||||
1686756903736785920, // last_eq=false, re-use, don't change last_ptr
|
||||
1686756903736785920, // last_eq=false, re-use, don't change last_ptr
|
||||
42, // last_eq=true (wrong), re-use
|
||||
];
|
||||
|
||||
let mut batch = MutableBatch::new();
|
||||
let mut writer = Writer::new(&mut batch, ts.len());
|
||||
|
||||
writer.write_time("time", ts.into_iter()).unwrap();
|
||||
writer.commit();
|
||||
|
||||
let keys =
|
||||
generate_denormalised_keys(&batch, TablePartitionTemplateOverride::default().parts())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
keys,
|
||||
&[
|
||||
"2023-06-14",
|
||||
"1970-01-01",
|
||||
"2023-06-14",
|
||||
"2023-06-14",
|
||||
"1970-01-01",
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Generates a vector of partition key strings, or an error.
|
||||
///
|
||||
/// This function normalises the de-duplicated output of
|
||||
|
|
|
@ -74,11 +74,6 @@ where
|
|||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return the last wrote value, if any.
|
||||
fn last(&self) -> Option<&'_ T> {
|
||||
self.buf[self.last_idx].as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// A strftime-like formatter of epoch timestamps with nanosecond granularity.
|
||||
|
@ -147,6 +142,14 @@ pub(super) struct StrftimeFormatter<'a> {
|
|||
/// A set of 5 most recently added timestamps, and the formatted string they
|
||||
/// map to.
|
||||
values: RingBuffer<5, (i64, String)>,
|
||||
|
||||
/// The last observed timestamp.
|
||||
///
|
||||
/// This value changes each time a timestamp is returned to the user, either
|
||||
/// from the cache of pre-generated strings, or by generating a new one and
|
||||
/// MUST always track the last timestamp given to
|
||||
/// [`StrftimeFormatter::render()`].
|
||||
last_ts: Option<i64>,
|
||||
}
|
||||
|
||||
impl<'a> StrftimeFormatter<'a> {
|
||||
|
@ -170,6 +173,7 @@ impl<'a> StrftimeFormatter<'a> {
|
|||
format: StrftimeItems::new(format),
|
||||
is_ymd_format: is_default_format,
|
||||
values: RingBuffer::default(),
|
||||
last_ts: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,6 +186,9 @@ impl<'a> StrftimeFormatter<'a> {
|
|||
// Optionally apply the default format reduction optimisation.
|
||||
let timestamp = self.maybe_reduce(timestamp);
|
||||
|
||||
// Retain this timestamp as the last observed timestamp.
|
||||
self.last_ts = Some(timestamp);
|
||||
|
||||
// Check if this timestamp has already been rendered.
|
||||
if let Some(v) = self.values.find(|(t, _v)| *t == timestamp) {
|
||||
// It has! Re-use the existing formatted string.
|
||||
|
@ -239,10 +246,7 @@ impl<'a> StrftimeFormatter<'a> {
|
|||
// Optionally apply the default format reduction optimisation.
|
||||
let timestamp = self.maybe_reduce(timestamp);
|
||||
|
||||
self.values
|
||||
.last()
|
||||
.map(|(ts, _)| *ts == timestamp)
|
||||
.unwrap_or_default()
|
||||
self.last_ts.map(|v| v == timestamp).unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -302,17 +306,6 @@ mod tests {
|
|||
assert_eq!(fmt.values.last_idx, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ring_buffer_equals_last() {
|
||||
let mut b = RingBuffer::<4, _>::default();
|
||||
|
||||
assert!(b.find(|v| *v == 42).is_none());
|
||||
|
||||
*b.next_slot() = 42;
|
||||
|
||||
assert_eq!(b.last(), Some(&42));
|
||||
}
|
||||
|
||||
const FORMATTER_SPEC_PARTS: &[&str] = &[
|
||||
"%Y", "%m", "%d", "%H", "%m", "%.9f", "%r", "%+", "%t", "%n", "%A", "%c",
|
||||
];
|
||||
|
|
Loading…
Reference in New Issue