revert: "Merge pull request #7953 from influxdata/dom/partition-key-dedupe"

This reverts commit 5bce4477b7, reversing
changes made to 64fa17b3be.
pull/24376/head
Dom Dwyer 2023-06-14 15:55:33 +02:00
parent 335d9f7357
commit 3c0388fdea
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
2 changed files with 54 additions and 279 deletions

View File

@ -28,7 +28,7 @@ use self::strftime::StrftimeFormatter;
/// An error generating a partition key for a row.
#[allow(missing_copy_implementations)]
#[derive(Debug, Error, PartialEq, Eq, Clone)]
#[derive(Debug, Error, PartialEq, Eq)]
pub enum PartitionKeyError {
/// The partition template defines a [`Template::TimeFormat`] part, but the
/// provided strftime formatter is invalid.
@ -75,7 +75,7 @@ pub fn partition_batch<'a>(
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum Template<'a> {
TagValue(&'a Column, Option<i32>),
TagValue(&'a Column),
TimeFormat(&'a [i64], StrftimeFormatter<'a>),
/// This batch is missing a partitioning tag column.
@ -90,67 +90,26 @@ impl<'a> Template<'a> {
idx: usize,
) -> Result<(), PartitionKeyError> {
match self {
Template::TagValue(col, last_key) if col.valid.get(idx) => match &col.data {
ColumnData::Tag(col_data, dictionary, _) => {
let this_key = col_data[idx];
// Update the "is identical" tracking key for this new,
// potentially different key.
*last_key = Some(this_key);
out.write_str(
encode_key_part(dictionary.lookup_id(this_key).unwrap()).as_ref(),
)?
}
Template::TagValue(col) if col.valid.get(idx) => match &col.data {
ColumnData::Tag(col_data, dictionary, _) => out.write_str(never_empty(
Cow::from(utf8_percent_encode(
dictionary.lookup_id(col_data[idx]).unwrap(),
&ENCODED_PARTITION_KEY_CHARS,
))
.as_ref(),
))?,
_ => return Err(PartitionKeyError::TagValueNotTag(col.influx_type())),
},
Template::TimeFormat(t, fmt) => fmt.render(t[idx], out)?,
// Either a tag that has no value for this given row index, or the
// batch does not contain this tag at all.
Template::TagValue(_, last_key) => {
// This row doesn't have a tag value, which should be carried
// forwards to be checked against the next row.
*last_key = None;
Template::TagValue(_) | Template::MissingTag => {
out.write_str(PARTITION_KEY_VALUE_NULL_STR)?
}
Template::MissingTag => out.write_str(PARTITION_KEY_VALUE_NULL_STR)?,
}
Ok(())
}
/// Returns true if the partition key generated by `self` for `idx` will be
/// identical to the last generated key.
fn is_identical(&self, idx: usize) -> bool {
match self {
Template::TagValue(col, last_key) if col.valid.get(idx) => match &col.data {
ColumnData::Tag(col_data, _, _) => {
let this_key = col_data[idx];
// Check if the dictionary key matches the last dictionary
// key, indicating the same value is going to be rendered.
last_key.map(|v| v == this_key).unwrap_or_default()
}
// This is an error, but for the purposes of identical checks,
// it is treated as not identical, causing the error to be
// raised when formatting is attempted.
_ => false,
},
Template::TimeFormat(t, fmt) => {
// Check if the last value matches the current value, after
// optionally applying the precision reduction optimisation.
fmt.equals_last(t[idx])
}
// The last row did not contain this key, and neither does this.
Template::TagValue(_, None) => true,
// The last row did contain a key, but this one does not (therefore
// it differs).
Template::TagValue(_, Some(_)) => false,
// The batch does not contain this tag at all - it always matches
// with the previous row.
Template::MissingTag => true,
}
}
}
fn encode_key_part(s: &str) -> Cow<'_, str> {
@ -198,15 +157,11 @@ fn encode_key_part(s: &str) -> Cow<'_, str> {
}
}
/// Returns an iterator of partition keys for the given table batch.
///
/// This function performs deduplication on returned keys; the returned iterator
/// yields [`Some`] containing the partition key string when a new key is
/// generated, and [`None`] when the generated key would equal the last key.
/// Returns an iterator of partition keys for the given table batch
fn partition_keys<'a>(
batch: &'a MutableBatch,
template_parts: impl Iterator<Item = TemplatePart<'a>>,
) -> impl Iterator<Item = Option<Result<String, PartitionKeyError>>> + 'a {
) -> impl Iterator<Item = Result<String, PartitionKeyError>> + 'a {
// Extract the timestamp data.
let time = match batch.column(TIME_COLUMN_NAME).map(|v| &v.data) {
Ok(ColumnData::I64(data, _)) => data.as_slice(),
@ -219,7 +174,7 @@ fn partition_keys<'a>(
.map(|v| match v {
TemplatePart::TagValue(col_name) => batch
.column(col_name)
.map_or_else(|_| Template::MissingTag, |v| Template::TagValue(v, None)),
.map_or_else(|_| Template::MissingTag, Template::TagValue),
TemplatePart::TimeFormat(fmt) => {
Template::TimeFormat(time, StrftimeFormatter::new(fmt))
}
@ -235,74 +190,38 @@ fn partition_keys<'a>(
// is temporarily allocated until the resulting string is shrunk down.
let mut last_len = 5;
// The first row in a batch must always be evaluated to produce a key.
//
// Row 0 is guaranteed to exist, otherwise attempting to read the time
// column above would have caused a panic (no rows -> no time column).
let first = std::iter::once(Some(evaluate_template(&mut template, &mut last_len, 0)));
// The subsequent rows in a batch may generate the same key, and therefore a
// dedupe check is used before allocating & populating the partition key.
let rest = (1..batch.row_count).map(move |idx| {
// Check if this partition key is going to be different from the
// last, short-circuiting the check if it is.
if template.iter().all(|t| t.is_identical(idx)) {
return None;
}
Some(evaluate_template(&mut template, &mut last_len, idx))
});
first.chain(rest)
}
/// Evaluate the partition template against the row indexed by `idx`.
///
/// # Panics
///
/// This method panics if `idx` exceeds the number of rows in the batch.
fn evaluate_template(
template: &mut [Template<'_>],
last_len: &mut usize,
idx: usize,
) -> Result<String, PartitionKeyError> {
let mut buf = String::with_capacity(*last_len);
let template_len = template.len();
// Yield a partition key string for each row in `batch`
(0..batch.row_count).map(move |idx| {
let mut string = String::with_capacity(last_len);
// Evaluate each template part for this row
let template_len = template.len();
for (col_idx, col) in template.iter_mut().enumerate() {
// Evaluate the formatter for this template part against the row.
col.fmt_row(&mut buf, idx)?;
col.fmt_row(&mut string, idx)?;
// If this isn't the last element in the template, insert a field
// delimiter.
if col_idx + 1 != template_len {
buf.push(PARTITION_KEY_DELIMITER);
string.push(PARTITION_KEY_DELIMITER);
}
}
*last_len = buf.len();
Ok(buf)
last_len = string.len();
string.shrink_to_fit();
Ok(string)
})
}
/// Takes an iterator of [`Option`] and merges identical consecutive elements
/// together.
///
/// Any [`None`] yielded by `iterator` is added to the range for the previous
/// [`Some`].
fn range_encode<I, T>(mut iterator: I) -> impl Iterator<Item = (T, Range<usize>)>
/// Takes an iterator and merges consecutive elements together
fn range_encode<I>(mut iterator: I) -> impl Iterator<Item = (I::Item, Range<usize>)>
where
I: Iterator<Item = Option<T>>,
T: Eq,
I: Iterator,
I::Item: Eq,
{
let mut last: Option<I::Item> = None;
let mut range: Range<usize> = 0..0;
std::iter::from_fn(move || loop {
match (iterator.next(), last.take()) {
(Some(None), Some(v)) => {
range.end += 1;
last = Some(v);
}
(Some(cur), Some(next)) => match cur == next {
true => {
range.end += 1;
@ -313,14 +232,14 @@ where
range.start = range.end;
range.end += 1;
last = Some(cur);
return Some((next.unwrap(), t));
return Some((next, t));
}
},
(Some(cur), None) => {
range.end += 1;
last = Some(cur);
}
(None, Some(next)) => return Some((next.unwrap(), range.clone())),
(None, Some(next)) => return Some((next, range.clone())),
(None, None) => return None,
}
})
@ -347,30 +266,6 @@ mod tests {
StdRng::seed_from_u64(seed)
}
/// Generates a vector of partition key strings, or an error.
///
/// This function normalises the de-duplicated output of
/// [`partition_keys()`], returning the last observed key when the dedupe
/// [`partition_keys()`] process returns [`None`].
fn generate_denormalised_keys<'a, 'b: 'a>(
batch: &'b MutableBatch,
template_parts: impl Iterator<Item = TemplatePart<'a>>,
) -> Result<Vec<String>, PartitionKeyError> {
let mut last_ret = None;
partition_keys(batch, template_parts)
.map(|v| match v {
Some(this) => {
last_ret = Some(this.clone());
this
}
None => last_ret
.as_ref()
.expect("must have observed prior key")
.clone(),
})
.collect::<Result<Vec<_>, _>>()
}
/// A fixture test asserting the default partition key format, derived from
/// the default partition key template.
#[test]
@ -387,40 +282,15 @@ mod tests {
let template_parts =
TablePartitionTemplateOverride::try_new(None, &Default::default()).unwrap();
let keys: Vec<_> = partition_keys(&batch, template_parts.parts())
.map(|v| v.expect("non-identical consecutive keys"))
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(keys, vec!["1970-01-01".to_string()])
}
#[test]
#[should_panic(expected = r#"error reading time column: ColumnNotFound { column: "time" }"#)]
fn test_zero_sized_batch() {
let batch = MutableBatch::new();
let template_parts = test_table_partition_override(vec![
TemplatePart::TimeFormat("%Y-%m-%d %H:%M:%S"),
TemplatePart::TagValue("region"),
TemplatePart::TagValue("bananas"),
]);
let keys: Vec<_> = partition_batch(&batch, &template_parts).collect::<Vec<_>>();
assert_eq!(keys, vec![])
}
#[test]
fn test_range_encode() {
let collected: Vec<_> =
range_encode(vec![5, 5, 5, 7, 2, 2, 3].into_iter().map(Some)).collect();
assert_eq!(collected, vec![(5, 0..3), (7, 3..4), (2, 4..6), (3, 6..7)])
}
#[test]
fn test_range_encode_sparse() {
let collected: Vec<_> =
range_encode(vec![Some(5), None, None, Some(7), Some(2), None, Some(3)].into_iter())
.collect();
let collected: Vec<_> = range_encode(vec![5, 5, 5, 7, 2, 2, 3].into_iter()).collect();
assert_eq!(collected, vec![(5, 0..3), (7, 3..4), (2, 4..6), (3, 6..7)])
}
@ -431,7 +301,7 @@ mod tests {
.take(1000)
.collect();
let rle: Vec<_> = range_encode(original.iter().cloned().map(Some)).collect();
let rle: Vec<_> = range_encode(original.iter().cloned()).collect();
let mut last_range = rle[0].1.clone();
for (_, range) in &rle[1..] {
@ -474,7 +344,6 @@ mod tests {
writer.commit();
let keys: Vec<_> = partition_keys(&batch, template_parts.into_iter())
.map(|v| v.expect("non-identical consecutive keys"))
.collect::<Result<Vec<_>, _>>()
.unwrap();
@ -490,64 +359,6 @@ mod tests {
)
}
#[test]
fn test_sparse_representation() {
let mut batch = MutableBatch::new();
let mut writer = Writer::new(&mut batch, 6);
writer
.write_time(
"time",
vec![
1,
1,
1,
1685971961464736000,
1685971961464736000,
1685971961464736000,
]
.into_iter(),
)
.unwrap();
writer
.write_tag(
"region",
Some(&[0b00111111]),
vec![
"platanos", "platanos", "platanos", "platanos", "platanos", "bananas",
]
.into_iter(),
)
.unwrap();
let template_parts = [
TemplatePart::TimeFormat("%Y-%m-%d %H:%M:%S"),
TemplatePart::TagValue("region"),
TemplatePart::TagValue("bananas"), // column not present
];
writer.commit();
let mut iter = partition_keys(&batch, template_parts.into_iter());
assert_eq!(
iter.next().unwrap(),
Some(Ok("1970-01-01 00:00:00|platanos|!".to_string()))
);
assert_eq!(iter.next().unwrap(), None);
assert_eq!(iter.next().unwrap(), None);
assert_eq!(
iter.next().unwrap(),
Some(Ok("2023-06-05 13:32:41|platanos|!".to_string()))
);
assert_eq!(iter.next().unwrap(), None);
assert_eq!(
iter.next().unwrap(),
Some(Ok("2023-06-05 13:32:41|bananas|!".to_string()))
);
}
#[test]
fn partitioning_on_fields_panics() {
let mut batch = MutableBatch::new();
@ -569,7 +380,7 @@ mod tests {
writer.commit();
let got: Result<Vec<_>, _> = generate_denormalised_keys(&batch, template_parts.into_iter());
let got: Result<Vec<_>, _> = partition_keys(&batch, template_parts.into_iter()).collect();
assert_matches::assert_matches!(got, Err(PartitionKeyError::TagValueNotTag(_)));
}
@ -623,12 +434,8 @@ mod tests {
writer.commit();
// Generate the full set of partition keys, inserting the
// last observed value when the next key is identical to
// normalise the values.
let keys = generate_denormalised_keys(&batch, template.parts())
.unwrap();
assert_eq!(keys, vec![$want_key.to_string()], "generated key differs");
let keys: Vec<_> = partition_keys(&batch, template.parts()).collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(keys, vec![$want_key.to_string()]);
// Reverse the encoding.
let reversed = build_column_values(&template, &keys[0]);
@ -975,9 +782,7 @@ mod tests {
.collect::<Vec<_>>();
let template = test_table_partition_override(template);
let ret = partition_keys(&batch, template.parts())
.map(|v| v.expect("non-identical consecutive keys"))
.collect::<Result<Vec<_>, _>>();
let ret = partition_keys(&batch, template.parts()).collect::<Result<Vec<_>, _>>();
assert_matches!(ret, Err(PartitionKeyError::InvalidStrftime));
}
@ -1074,7 +879,8 @@ mod tests {
}
writer.commit();
let keys: Vec<_> = generate_denormalised_keys(&batch, template.parts())
let keys: Vec<_> = partition_keys(&batch, template.parts())
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(keys.len(), 1);
@ -1155,9 +961,7 @@ mod tests {
.unwrap();
writer.commit();
let ret = partition_keys(&batch, template.parts())
.map(|v| v.expect("non-identical consecutive keys"))
.collect::<Result<Vec<_>, _>>();
let ret = partition_keys(&batch, template.parts()).collect::<Result<Vec<_>, _>>();
// The is allowed to succeed or fail under this test (but not
// panic), and the returned error/value must match certain

View File

@ -20,8 +20,8 @@ const YMD_SPEC: &str = "%Y-%m-%d";
struct RingBuffer<const N: usize, T> {
buf: [Option<T>; N],
/// Index into to the last wrote value.
last_idx: usize,
/// Index into to the next free/to-be-reused slot.
next_ptr: usize,
}
impl<const N: usize, T> Default for RingBuffer<N, T>
@ -31,7 +31,7 @@ where
fn default() -> Self {
Self {
buf: [(); N].map(|_| Default::default()), // default init for non-const type
last_idx: N - 1,
next_ptr: Default::default(),
}
}
}
@ -48,11 +48,11 @@ where
///
/// This is an O(1) operation.
fn next_slot(&mut self) -> &mut T {
// Advance the next slot pointer
self.last_idx += 1;
self.last_idx %= N;
let v = self.buf[self.next_ptr].get_or_insert_with(Default::default);
let v = self.buf[self.last_idx].get_or_insert_with(Default::default);
// Advance the next slot pointer
self.next_ptr += 1;
self.next_ptr %= N;
v
}
@ -74,11 +74,6 @@ where
}
None
}
/// Return the last wrote value, if any.
fn last(&self) -> Option<&'_ T> {
self.buf[self.last_idx].as_ref()
}
}
/// A strftime-like formatter of epoch timestamps with nanosecond granularity.
@ -231,19 +226,6 @@ impl<'a> StrftimeFormatter<'a> {
}
timestamp - (timestamp % DAY_NANOSECONDS)
}
/// Returns true if the output of rendering `timestamp` will match the last
/// rendered timestamp, after optionally applying the precision reduction
/// optimisation.
pub(crate) fn equals_last(&self, timestamp: i64) -> bool {
// Optionally apply the default format reduction optimisation.
let timestamp = self.maybe_reduce(timestamp);
self.values
.last()
.map(|(ts, _)| *ts == timestamp)
.unwrap_or_default()
}
}
#[cfg(test)]
@ -299,18 +281,7 @@ mod tests {
fmt.values.buf.as_slice(),
[Some((42, _)), Some((12345, _)), None, None, None]
);
assert_eq!(fmt.values.last_idx, 1);
}
#[test]
fn test_ring_buffer_equals_last() {
let mut b = RingBuffer::<4, _>::default();
assert!(b.find(|v| *v == 42).is_none());
*b.next_slot() = 42;
assert_eq!(b.last(), Some(&42));
assert_eq!(fmt.values.next_ptr, 2);
}
const FORMATTER_SPEC_PARTS: &[&str] = &[