diff --git a/arrow_util/src/bitset.rs b/arrow_util/src/bitset.rs index 5b0f5f5879..eb01217f59 100644 --- a/arrow_util/src/bitset.rs +++ b/arrow_util/src/bitset.rs @@ -66,6 +66,11 @@ impl BitSet { self.len = len; } + /// Extends this [`BitSet`] by the context of `other` + pub fn extend_from(&mut self, other: &BitSet) { + self.append_bits(other.len, &other.buffer) + } + /// Appends `count` boolean values from the slice of packed bits pub fn append_bits(&mut self, count: usize, to_set: &[u8]) { assert_eq!((count + 7) >> 3, to_set.len()); diff --git a/arrow_util/src/string.rs b/arrow_util/src/string.rs index 896ddd5914..76bd7c5baa 100644 --- a/arrow_util/src/string.rs +++ b/arrow_util/src/string.rs @@ -62,6 +62,18 @@ impl + FromPrimitive + Zero> PackedStringArray { id } + /// Extends this [`PackedStringArray`] by the contents of `other` + pub fn extend_from(&mut self, other: &PackedStringArray) { + let offset = self.storage.len(); + self.storage.push_str(other.storage.as_str()); + self.offsets.extend( + other + .offsets + .iter() + .map(|x| K::from_usize(x.as_() + offset).expect("failed to fit into offset type")), + ) + } + /// Get the value at a given index pub fn get(&self, index: usize) -> Option<&str> { let start_offset = self.offsets.get(index)?.as_(); diff --git a/mutable_batch/src/lib.rs b/mutable_batch/src/lib.rs index 2fc8fe9d8c..6208fee15d 100644 --- a/mutable_batch/src/lib.rs +++ b/mutable_batch/src/lib.rs @@ -156,6 +156,14 @@ impl MutableBatch { self.row_count } + /// Extend this [`MutableBatch`] with the contents of `other` + pub fn extend_from(&mut self, other: &Self) -> writer::Result<()> { + let mut writer = writer::Writer::new(self, other.row_count); + writer.write_batch(other)?; + writer.commit(); + Ok(()) + } + /// Returns a reference to the specified column pub(crate) fn column(&self, column: &str) -> Result<&Column> { let idx = self diff --git a/mutable_batch/src/writer.rs b/mutable_batch/src/writer.rs index 76803b12fd..a747741a4c 100644 --- a/mutable_batch/src/writer.rs +++ b/mutable_batch/src/writer.rs @@ -433,6 +433,61 @@ impl<'a> Writer<'a> { Ok(()) } + /// Write the provided MutableBatch + pub(crate) fn write_batch(&mut self, src: &MutableBatch) -> Result<()> { + assert_eq!(src.row_count, self.to_insert); + + for (src_col_name, src_col_idx) in &src.column_names { + let src_col = &src.columns[*src_col_idx]; + let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?; + + let stats = match (&mut dst_col.data, &src_col.data) { + (ColumnData::F64(dst_data, _), ColumnData::F64(src_data, stats)) => { + dst_data.extend_from_slice(src_data); + Statistics::F64(stats.clone()) + } + (ColumnData::I64(dst_data, _), ColumnData::I64(src_data, stats)) => { + dst_data.extend_from_slice(src_data); + Statistics::I64(stats.clone()) + } + (ColumnData::U64(dst_data, _), ColumnData::U64(src_data, stats)) => { + dst_data.extend_from_slice(src_data); + Statistics::U64(stats.clone()) + } + (ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, stats)) => { + dst_data.extend_from(src_data); + Statistics::Bool(stats.clone()) + } + (ColumnData::String(dst_data, _), ColumnData::String(src_data, stats)) => { + dst_data.extend_from(src_data); + Statistics::String(stats.clone()) + } + ( + ColumnData::Tag(dst_data, dst_dict, _), + ColumnData::Tag(src_data, src_dict, stats), + ) => { + let mapping: Vec<_> = src_dict + .values() + .iter() + .map(|value| dst_dict.lookup_value_or_insert(value)) + .collect(); + + dst_data.extend(src_data.iter().map(|src_id| match *src_id { + INVALID_DID => INVALID_DID, + _ => mapping[*src_id as usize], + })); + + Statistics::String(stats.clone()) + } + _ => unreachable!("src: {}, dst: {}", src_col.data, dst_col.data), + }; + + dst_col.valid.extend_from(&src_col.valid); + self.statistics.push((dst_col_idx, stats)); + } + Ok(()) + } + fn column_mut( &mut self, name: &str, diff --git a/mutable_batch/tests/extend.rs b/mutable_batch/tests/extend.rs new file mode 100644 index 0000000000..09129b9d5e --- /dev/null +++ b/mutable_batch/tests/extend.rs @@ -0,0 +1,166 @@ +use arrow_util::assert_batches_eq; +use data_types::partition_metadata::{StatValues, Statistics}; +use mutable_batch::writer::Writer; +use mutable_batch::MutableBatch; +use schema::selection::Selection; +use std::collections::BTreeMap; +use std::num::NonZeroU64; + +#[test] +fn test_extend() { + let mut a = MutableBatch::new(); + let mut writer = Writer::new(&mut a, 5); + + writer + .write_tag( + "tag1", + Some(&[0b00010101]), + vec!["v1", "v1", "v2"].into_iter(), + ) + .unwrap(); + + writer + .write_tag( + "tag2", + Some(&[0b00001101]), + vec!["v2", "v1", "v1"].into_iter(), + ) + .unwrap(); + + writer + .write_time("time", vec![0, 1, 2, 3, 4].into_iter()) + .unwrap(); + + writer.commit(); + + let mut b = MutableBatch::new(); + let mut writer = Writer::new(&mut b, 8); + + writer + .write_tag( + "tag1", + Some(&[0b10010011]), + vec!["v1", "v1", "v3", "v1"].into_iter(), + ) + .unwrap(); + + writer + .write_tag( + "tag3", + None, + vec!["v2", "v1", "v3", "v1", "v3", "v5", "v5", "v5"].into_iter(), + ) + .unwrap(); + + writer + .write_time("time", vec![5, 6, 7, 8, 9, 10, 11, 12].into_iter()) + .unwrap(); + + writer.commit(); + + let a_before = a.to_arrow(Selection::All).unwrap(); + + a.extend_from(&b).unwrap(); + + assert_batches_eq!( + &[ + "+------+------+--------------------------------+", + "| tag1 | tag2 | time |", + "+------+------+--------------------------------+", + "| v1 | v2 | 1970-01-01T00:00:00Z |", + "| | | 1970-01-01T00:00:00.000000001Z |", + "| v1 | v1 | 1970-01-01T00:00:00.000000002Z |", + "| | v1 | 1970-01-01T00:00:00.000000003Z |", + "| v2 | | 1970-01-01T00:00:00.000000004Z |", + "+------+------+--------------------------------+", + ], + &[a_before] + ); + + assert_batches_eq!( + &[ + "+------+------+--------------------------------+", + "| tag1 | tag3 | time |", + "+------+------+--------------------------------+", + "| v1 | v2 | 1970-01-01T00:00:00.000000005Z |", + "| v1 | v1 | 1970-01-01T00:00:00.000000006Z |", + "| | v3 | 1970-01-01T00:00:00.000000007Z |", + "| | v1 | 1970-01-01T00:00:00.000000008Z |", + "| v3 | v3 | 1970-01-01T00:00:00.000000009Z |", + "| | v5 | 1970-01-01T00:00:00.000000010Z |", + "| | v5 | 1970-01-01T00:00:00.000000011Z |", + "| v1 | v5 | 1970-01-01T00:00:00.000000012Z |", + "+------+------+--------------------------------+", + ], + &[b.to_arrow(Selection::All).unwrap()] + ); + + assert_batches_eq!( + &[ + "+------+------+------+--------------------------------+", + "| tag1 | tag2 | tag3 | time |", + "+------+------+------+--------------------------------+", + "| v1 | v2 | | 1970-01-01T00:00:00Z |", + "| | | | 1970-01-01T00:00:00.000000001Z |", + "| v1 | v1 | | 1970-01-01T00:00:00.000000002Z |", + "| | v1 | | 1970-01-01T00:00:00.000000003Z |", + "| v2 | | | 1970-01-01T00:00:00.000000004Z |", + "| v1 | | v2 | 1970-01-01T00:00:00.000000005Z |", + "| v1 | | v1 | 1970-01-01T00:00:00.000000006Z |", + "| | | v3 | 1970-01-01T00:00:00.000000007Z |", + "| | | v1 | 1970-01-01T00:00:00.000000008Z |", + "| v3 | | v3 | 1970-01-01T00:00:00.000000009Z |", + "| | | v5 | 1970-01-01T00:00:00.000000010Z |", + "| | | v5 | 1970-01-01T00:00:00.000000011Z |", + "| v1 | | v5 | 1970-01-01T00:00:00.000000012Z |", + "+------+------+------+--------------------------------+", + ], + &[a.to_arrow(Selection::All).unwrap()] + ); + + let stats: BTreeMap<_, _> = a.columns().map(|(k, v)| (k.as_str(), v.stats())).collect(); + + assert_eq!( + stats["tag1"], + Statistics::String(StatValues { + min: Some("v1".to_string()), + max: Some("v3".to_string()), + total_count: 13, + null_count: 6, + distinct_count: Some(NonZeroU64::new(4).unwrap()) + }) + ); + + assert_eq!( + stats["tag2"], + Statistics::String(StatValues { + min: Some("v1".to_string()), + max: Some("v2".to_string()), + total_count: 13, + null_count: 10, + distinct_count: Some(NonZeroU64::new(3).unwrap()) + }) + ); + + assert_eq!( + stats["tag3"], + Statistics::String(StatValues { + min: Some("v1".to_string()), + max: Some("v5".to_string()), + total_count: 13, + null_count: 5, + distinct_count: Some(NonZeroU64::new(5).unwrap()) + }) + ); + + assert_eq!( + stats["time"], + Statistics::I64(StatValues { + min: Some(0), + max: Some(12), + total_count: 13, + null_count: 0, + distinct_count: None + }) + ) +}