feat: add PartitionWrite (#2724) (#2974)

* feat: add PartitionWrite (#2724)

* chore: review feedback
Raphael Taylor-Davies 2021-10-28 07:18:10 +01:00 committed by GitHub
parent 3010ff4176
commit ec07c11d36
5 changed files with 295 additions and 26 deletions

View File

@ -66,4 +66,9 @@ impl TimestampSummary {
self.counts[timestamp.minute() as usize] += 1;
self.stats.update(&timestamp.timestamp_nanos())
}
/// Records a timestamp value given as nanoseconds since the epoch
pub fn record_nanos(&mut self, timestamp_nanos: i64) {
self.record(Time::from_timestamp_nanos(timestamp_nanos))
}
}
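For reference, a minimal sketch of how the new helper is meant to be used; the wrapper function and sample values are illustrative, not part of this change:

use data_types::write_summary::TimestampSummary;

// Hypothetical helper: summarise raw nanosecond timestamps without first
// wrapping each value in a Time.
fn summarize(timestamps_nanos: &[i64]) -> TimestampSummary {
    let mut summary = TimestampSummary::default();
    for &t in timestamps_nanos {
        // Equivalent to summary.record(Time::from_timestamp_nanos(t))
        summary.record_nanos(t);
    }
    summary
}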

View File

@ -7,17 +7,19 @@ use crate::MutableBatch;
use schema::TIME_COLUMN_NAME;
use std::ops::Range;
/// Given a [`MutableBatch`] and a time predicate, returns the row indexes that pass the predicate
/// Given a [`MutableBatch`], a time predicate, and a set of row ranges, returns the row
/// indexes that pass the predicate
///
/// # Panic
///
/// Panics if `batch` does not contain a time column of the correct type
pub fn filter_time<'a, F>(
batch: &'a MutableBatch,
predicate: F,
) -> impl Iterator<Item = Range<usize>> + 'a
ranges: &'a [Range<usize>],
mut predicate: F,
) -> Vec<Range<usize>>
where
F: 'a + Fn(i64) -> bool,
F: FnMut(i64) -> bool,
{
let col_idx = *batch
.column_names
@ -31,19 +33,26 @@ where
};
// Time column is not nullable so can skip checking mask
filter_slice(col_data, predicate)
let mut ret = vec![];
for range in ranges {
let offset = range.start;
ret.extend(
filter_slice(&col_data[range.clone()], &mut predicate)
.map(|r| (r.start + offset)..(r.end + offset)),
)
}
ret
}
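To illustrate the new range-aware behaviour, a minimal crate-internal sketch (the module becomes private in the lib.rs change below), modelled on test_filter_batch further down; the data, ranges, and test name are illustrative:

use crate::writer::Writer;
use crate::MutableBatch;

// Hypothetical test: only rows inside the supplied ranges are considered, and
// the returned ranges are offset back into whole-batch coordinates.
#[test]
fn filter_time_ranges_sketch() {
    let mut batch = MutableBatch::new();
    let mut writer = Writer::new(&mut batch, 10);
    writer.write_time("time", 0..10_i64).unwrap();
    writer.commit();

    // Even timestamps within [0, 4) and [6, 9)
    let filtered = filter_time(&batch, &[0..4, 6..9], |t| t % 2 == 0);
    assert_eq!(filtered, vec![0..1, 2..3, 6..7, 8..9]);
}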
fn filter_slice<'a, T, F>(
col_data: &'a [T],
predicate: F,
predicate: &'a mut F,
) -> impl Iterator<Item = Range<usize>> + 'a
where
T: Copy,
F: 'a + Fn(T) -> bool,
F: 'a + FnMut(T) -> bool,
{
let mut range: Range<usize> = 0..0;
let mut values = col_data.iter();
std::iter::from_fn(move || loop {
@ -72,6 +81,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::writer::Writer;
use rand::prelude::*;
fn make_rng() -> StdRng {
@ -83,11 +93,11 @@ mod tests {
#[test]
fn test_filter_slice() {
let collected: Vec<_> =
filter_slice(&[0, 1, 2, 3, 4, 5, 6], |x| x != 1 && x != 4).collect();
filter_slice(&[0, 1, 2, 3, 4, 5, 6], &mut |x| x != 1 && x != 4).collect();
assert_eq!(collected, vec![0..1, 2..4, 5..7]);
let collected: Vec<_> =
filter_slice(&[0, 1, 2, 3, 4, 5, 6], |x| x == 1 || x == 2 || x == 6).collect();
filter_slice(&[0, 1, 2, 3, 4, 5, 6], &mut |x| x == 1 || x == 2 || x == 6).collect();
assert_eq!(collected, vec![1..3, 6..7])
}
@ -98,9 +108,9 @@ mod tests {
.take(1000)
.collect();
let predicate = |x: u32| x & 1 == 0;
let mut predicate = |x: u32| x & 1 == 0;
let indexes: Vec<_> = filter_slice(&data, predicate).flatten().collect();
let indexes: Vec<_> = filter_slice(&data, &mut predicate).flatten().collect();
let expected: Vec<_> = data
.iter()
@ -113,4 +123,34 @@ mod tests {
assert_eq!(indexes, expected);
}
#[test]
fn test_filter_batch() {
let mut batch = MutableBatch::new();
let mut rng = make_rng();
let data: Vec<_> = std::iter::from_fn(|| Some(rng.next_u32() as i64))
.take(1000)
.collect();
let ranges = &[0..87, 90..442, 634..800];
let mut predicate = |x: i64| x & 1 == 0;
let mut writer = Writer::new(&mut batch, 1000);
writer.write_time("time", data.iter().cloned()).unwrap();
writer.commit();
let actual: Vec<_> = filter_time(&batch, ranges, &mut predicate)
.into_iter()
.flatten()
.collect();
let expected: Vec<_> = ranges
.iter()
.cloned()
.flatten()
.filter(|idx| predicate(data[*idx]))
.collect();
assert_eq!(actual, expected);
}
}

View File

@ -16,18 +16,21 @@
//! permitting fast conversion to [`RecordBatch`]
//!
use crate::column::Column;
use crate::column::{Column, ColumnData};
use arrow::record_batch::RecordBatch;
use data_types::database_rules::PartitionTemplate;
use data_types::write_summary::TimestampSummary;
use entry::TableBatch;
use hashbrown::HashMap;
use schema::selection::Selection;
use schema::{builder::SchemaBuilder, Schema};
use schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::num::NonZeroUsize;
use std::ops::Range;
pub mod column;
pub mod filter;
pub mod partition;
mod filter;
mod partition;
pub mod writer;
#[allow(missing_docs)]
@ -57,6 +60,9 @@ pub enum Error {
#[snafu(display("Mask had {} rows, expected {}", expected, actual))]
IncorrectMaskLength { expected: usize, actual: usize },
#[snafu(context(false))]
WriterError { source: writer::Error },
}
/// A specialized `Error` for [`MutableBatch`] errors
@ -159,8 +165,24 @@ impl MutableBatch {
self.row_count
}
/// Returns a summary of the write timestamps in this batch if a
/// time column exists
pub fn timestamp_summary(&self) -> Option<TimestampSummary> {
let time = self.column_names.get(TIME_COLUMN_NAME)?;
let mut summary = TimestampSummary::default();
match &self.columns[*time].data {
ColumnData::I64(col_data, _) => {
for t in col_data {
summary.record_nanos(*t)
}
}
_ => unreachable!(),
}
Some(summary)
}
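A short sketch of the intended use, mirroring the assertion added to test_basic in the test file below; the test name and values are illustrative:

use data_types::write_summary::TimestampSummary;
use mutable_batch::{writer::Writer, MutableBatch};

#[test]
fn timestamp_summary_sketch() {
    let mut batch = MutableBatch::new();
    let mut writer = Writer::new(&mut batch, 3);
    writer.write_time("time", vec![1_i64, 2, 3].into_iter()).unwrap();
    writer.commit();

    // Building the same summary by hand with record_nanos should match
    let mut expected = TimestampSummary::default();
    for t in [1_i64, 2, 3] {
        expected.record_nanos(t)
    }
    assert_eq!(batch.timestamp_summary().unwrap(), expected);
}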
/// Extend this [`MutableBatch`] with the contents of `other`
pub fn extend_from(&mut self, other: &Self) -> writer::Result<()> {
pub fn extend_from(&mut self, other: &Self) -> Result<()> {
let mut writer = writer::Writer::new(self, other.row_count);
writer.write_batch(other)?;
writer.commit();
@ -168,7 +190,7 @@ impl MutableBatch {
}
/// Extend this [`MutableBatch`] with `range` rows from `other`
pub fn extend_from_range(&mut self, other: &Self, range: Range<usize>) -> writer::Result<()> {
pub fn extend_from_range(&mut self, other: &Self, range: Range<usize>) -> Result<()> {
let mut writer = writer::Writer::new(self, range.end - range.start);
writer.write_batch_range(other, range)?;
writer.commit();
@ -176,11 +198,7 @@ impl MutableBatch {
}
/// Extend this [`MutableBatch`] with `ranges` rows from `other`
pub fn extend_from_ranges(
&mut self,
other: &Self,
ranges: &[Range<usize>],
) -> writer::Result<()> {
pub fn extend_from_ranges(&mut self, other: &Self, ranges: &[Range<usize>]) -> Result<()> {
let to_insert = ranges.iter().map(|x| x.end - x.start).sum();
let mut writer = writer::Writer::new(self, to_insert);
@ -190,7 +208,7 @@ impl MutableBatch {
}
/// Returns a reference to the specified column
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
pub fn column(&self, column: &str) -> Result<&Column> {
let idx = self
.column_names
.get(column)
@ -243,7 +261,8 @@ impl MutableBatch {
})?;
}
Ok(())
// https://github.com/shepmaster/snafu/issues/315
Ok::<_, Error>(())
})?;
for fb_column in columns {
@ -283,6 +302,156 @@ impl MutableBatch {
}
}
/// A payload that can be written to a mutable batch
pub trait WritePayload {
/// Write this payload to `batch`
fn write_to_batch(&self, batch: &mut MutableBatch) -> Result<()>;
}
impl WritePayload for MutableBatch {
fn write_to_batch(&self, batch: &mut MutableBatch) -> Result<()> {
batch.extend_from(self)
}
}
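The trait lets downstream code accept either a whole MutableBatch or a PartitionWrite (introduced below) interchangeably. A minimal sketch of such a consumer, assuming the crate's public Result alias; the helper name is hypothetical:

use mutable_batch::{MutableBatch, Result, WritePayload};

// Hypothetical helper: append any payload (a full batch or a PartitionWrite)
// to a destination batch.
fn append(dst: &mut MutableBatch, payload: &impl WritePayload) -> Result<()> {
    payload.write_to_batch(dst)
}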
/// A [`MutableBatch`] with a non-empty set of row ranges to write
#[derive(Debug)]
pub struct PartitionWrite<'a> {
batch: &'a MutableBatch,
ranges: Vec<Range<usize>>,
min_timestamp: i64,
max_timestamp: i64,
row_count: NonZeroUsize,
}
impl<'a> PartitionWrite<'a> {
/// Create a new [`PartitionWrite`] with the entire range of the provided batch
///
/// # Panic
///
/// Panics if the batch has no rows
pub fn new(batch: &'a MutableBatch) -> Self {
let row_count = NonZeroUsize::new(batch.row_count).unwrap();
let time = get_time_column(batch);
let (min_timestamp, max_timestamp) = min_max_time(time);
Self {
batch,
ranges: vec![0..batch.row_count],
min_timestamp,
max_timestamp,
row_count,
}
}
/// Returns the minimum timestamp in the write
pub fn min_timestamp(&self) -> i64 {
self.min_timestamp
}
/// Returns the maximum timestamp in the write
pub fn max_timestamp(&self) -> i64 {
self.max_timestamp
}
/// Returns the number of rows in the write
pub fn rows(&self) -> NonZeroUsize {
self.row_count
}
/// Returns a [`PartitionWrite`] containing just the rows of `Self` that pass
/// the provided time predicate, or `None` if no rows pass
pub fn filter(&self, predicate: impl Fn(i64) -> bool) -> Option<PartitionWrite<'a>> {
let mut min_timestamp = i64::MAX;
let mut max_timestamp = i64::MIN;
let mut row_count = 0_usize;
// Construct a predicate that lets us inspect the timestamps as they are filtered
let inspect = |t| match predicate(t) {
true => {
min_timestamp = min_timestamp.min(t);
max_timestamp = max_timestamp.max(t);
row_count += 1;
true
}
false => false,
};
let ranges: Vec<_> = filter::filter_time(self.batch, &self.ranges, inspect);
let row_count = NonZeroUsize::new(row_count)?;
Some(PartitionWrite {
batch: self.batch,
ranges,
min_timestamp,
max_timestamp,
row_count,
})
}
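A sketch of the filter-then-write flow, assuming a non-empty batch with a time column; the function name and cutoff are illustrative:

use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};

// Hypothetical: copy the rows with timestamps at or after `cutoff` into a
// fresh batch. Note PartitionWrite::new panics if `batch` has no rows.
fn rows_at_or_after(batch: &MutableBatch, cutoff: i64) -> Option<MutableBatch> {
    let write = PartitionWrite::new(batch);
    // None if no row passes the time predicate
    let filtered = write.filter(|t| t >= cutoff)?;

    let mut out = MutableBatch::new();
    filtered.write_to_batch(&mut out).expect("write filtered rows");
    Some(out)
}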
/// Create a collection of [`PartitionWrite`] indexed by partition key
/// from a [`MutableBatch`] and [`PartitionTemplate`]
pub fn partition(
table_name: &str,
batch: &'a MutableBatch,
partition_template: &PartitionTemplate,
) -> HashMap<String, Self> {
use hashbrown::hash_map::Entry;
let time = get_time_column(batch);
let mut partition_ranges = HashMap::new();
for (partition, range) in partition::partition_batch(batch, table_name, partition_template)
{
let row_count = NonZeroUsize::new(range.end - range.start).unwrap();
let (min_timestamp, max_timestamp) = min_max_time(&time[range.clone()]);
match partition_ranges.entry(partition) {
Entry::Vacant(v) => {
v.insert(PartitionWrite {
batch,
ranges: vec![range],
min_timestamp,
max_timestamp,
row_count,
});
}
Entry::Occupied(mut o) => {
let pw = o.get_mut();
pw.min_timestamp = pw.min_timestamp.min(min_timestamp);
pw.max_timestamp = pw.max_timestamp.max(max_timestamp);
pw.row_count = NonZeroUsize::new(pw.row_count.get() + row_count.get()).unwrap();
pw.ranges.push(range);
}
}
}
partition_ranges
}
}
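End to end, the new API lets a caller shred one MutableBatch into per-partition batches. A minimal sketch; the table name and the strftime-based TemplatePart::TimeFormat template are assumptions for illustration (the test_partition_write test below partitions by a column instead):

use data_types::database_rules::{PartitionTemplate, TemplatePart};
use hashbrown::HashMap;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};

// Hypothetical: produce one output MutableBatch per partition key.
fn shred(batch: &MutableBatch) -> HashMap<String, MutableBatch> {
    // Assumed template: partition rows by the day of their timestamp
    let template = PartitionTemplate {
        parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_string())],
    };

    PartitionWrite::partition("my_table", batch, &template)
        .into_iter()
        .map(|(key, write)| {
            let mut out = MutableBatch::new();
            write.write_to_batch(&mut out).expect("write partition rows");
            (key, out)
        })
        .collect()
}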
impl<'a> WritePayload for PartitionWrite<'a> {
fn write_to_batch(&self, batch: &mut MutableBatch) -> Result<()> {
batch.extend_from_ranges(self.batch, &self.ranges)
}
}
fn get_time_column(batch: &MutableBatch) -> &[i64] {
let time_column = batch.column(TIME_COLUMN_NAME).expect("time column");
match &time_column.data {
ColumnData::I64(col_data, _) => col_data,
x => unreachable!("expected i64 got {} for time column", x),
}
}
fn min_max_time(col: &[i64]) -> (i64, i64) {
let mut min_timestamp = i64::MAX;
let mut max_timestamp = i64::MIN;
for t in col {
min_timestamp = min_timestamp.min(*t);
max_timestamp = max_timestamp.max(*t);
}
(min_timestamp, max_timestamp)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,5 +1,6 @@
use arrow_util::assert_batches_eq;
use data_types::partition_metadata::{StatValues, Statistics};
use data_types::write_summary::TimestampSummary;
use mutable_batch::writer::Writer;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
@ -333,4 +334,14 @@ fn test_basic() {
assert_batches_eq!(expected_data, &[batch.to_arrow(Selection::All).unwrap()]);
assert_eq!(stats, expected_stats);
let mut expected_timestamps = TimestampSummary::default();
for t in [
7, 5, 7, 3, 5, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
] {
expected_timestamps.record_nanos(t)
}
let timestamps = batch.timestamp_summary().unwrap();
assert_eq!(timestamps, expected_timestamps);
}

View File

@ -19,9 +19,10 @@ use hashbrown::HashSet;
use rand::prelude::*;
use arrow_util::bitset::BitSet;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use data_types::partition_metadata::{IsNan, StatValues, Statistics};
use mutable_batch::writer::Writer;
use mutable_batch::MutableBatch;
use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
use schema::selection::Selection;
fn make_rng() -> StdRng {
@ -409,3 +410,46 @@ fn test_writer_fuzz() {
assert_eq!(actual_statistics, expected_statistics);
}
#[test]
fn test_partition_write() {
let mut rng = make_rng();
let mut batch = MutableBatch::new();
let expected = extend_batch(&mut rng, &mut batch);
let w = PartitionWrite::new(&batch);
assert_eq!(w.rows().get(), expected.tag_expected.len());
let verify_write = |write: &PartitionWrite<'_>| {
// Verify that the time and row statistics computed by the PartitionWrite
// match what actually gets written to a MutableBatch
let mut temp = MutableBatch::new();
write.write_to_batch(&mut temp).unwrap();
let stats = match temp.column("time").unwrap().stats() {
Statistics::I64(stats) => stats,
_ => unreachable!(),
};
assert_eq!(write.min_timestamp(), stats.min.unwrap());
assert_eq!(write.max_timestamp(), stats.max.unwrap());
assert_eq!(write.rows().get() as u64, stats.total_count);
};
let partitioned = PartitionWrite::partition(
"table",
&batch,
&PartitionTemplate {
parts: vec![TemplatePart::Column("b1".to_string())],
},
);
for (_, write) in &partitioned {
verify_write(write);
match write.filter(|x| x & 1 == 0) {
Some(filtered) => verify_write(&filtered),
None => continue,
}
}
}