refactor: emit PartitionKey from partitioner
Changes the partitioning code to emit a PartitionKey instead of a bare String.
pull/24376/head
parent
7c60edd38c
commit
61182f506b
|
@ -1,7 +1,7 @@
|
|||
//! Write payload abstractions derived from [`MutableBatch`]
|
||||
|
||||
use crate::{column::ColumnData, MutableBatch, Result};
|
||||
use data_types::PartitionTemplate;
|
||||
use data_types::{PartitionKey, PartitionTemplate};
|
||||
use hashbrown::HashMap;
|
||||
use schema::TIME_COLUMN_NAME;
|
||||
use std::{num::NonZeroUsize, ops::Range};
|
||||
|
@ -102,7 +102,7 @@ impl<'a> PartitionWrite<'a> {
|
|||
table_name: &str,
|
||||
batch: &'a MutableBatch,
|
||||
partition_template: &PartitionTemplate,
|
||||
) -> HashMap<String, Self> {
|
||||
) -> HashMap<PartitionKey, Self> {
|
||||
use hashbrown::hash_map::Entry;
|
||||
let time = get_time_column(batch);
|
||||
|
||||
|
@ -112,7 +112,7 @@ impl<'a> PartitionWrite<'a> {
|
|||
let row_count = NonZeroUsize::new(range.end - range.start).unwrap();
|
||||
let (min_timestamp, max_timestamp) = min_max_time(&time[range.clone()]);
|
||||
|
||||
match partition_ranges.entry(partition) {
|
||||
match partition_ranges.entry(PartitionKey::from(partition)) {
|
||||
Entry::Vacant(v) => {
|
||||
v.insert(PartitionWrite {
|
||||
batch,
|
||||
|
|
|
@ -134,7 +134,7 @@ fn test_encode_decode_null_columns_issue_4272() {
|
|||
// Round-trip the "1970-01-01" partition
|
||||
let mut got = MutableBatch::default();
|
||||
partitions
|
||||
.remove("1970-01-01")
|
||||
.remove(&"1970-01-01".into())
|
||||
.expect("partition not found")
|
||||
.write_to_batch(&mut got)
|
||||
.expect("should write");
|
||||
|
@ -156,7 +156,7 @@ fn test_encode_decode_null_columns_issue_4272() {
|
|||
// And finally assert the "1970-07-05" round-trip
|
||||
let mut got = MutableBatch::default();
|
||||
partitions
|
||||
.remove("1970-07-05")
|
||||
.remove(&"1970-07-05".into())
|
||||
.expect("partition not found")
|
||||
.write_to_batch(&mut got)
|
||||
.expect("should write");
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use super::DmlHandler;
|
||||
use async_trait::async_trait;
|
||||
use data_types::{DatabaseName, DeletePredicate, PartitionTemplate};
|
||||
use data_types::{DatabaseName, DeletePredicate, PartitionKey, PartitionTemplate};
|
||||
use hashbrown::HashMap;
|
||||
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
|
||||
use observability_deps::tracing::*;
|
||||
|
@ -18,13 +18,13 @@ pub enum PartitionError {
|
|||
/// A decorator of `T`, tagging it with the partition key derived from it.
|
||||
#[derive(Debug, PartialEq, Clone)]
|
||||
pub struct Partitioned<T> {
|
||||
key: String,
|
||||
key: PartitionKey,
|
||||
payload: T,
|
||||
}
|
||||
|
||||
impl<T> Partitioned<T> {
|
||||
/// Wrap `payload` with a partition `key`.
|
||||
pub fn new(key: String, payload: T) -> Self {
|
||||
pub fn new(key: PartitionKey, payload: T) -> Self {
|
||||
Self { key, payload }
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ impl<T> Partitioned<T> {
|
|||
}
|
||||
|
||||
/// Unwrap `Self` returning the inner payload `T` and the partition key.
|
||||
pub fn into_parts(self) -> (String, T) {
|
||||
pub fn into_parts(self) -> (PartitionKey, T) {
|
||||
(self.key, self.payload)
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,7 @@ impl DmlHandler for Partitioner {
|
|||
_span_ctx: Option<SpanContext>,
|
||||
) -> Result<Self::WriteOutput, Self::WriteError> {
|
||||
// A collection of partition-keyed, per-table MutableBatch instances.
|
||||
let mut partitions: HashMap<_, HashMap<_, MutableBatch>> = HashMap::default();
|
||||
let mut partitions: HashMap<PartitionKey, HashMap<_, MutableBatch>> = HashMap::default();
|
||||
|
||||
for (table_name, batch) in batch {
|
||||
// Partition the table batch according to the configured partition
|
||||
|
@ -94,10 +94,7 @@ impl DmlHandler for Partitioner {
|
|||
|
||||
Ok(partitions
|
||||
.into_iter()
|
||||
.map(|(key, batch)| Partitioned {
|
||||
key,
|
||||
payload: batch,
|
||||
})
|
||||
.map(|(key, batch)| Partitioned::new(key, batch))
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
|
@ -185,11 +182,11 @@ mod tests {
|
|||
(@assert_writes, $got:ident, $($partition_key:expr => $want_tables:expr, )*) => {
|
||||
// Construct the desired writes, keyed by partition key
|
||||
#[allow(unused_mut)]
|
||||
let mut want_writes: HashMap<String, _> = Default::default();
|
||||
let mut want_writes: HashMap<PartitionKey, _> = Default::default();
|
||||
$(
|
||||
let mut want: Vec<String> = $want_tables.into_iter().map(|t| t.to_string()).collect();
|
||||
want.sort();
|
||||
want_writes.insert($partition_key.to_string(), want);
|
||||
want_writes.insert(PartitionKey::from($partition_key), want);
|
||||
)*
|
||||
|
||||
pretty_assertions::assert_eq!(want_writes, $got);
|
||||
|
|
|
@ -216,7 +216,7 @@ mod tests {
|
|||
fn lp_to_writes(lp: &str) -> Partitioned<HashMap<String, MutableBatch>> {
|
||||
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
|
||||
.expect("failed to build test writes from LP");
|
||||
Partitioned::new("key".to_owned(), writes)
|
||||
Partitioned::new("key".into(), writes)
|
||||
}
|
||||
|
||||
// Init a mock write buffer with the given number of sequencers.
|
||||
|
|
Loading…
Reference in New Issue