feat: add MutableBatch::extend_from_ranges (#2961)

pull/24376/head
Raphael Taylor-Davies 2021-10-25 15:41:25 +01:00 committed by GitHub
parent 820e0d56bb
commit f1fd40390a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 62 deletions

View File

@ -29,6 +29,12 @@ impl BitSet {
bitset
}
/// Reserve space for `count` further bits
///
/// The byte length required to hold `self.len + count` bits is computed by
/// rounding up to a whole byte. Only the bytes that are not already present
/// in `self.buffer` are requested, because `reserve` takes the number of
/// *additional* elements rather than a target length.
///
/// NOTE(review): assumes `self.buffer` is a `Vec<u8>`-like container exposing
/// `len()` and an additive `reserve()` — confirm against the struct definition
pub fn reserve(&mut self, count: usize) {
    // Total number of bytes needed once `count` more bits have been appended
    let new_buf_len = (self.len + count + 7) >> 3;
    // Bytes still missing from the buffer; zero when it is already long enough
    let additional = new_buf_len.saturating_sub(self.buffer.len());
    self.buffer.reserve(additional);
}
/// Appends `count` unset bits
pub fn append_unset(&mut self, count: usize) {
self.len += count;

View File

@ -1,10 +1,9 @@
//! A [`Column`] stores the rows for a given column name
use std::fmt::Formatter;
use std::iter::Enumerate;
use std::iter::{Enumerate, Zip};
use std::mem;
use std::sync::Arc;
use std::{convert::TryInto, iter::Zip};
use arrow::error::ArrowError;
use arrow::{

View File

@ -175,6 +175,20 @@ impl MutableBatch {
Ok(())
}
/// Append the rows of `other` selected by `ranges` to this [`MutableBatch`]
///
/// The number of rows to write is the sum of the lengths of all `ranges`. The
/// rows are written through a `writer::Writer`, which is committed only after
/// `write_batch_ranges` has returned successfully.
///
/// # Errors
///
/// Propagates any error returned by `write_batch_ranges`; in that case
/// `commit` is not called.
pub fn extend_from_ranges(
    &mut self,
    other: &Self,
    ranges: &[Range<usize>],
) -> writer::Result<()> {
    // Total row count across every requested range
    let mut row_total: usize = 0;
    for span in ranges {
        row_total += span.end - span.start;
    }

    let mut batch_writer = writer::Writer::new(self, row_total);
    batch_writer.write_batch_ranges(other, ranges)?;
    batch_writer.commit();
    Ok(())
}
/// Returns a reference to the specified column
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
let idx = self

View File

@ -499,86 +499,105 @@ impl<'a> Writer<'a> {
src: &MutableBatch,
range: Range<usize>,
) -> Result<()> {
if range.start == 0 && range.end == src.row_count {
self.write_batch_ranges(src, &[range])
}
/// Write the rows identified by `ranges` to the provided MutableBatch
pub(crate) fn write_batch_ranges(
&mut self,
src: &MutableBatch,
ranges: &[Range<usize>],
) -> Result<()> {
let to_insert = self.to_insert;
if to_insert == src.row_count {
return self.write_batch(src);
}
assert_eq!(range.end - range.start, self.to_insert);
for (src_col_name, src_col_idx) in &src.column_names {
let src_col = &src.columns[*src_col_idx];
let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?;
let stats = match (&mut dst_col.data, &src_col.data) {
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::F64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::I64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => {
dst_data.extend_from_slice(&src_data[range.clone()]);
Statistics::U64(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
&src_data[x]
}))
}
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => Statistics::F64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => Statistics::I64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => Statistics::U64(
write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data),
),
(ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, _)) => {
dst_data.extend_from_range(src_data, range.clone());
Statistics::Bool(compute_bool_stats(
src_col.valid.bytes(),
range.clone(),
src_data,
))
dst_data.reserve(to_insert);
let mut stats = StatValues::new_empty();
for range in ranges {
dst_data.extend_from_range(src_data, range.clone());
compute_bool_stats(
src_col.valid.bytes(),
range.clone(),
src_data,
&mut stats,
)
}
Statistics::Bool(stats)
}
(ColumnData::String(dst_data, _), ColumnData::String(src_data, _)) => {
dst_data.extend_from_range(src_data, range.clone());
Statistics::String(compute_stats(src_col.valid.bytes(), range.clone(), |x| {
src_data.get(x).unwrap()
}))
let mut stats = StatValues::new_empty();
for range in ranges {
dst_data.extend_from_range(src_data, range.clone());
compute_stats(src_col.valid.bytes(), range.clone(), &mut stats, |x| {
src_data.get(x).unwrap()
})
}
Statistics::String(stats)
}
(
ColumnData::Tag(dst_data, dst_dict, _),
ColumnData::Tag(src_data, src_dict, _),
) => {
dst_data.reserve(to_insert);
let mut mapping: Vec<_> = vec![None; src_dict.values().len()];
let mut stats = StatValues::new_empty();
dst_data.extend(src_data[range.clone()].iter().map(|src_id| match *src_id {
INVALID_DID => {
stats.update_for_nulls(1);
INVALID_DID
}
_ => {
let maybe_did = &mut mapping[*src_id as usize];
match maybe_did {
Some(did) => {
stats.total_count += 1;
*did
for range in ranges {
dst_data.extend(src_data[range.clone()].iter().map(
|src_id| match *src_id {
INVALID_DID => {
stats.update_for_nulls(1);
INVALID_DID
}
None => {
let value = src_dict.lookup_id(*src_id).unwrap();
stats.update(value);
_ => {
let maybe_did = &mut mapping[*src_id as usize];
match maybe_did {
Some(did) => {
stats.total_count += 1;
*did
}
None => {
let value = src_dict.lookup_id(*src_id).unwrap();
stats.update(value);
let did = dst_dict.lookup_value_or_insert(value);
*maybe_did = Some(did);
did
let did = dst_dict.lookup_value_or_insert(value);
*maybe_did = Some(did);
did
}
}
}
}
}
}));
},
));
}
Statistics::String(stats)
}
_ => unreachable!(),
};
dst_col
.valid
.extend_from_range(&src_col.valid, range.clone());
dst_col.valid.reserve(to_insert);
for range in ranges {
dst_col
.valid
.extend_from_range(&src_col.valid, range.clone());
}
self.statistics.push((dst_col_idx, stats));
}
@ -707,12 +726,16 @@ fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert:
}
}
fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> StatValues<bool> {
fn compute_bool_stats(
valid: &[u8],
range: Range<usize>,
col_data: &BitSet,
stats: &mut StatValues<bool>,
) {
// There are likely faster ways to do this
let indexes =
iter_set_positions_with_offset(valid, range.start).take_while(|idx| *idx < range.end);
let mut stats = StatValues::new_empty();
for index in indexes {
let value = col_data.get(index);
stats.update(&value)
@ -720,11 +743,33 @@ fn compute_bool_stats(valid: &[u8], range: Range<usize>, col_data: &BitSet) -> S
let count = range.end - range.start;
stats.update_for_nulls(count as u64 - stats.total_count);
}
/// Copies the elements of `src_data` selected by `ranges` onto the end of
/// `dst_data` and returns the statistics accumulated over those ranges
///
/// `to_insert` is used only to reserve capacity in `dst_data` before copying.
/// `valid` is passed to `compute_stats` together with each range, and every
/// range updates the same `StatValues` accumulator.
///
/// NOTE(review): `compute_stats` derives the null count for a range as
/// `range_len - stats.total_count`. With one accumulator shared across ranges,
/// `total_count` looks cumulative rather than per-range — confirm this cannot
/// underflow when more than one range is supplied.
///
/// # Panics
///
/// Panics if a range is out of bounds for `src_data`
fn write_slice<T>(
    to_insert: usize,
    ranges: &[Range<usize>],
    valid: &[u8],
    src_data: &[T],
    dst_data: &mut Vec<T>,
) -> StatValues<T>
where
    T: Clone + PartialOrd + IsNan,
{
    let mut accumulated = StatValues::new_empty();
    dst_data.reserve(to_insert);

    for span in ranges.iter() {
        let Range { start, end } = *span;
        dst_data.extend_from_slice(&src_data[start..end]);
        compute_stats(valid, start..end, &mut accumulated, |idx| &src_data[idx]);
    }

    accumulated
}
fn compute_stats<'a, T, U, F>(valid: &[u8], range: Range<usize>, accessor: F) -> StatValues<T>
where
fn compute_stats<'a, T, U, F>(
valid: &[u8],
range: Range<usize>,
stats: &mut StatValues<T>,
accessor: F,
) where
U: 'a + ToOwned<Owned = T> + PartialOrd + ?Sized + IsNan,
F: Fn(usize) -> &'a U,
T: std::borrow::Borrow<U>,
@ -733,14 +778,12 @@ where
.take_while(|idx| *idx < range.end)
.map(accessor);
let mut stats = StatValues::new_empty();
for value in values {
stats.update(value)
}
let count = range.end - range.start;
stats.update_for_nulls(count as u64 - stats.total_count);
stats
}
impl<'a> Drop for Writer<'a> {