feat: add MutableBatch::extend_from (#2900)
* feat: add MutableBatch::extend_from * chore: fix lint * chore: improve panic message Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
9974a5364c
commit
0b1e4a03a5
|
@ -66,6 +66,11 @@ impl BitSet {
|
|||
self.len = len;
|
||||
}
|
||||
|
||||
/// Extends this [`BitSet`] by the context of `other`
|
||||
pub fn extend_from(&mut self, other: &BitSet) {
|
||||
self.append_bits(other.len, &other.buffer)
|
||||
}
|
||||
|
||||
/// Appends `count` boolean values from the slice of packed bits
|
||||
pub fn append_bits(&mut self, count: usize, to_set: &[u8]) {
|
||||
assert_eq!((count + 7) >> 3, to_set.len());
|
||||
|
|
|
@ -62,6 +62,18 @@ impl<K: AsPrimitive<usize> + FromPrimitive + Zero> PackedStringArray<K> {
|
|||
id
|
||||
}
|
||||
|
||||
/// Extends this [`PackedStringArray`] by the contents of `other`
|
||||
pub fn extend_from(&mut self, other: &PackedStringArray<K>) {
|
||||
let offset = self.storage.len();
|
||||
self.storage.push_str(other.storage.as_str());
|
||||
self.offsets.extend(
|
||||
other
|
||||
.offsets
|
||||
.iter()
|
||||
.map(|x| K::from_usize(x.as_() + offset).expect("failed to fit into offset type")),
|
||||
)
|
||||
}
|
||||
|
||||
/// Get the value at a given index
|
||||
pub fn get(&self, index: usize) -> Option<&str> {
|
||||
let start_offset = self.offsets.get(index)?.as_();
|
||||
|
|
|
@ -156,6 +156,14 @@ impl MutableBatch {
|
|||
self.row_count
|
||||
}
|
||||
|
||||
/// Extend this [`MutableBatch`] with the contents of `other`
|
||||
pub fn extend_from(&mut self, other: &Self) -> writer::Result<()> {
|
||||
let mut writer = writer::Writer::new(self, other.row_count);
|
||||
writer.write_batch(other)?;
|
||||
writer.commit();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns a reference to the specified column
|
||||
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
|
||||
let idx = self
|
||||
|
|
|
@ -433,6 +433,61 @@ impl<'a> Writer<'a> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Write the provided MutableBatch
|
||||
pub(crate) fn write_batch(&mut self, src: &MutableBatch) -> Result<()> {
|
||||
assert_eq!(src.row_count, self.to_insert);
|
||||
|
||||
for (src_col_name, src_col_idx) in &src.column_names {
|
||||
let src_col = &src.columns[*src_col_idx];
|
||||
let (dst_col_idx, dst_col) = self.column_mut(src_col_name, src_col.influx_type)?;
|
||||
|
||||
let stats = match (&mut dst_col.data, &src_col.data) {
|
||||
(ColumnData::F64(dst_data, _), ColumnData::F64(src_data, stats)) => {
|
||||
dst_data.extend_from_slice(src_data);
|
||||
Statistics::F64(stats.clone())
|
||||
}
|
||||
(ColumnData::I64(dst_data, _), ColumnData::I64(src_data, stats)) => {
|
||||
dst_data.extend_from_slice(src_data);
|
||||
Statistics::I64(stats.clone())
|
||||
}
|
||||
(ColumnData::U64(dst_data, _), ColumnData::U64(src_data, stats)) => {
|
||||
dst_data.extend_from_slice(src_data);
|
||||
Statistics::U64(stats.clone())
|
||||
}
|
||||
(ColumnData::Bool(dst_data, _), ColumnData::Bool(src_data, stats)) => {
|
||||
dst_data.extend_from(src_data);
|
||||
Statistics::Bool(stats.clone())
|
||||
}
|
||||
(ColumnData::String(dst_data, _), ColumnData::String(src_data, stats)) => {
|
||||
dst_data.extend_from(src_data);
|
||||
Statistics::String(stats.clone())
|
||||
}
|
||||
(
|
||||
ColumnData::Tag(dst_data, dst_dict, _),
|
||||
ColumnData::Tag(src_data, src_dict, stats),
|
||||
) => {
|
||||
let mapping: Vec<_> = src_dict
|
||||
.values()
|
||||
.iter()
|
||||
.map(|value| dst_dict.lookup_value_or_insert(value))
|
||||
.collect();
|
||||
|
||||
dst_data.extend(src_data.iter().map(|src_id| match *src_id {
|
||||
INVALID_DID => INVALID_DID,
|
||||
_ => mapping[*src_id as usize],
|
||||
}));
|
||||
|
||||
Statistics::String(stats.clone())
|
||||
}
|
||||
_ => unreachable!("src: {}, dst: {}", src_col.data, dst_col.data),
|
||||
};
|
||||
|
||||
dst_col.valid.extend_from(&src_col.valid);
|
||||
self.statistics.push((dst_col_idx, stats));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn column_mut(
|
||||
&mut self,
|
||||
name: &str,
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
use arrow_util::assert_batches_eq;
|
||||
use data_types::partition_metadata::{StatValues, Statistics};
|
||||
use mutable_batch::writer::Writer;
|
||||
use mutable_batch::MutableBatch;
|
||||
use schema::selection::Selection;
|
||||
use std::collections::BTreeMap;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
#[test]
fn test_extend() {
    // Verifies that `MutableBatch::extend_from` appends the rows of `b` to `a`,
    // leaves `b` untouched, null-fills columns present in only one of the two
    // batches, and reports merged column statistics.

    // Batch `a`: 5 rows with columns tag1, tag2 and time
    let mut a = MutableBatch::new();
    let mut writer = Writer::new(&mut a, 5);

    writer
        .write_tag(
            "tag1",
            Some(&[0b00010101]),
            vec!["v1", "v1", "v2"].into_iter(),
        )
        .unwrap();

    writer
        .write_tag(
            "tag2",
            Some(&[0b00001101]),
            vec!["v2", "v1", "v1"].into_iter(),
        )
        .unwrap();

    writer
        .write_time("time", vec![0, 1, 2, 3, 4].into_iter())
        .unwrap();

    writer.commit();

    // Batch `b`: 8 rows with columns tag1, tag3 and time
    let mut b = MutableBatch::new();
    let mut writer = Writer::new(&mut b, 8);

    writer
        .write_tag(
            "tag1",
            Some(&[0b10010011]),
            vec!["v1", "v1", "v3", "v1"].into_iter(),
        )
        .unwrap();

    writer
        .write_tag(
            "tag3",
            None,
            vec!["v2", "v1", "v3", "v1", "v3", "v5", "v5", "v5"].into_iter(),
        )
        .unwrap();

    writer
        .write_time("time", vec![5, 6, 7, 8, 9, 10, 11, 12].into_iter())
        .unwrap();

    writer.commit();

    // Snapshot `a` before it is extended
    let a_before = a.to_arrow(Selection::All).unwrap();

    a.extend_from(&b).unwrap();

    // Expected rows are padded to the column widths given by the border rows
    assert_batches_eq!(
        &[
            "+------+------+--------------------------------+",
            "| tag1 | tag2 | time                           |",
            "+------+------+--------------------------------+",
            "| v1   | v2   | 1970-01-01T00:00:00Z           |",
            "|      |      | 1970-01-01T00:00:00.000000001Z |",
            "| v1   | v1   | 1970-01-01T00:00:00.000000002Z |",
            "|      | v1   | 1970-01-01T00:00:00.000000003Z |",
            "| v2   |      | 1970-01-01T00:00:00.000000004Z |",
            "+------+------+--------------------------------+",
        ],
        &[a_before]
    );

    // `b` must be unchanged by the extend
    assert_batches_eq!(
        &[
            "+------+------+--------------------------------+",
            "| tag1 | tag3 | time                           |",
            "+------+------+--------------------------------+",
            "| v1   | v2   | 1970-01-01T00:00:00.000000005Z |",
            "| v1   | v1   | 1970-01-01T00:00:00.000000006Z |",
            "|      | v3   | 1970-01-01T00:00:00.000000007Z |",
            "|      | v1   | 1970-01-01T00:00:00.000000008Z |",
            "| v3   | v3   | 1970-01-01T00:00:00.000000009Z |",
            "|      | v5   | 1970-01-01T00:00:00.000000010Z |",
            "|      | v5   | 1970-01-01T00:00:00.000000011Z |",
            "| v1   | v5   | 1970-01-01T00:00:00.000000012Z |",
            "+------+------+--------------------------------+",
        ],
        &[b.to_arrow(Selection::All).unwrap()]
    );

    // `a` now holds its own rows followed by the rows of `b`
    assert_batches_eq!(
        &[
            "+------+------+------+--------------------------------+",
            "| tag1 | tag2 | tag3 | time                           |",
            "+------+------+------+--------------------------------+",
            "| v1   | v2   |      | 1970-01-01T00:00:00Z           |",
            "|      |      |      | 1970-01-01T00:00:00.000000001Z |",
            "| v1   | v1   |      | 1970-01-01T00:00:00.000000002Z |",
            "|      | v1   |      | 1970-01-01T00:00:00.000000003Z |",
            "| v2   |      |      | 1970-01-01T00:00:00.000000004Z |",
            "| v1   |      | v2   | 1970-01-01T00:00:00.000000005Z |",
            "| v1   |      | v1   | 1970-01-01T00:00:00.000000006Z |",
            "|      |      | v3   | 1970-01-01T00:00:00.000000007Z |",
            "|      |      | v1   | 1970-01-01T00:00:00.000000008Z |",
            "| v3   |      | v3   | 1970-01-01T00:00:00.000000009Z |",
            "|      |      | v5   | 1970-01-01T00:00:00.000000010Z |",
            "|      |      | v5   | 1970-01-01T00:00:00.000000011Z |",
            "| v1   |      | v5   | 1970-01-01T00:00:00.000000012Z |",
            "+------+------+------+--------------------------------+",
        ],
        &[a.to_arrow(Selection::All).unwrap()]
    );

    let stats: BTreeMap<_, _> = a.columns().map(|(k, v)| (k.as_str(), v.stats())).collect();

    assert_eq!(
        stats["tag1"],
        Statistics::String(StatValues {
            min: Some("v1".to_string()),
            max: Some("v3".to_string()),
            total_count: 13,
            null_count: 6,
            distinct_count: Some(NonZeroU64::new(4).unwrap())
        })
    );

    assert_eq!(
        stats["tag2"],
        Statistics::String(StatValues {
            min: Some("v1".to_string()),
            max: Some("v2".to_string()),
            total_count: 13,
            null_count: 10,
            distinct_count: Some(NonZeroU64::new(3).unwrap())
        })
    );

    assert_eq!(
        stats["tag3"],
        Statistics::String(StatValues {
            min: Some("v1".to_string()),
            max: Some("v5".to_string()),
            total_count: 13,
            null_count: 5,
            distinct_count: Some(NonZeroU64::new(5).unwrap())
        })
    );

    assert_eq!(
        stats["time"],
        Statistics::I64(StatValues {
            min: Some(0),
            max: Some(12),
            total_count: 13,
            null_count: 0,
            distinct_count: None
        })
    )
}
|
Loading…
Reference in New Issue