diff --git a/arrow_util/src/bitset.rs b/arrow_util/src/bitset.rs index 70c212b529..08b4559c4a 100644 --- a/arrow_util/src/bitset.rs +++ b/arrow_util/src/bitset.rs @@ -29,6 +29,12 @@ impl BitSet { bitset } + /// Reserve space for `count` further bits + pub fn reserve(&mut self, count: usize) { + let new_buf_len = (self.len + count + 7) >> 3; + self.buffer.reserve(new_buf_len); + } + /// Appends `count` unset bits pub fn append_unset(&mut self, count: usize) { self.len += count; diff --git a/mutable_batch/src/column.rs b/mutable_batch/src/column.rs index b881fa36c1..842818026a 100644 --- a/mutable_batch/src/column.rs +++ b/mutable_batch/src/column.rs @@ -1,10 +1,9 @@ //! A [`Column`] stores the rows for a given column name use std::fmt::Formatter; -use std::iter::Enumerate; +use std::iter::{Enumerate, Zip}; use std::mem; use std::sync::Arc; -use std::{convert::TryInto, iter::Zip}; use arrow::error::ArrowError; use arrow::{ diff --git a/mutable_batch/src/lib.rs b/mutable_batch/src/lib.rs index fc8d945215..c70a2cde09 100644 --- a/mutable_batch/src/lib.rs +++ b/mutable_batch/src/lib.rs @@ -175,6 +175,20 @@ impl MutableBatch { Ok(()) } + /// Extend this [`MutableBatch`] with `ranges` rows from `other` + pub fn extend_from_ranges( + &mut self, + other: &Self, + ranges: &[Range], + ) -> writer::Result<()> { + let to_insert = ranges.iter().map(|x| x.end - x.start).sum(); + + let mut writer = writer::Writer::new(self, to_insert); + writer.write_batch_ranges(other, ranges)?; + writer.commit(); + Ok(()) + } + /// Returns a reference to the specified column pub(crate) fn column(&self, column: &str) -> Result<&Column> { let idx = self diff --git a/mutable_batch/src/writer.rs b/mutable_batch/src/writer.rs index 9ed436080e..a330f1bf98 100644 --- a/mutable_batch/src/writer.rs +++ b/mutable_batch/src/writer.rs @@ -499,86 +499,105 @@ impl<'a> Writer<'a> { src: &MutableBatch, range: Range, ) -> Result<()> { - if range.start == 0 && range.end == src.row_count { + self.write_batch_ranges(src, &[range]) + } + + /// Write the rows identified by `ranges` to the provided MutableBatch + pub(crate) fn write_batch_ranges( + &mut self, + src: &MutableBatch, + ranges: &[Range], + ) -> Result<()> { + let to_insert = self.to_insert; + + if to_insert == src.row_count { return self.write_batch(src); } - assert_eq!(range.end - range.start, self.to_insert); for (src_col_name, src_col_idx) in &src.column_names { let src_col = &src.columns[*src_col_idx]; let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?; let stats = match (&mut dst_col.data, &src_col.data) { - (ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::F64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } - (ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::I64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } - (ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => { - dst_data.extend_from_slice(&src_data[range.clone()]); - Statistics::U64(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - &src_data[x] - })) - } + (ColumnData::F64(dst_data, _), ColumnData::F64(src_data, _)) => Statistics::F64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), + (ColumnData::I64(dst_data, _), ColumnData::I64(src_data, _)) => Statistics::I64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), + (ColumnData::U64(dst_data, _), ColumnData::U64(src_data, _)) => Statistics::U64( + write_slice(to_insert, ranges, src_col.valid.bytes(), src_data, dst_data), + ), (ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, _)) => { - dst_data.extend_from_range(src_data, range.clone()); - Statistics::Bool(compute_bool_stats( - src_col.valid.bytes(), - range.clone(), - src_data, - )) + dst_data.reserve(to_insert); + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_range(src_data, range.clone()); + compute_bool_stats( + src_col.valid.bytes(), + range.clone(), + src_data, + &mut stats, + ) + } + Statistics::Bool(stats) } (ColumnData::String(dst_data, _), ColumnData::String(src_data, _)) => { - dst_data.extend_from_range(src_data, range.clone()); - Statistics::String(compute_stats(src_col.valid.bytes(), range.clone(), |x| { - src_data.get(x).unwrap() - })) + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_range(src_data, range.clone()); + compute_stats(src_col.valid.bytes(), range.clone(), &mut stats, |x| { + src_data.get(x).unwrap() + }) + } + Statistics::String(stats) } ( ColumnData::Tag(dst_data, dst_dict, _), ColumnData::Tag(src_data, src_dict, _), ) => { + dst_data.reserve(to_insert); + let mut mapping: Vec<_> = vec![None; src_dict.values().len()]; - let mut stats = StatValues::new_empty(); - dst_data.extend(src_data[range.clone()].iter().map(|src_id| match *src_id { - INVALID_DID => { - stats.update_for_nulls(1); - INVALID_DID - } - _ => { - let maybe_did = &mut mapping[*src_id as usize]; - match maybe_did { - Some(did) => { - stats.total_count += 1; - *did + for range in ranges { + dst_data.extend(src_data[range.clone()].iter().map( + |src_id| match *src_id { + INVALID_DID => { + stats.update_for_nulls(1); + INVALID_DID } - None => { - let value = src_dict.lookup_id(*src_id).unwrap(); - stats.update(value); + _ => { + let maybe_did = &mut mapping[*src_id as usize]; + match maybe_did { + Some(did) => { + stats.total_count += 1; + *did + } + None => { + let value = src_dict.lookup_id(*src_id).unwrap(); + stats.update(value); - let did = dst_dict.lookup_value_or_insert(value); - *maybe_did = Some(did); - did + let did = dst_dict.lookup_value_or_insert(value); + *maybe_did = Some(did); + did + } + } } - } - } - })); + }, + )); + } Statistics::String(stats) } _ => unreachable!(), }; - dst_col - .valid - .extend_from_range(&src_col.valid, range.clone()); + dst_col.valid.reserve(to_insert); + for range in ranges { + dst_col + .valid + .extend_from_range(&src_col.valid, range.clone()); + } self.statistics.push((dst_col_idx, stats)); } @@ -707,12 +726,16 @@ fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert: } } -fn compute_bool_stats(valid: &[u8], range: Range, col_data: &BitSet) -> StatValues { +fn compute_bool_stats( + valid: &[u8], + range: Range, + col_data: &BitSet, + stats: &mut StatValues, +) { // There are likely faster ways to do this let indexes = iter_set_positions_with_offset(valid, range.start).take_while(|idx| *idx < range.end); - let mut stats = StatValues::new_empty(); for index in indexes { let value = col_data.get(index); stats.update(&value) @@ -720,11 +743,33 @@ fn compute_bool_stats(valid: &[u8], range: Range, col_data: &BitSet) -> S let count = range.end - range.start; stats.update_for_nulls(count as u64 - stats.total_count); +} + +fn write_slice( + to_insert: usize, + ranges: &[Range], + valid: &[u8], + src_data: &[T], + dst_data: &mut Vec, +) -> StatValues +where + T: Clone + PartialOrd + IsNan, +{ + dst_data.reserve(to_insert); + let mut stats = StatValues::new_empty(); + for range in ranges { + dst_data.extend_from_slice(&src_data[range.clone()]); + compute_stats(valid, range.clone(), &mut stats, |x| &src_data[x]); + } stats } -fn compute_stats<'a, T, U, F>(valid: &[u8], range: Range, accessor: F) -> StatValues -where +fn compute_stats<'a, T, U, F>( + valid: &[u8], + range: Range, + stats: &mut StatValues, + accessor: F, +) where U: 'a + ToOwned + PartialOrd + ?Sized + IsNan, F: Fn(usize) -> &'a U, T: std::borrow::Borrow, @@ -733,14 +778,12 @@ where .take_while(|idx| *idx < range.end) .map(accessor); - let mut stats = StatValues::new_empty(); for value in values { stats.update(value) } let count = range.end - range.start; stats.update_for_nulls(count as u64 - stats.total_count); - stats } impl<'a> Drop for Writer<'a> {