feat: MutableBatch write API (#2090) (#2724) (#2882)

* feat: MutableBatch write API (#2090) (#2724)

* chore: fix lint

* fix: handle dictionaries with unused mappings

* chore: review feedback

* chore: further review feedback

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-10-20 09:44:14 +01:00 committed by GitHub
parent b55ca06fe3
commit ce0127a1f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1013 additions and 49 deletions

1
Cargo.lock generated
View File

@ -2261,6 +2261,7 @@ dependencies = [
"data_types",
"entry",
"hashbrown",
"itertools",
"schema",
"snafu",
]

View File

@ -440,14 +440,19 @@ where
Self::new_with_distinct(min, max, total_count, null_count, distinct_count)
}
/// Create new statitics with the specified count and null count
/// Create new statistics with no values
pub fn new_empty() -> Self {
Self::new_with_distinct(None, None, 0, 0, None)
}
/// Create new statistics with the specified count and null count
pub fn new(min: Option<T>, max: Option<T>, total_count: u64, null_count: u64) -> Self {
let distinct_count = None;
Self::new_with_distinct(min, max, total_count, null_count, distinct_count)
}
/// Create new statitics with the specified count and null count and distinct values
fn new_with_distinct(
/// Create new statistics with the specified count and null count and distinct values
pub fn new_with_distinct(
min: Option<T>,
max: Option<T>,
total_count: u64,

View File

@ -12,6 +12,7 @@ entry = { path = "../entry" }
schema = { path = "../schema" }
snafu = "0.6"
hashbrown = "0.11"
itertools = "0.10"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }

View File

@ -1,5 +1,6 @@
//! A [`Column`] stores the rows for a given column name
use std::fmt::Formatter;
use std::iter::Enumerate;
use std::mem;
use std::sync::Arc;
@ -28,10 +29,10 @@ use schema::{IOxValueType, InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
///
/// An i32 is used to match the default for Arrow dictionaries
#[allow(clippy::upper_case_acronyms)]
type DID = i32;
pub(crate) type DID = i32;
/// An invalid DID used for NULL rows
const INVALID_DID: DID = -1;
pub(crate) const INVALID_DID: DID = -1;
/// The type of the dictionary used
type Dictionary = arrow_util::dictionary::StringDictionary<DID>;
@ -66,13 +67,13 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// statistics
#[derive(Debug)]
pub struct Column {
influx_type: InfluxColumnType,
valid: BitSet,
data: ColumnData,
pub(crate) influx_type: InfluxColumnType,
pub(crate) valid: BitSet,
pub(crate) data: ColumnData,
}
#[derive(Debug)]
enum ColumnData {
pub(crate) enum ColumnData {
F64(Vec<f64>, StatValues<f64>),
I64(Vec<i64>, StatValues<i64>),
U64(Vec<u64>, StatValues<u64>),
@ -81,6 +82,24 @@ enum ColumnData {
Tag(Vec<DID>, Dictionary, StatValues<String>),
}
impl std::fmt::Display for ColumnData {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
ColumnData::F64(col_data, _) => write!(f, "F64({})", col_data.len()),
ColumnData::I64(col_data, _) => write!(f, "I64({})", col_data.len()),
ColumnData::U64(col_data, _) => write!(f, "U64({})", col_data.len()),
ColumnData::String(col_data, _) => write!(f, "String({})", col_data.len()),
ColumnData::Bool(col_data, _) => write!(f, "Bool({})", col_data.len()),
ColumnData::Tag(col_data, dictionary, _) => write!(
f,
"Tag(keys:{},values:{})",
col_data.len(),
dictionary.values().len()
),
}
}
}
impl Column {
pub(crate) fn new(row_count: usize, column_type: InfluxColumnType) -> Self {
let mut valid = BitSet::new();

View File

@ -9,11 +9,12 @@
clippy::clone_on_ref_ptr
)]
//! A mutable data structure for a collection of writes
//! A mutable data structure for a collection of writes.
//!
//! Can be viewed as a mutable version of [`RecordBatch`] that remains the exclusive
//! owner of its buffers, permitting mutability. The in-memory layout is similar, however,
//! permitting fast conversion to [`RecordBatch`]
//!
//! Currently supports:
//! - [`TableBatch`] writes
//! - [`RecordBatch`] conversion
use crate::column::Column;
use arrow::record_batch::RecordBatch;
@ -24,6 +25,7 @@ use schema::{builder::SchemaBuilder, Schema};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
pub mod column;
pub mod writer;
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
@ -61,15 +63,23 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// can be appended to and converted into an Arrow `RecordBatch`
#[derive(Debug, Default)]
pub struct MutableBatch {
/// Map of column id from the chunk dictionary to the column
columns: HashMap<String, Column>,
/// Map of column name to index in `MutableBatch::columns`
column_names: HashMap<String, usize>,
/// Columns contained within this MutableBatch
columns: Vec<Column>,
/// The number of rows in this MutableBatch
row_count: usize,
}
impl MutableBatch {
/// Create a new empty batch
pub fn new() -> Self {
Self {
column_names: Default::default(),
columns: Default::default(),
row_count: 0,
}
}
@ -93,7 +103,8 @@ impl MutableBatch {
let mut schema_builder = SchemaBuilder::new();
let schema = match selection {
Selection::All => {
for (column_name, column) in self.columns.iter() {
for (column_name, column_idx) in self.column_names.iter() {
let column = &self.columns[*column_idx];
schema_builder.influx_column(column_name, column.influx_type());
}
@ -121,8 +132,7 @@ impl MutableBatch {
.iter()
.map(|(_, field)| {
let column = self
.columns
.get(field.name())
.column(field.name())
.expect("schema contains non-existent column");
column.to_arrow().context(ColumnError {
@ -136,21 +146,24 @@ impl MutableBatch {
/// Returns an iterator over the columns in this batch in no particular order
pub fn columns(&self) -> impl Iterator<Item = (&String, &Column)> + '_ {
self.columns.iter()
self.column_names
.iter()
.map(move |(name, idx)| (name, &self.columns[*idx]))
}
/// Return the number of rows in this chunk
pub fn rows(&self) -> usize {
self.columns
.values()
.next()
.map(|col| col.len())
.unwrap_or(0)
self.row_count
}
/// Returns a reference to the specified column
pub(crate) fn column(&self, column: &str) -> Result<&Column> {
self.columns.get(column).context(ColumnNotFound { column })
let idx = self
.column_names
.get(column)
.context(ColumnNotFound { column })?;
Ok(&self.columns[*idx])
}
/// Validates the schema of the passed in columns, then adds their values to
@ -189,10 +202,12 @@ impl MutableBatch {
}
);
if let Some(c) = self.columns.get(column.name()) {
c.validate_schema(column).context(ColumnError {
column: column.name(),
})?;
if let Some(c_idx) = self.column_names.get(column.name()) {
self.columns[*c_idx]
.validate_schema(column)
.context(ColumnError {
column: column.name(),
})?;
}
Ok(())
@ -200,19 +215,24 @@ impl MutableBatch {
for fb_column in columns {
let influx_type = fb_column.influx_type();
let columns_len = self.columns.len();
let column = self
.columns
let column_idx = *self
.column_names
.raw_entry_mut()
.from_key(fb_column.name())
.or_insert_with(|| {
(
fb_column.name().to_string(),
Column::new(row_count_before_insert, influx_type),
)
})
.or_insert_with(|| (fb_column.name().to_string(), columns_len))
.1;
if columns_len == column_idx {
self.columns
.push(Column::new(row_count_before_insert, influx_type))
}
let column = &mut self.columns[column_idx];
assert_eq!(column.len(), row_count_before_insert);
column.append(&fb_column, mask).context(ColumnError {
column: fb_column.name(),
})?;
@ -221,9 +241,10 @@ impl MutableBatch {
}
// Pad any columns that did not have values in this batch with NULLs
for c in self.columns.values_mut() {
for c in &mut self.columns {
c.push_nulls_to_len(final_row_count);
}
self.row_count = final_row_count;
Ok(())
}

581
mutable_batch/src/writer.rs Normal file
View File

@ -0,0 +1,581 @@
//! A panic-safe write abstraction for [`MutableBatch`]
use crate::column::{Column, ColumnData, INVALID_DID};
use crate::MutableBatch;
use arrow_util::bitset::iter_set_positions;
use data_types::partition_metadata::{StatValues, Statistics};
use schema::{InfluxColumnType, InfluxFieldType};
use snafu::Snafu;
use std::num::NonZeroU64;
#[allow(missing_docs, missing_copy_implementations)]
#[derive(Debug, Snafu)]
pub enum Error {
    /// A write attempted to insert a value of one influx type into a column
    /// that already exists with a different influx type
    #[snafu(display("Unable to insert {} type into a column of {}", inserted, existing))]
    TypeMismatch {
        existing: InfluxColumnType,
        inserted: InfluxColumnType,
    },
    /// The provided values iterator was exhausted before a value was produced
    /// for every row the write needed to populate
    #[snafu(display("Incorrect number of values provided"))]
    InsufficientValues,
    /// A dictionary-encoded write referenced a key with no corresponding
    /// entry in the provided values
    #[snafu(display("Key not found in dictionary: {}", key))]
    KeyNotFound { key: usize },
}
/// A specialized `Error` for [`Writer`] errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// [`Writer`] provides a panic-safe abstraction to append a number of rows to a [`MutableBatch`]
///
/// Each `write_*` call appends values for exactly one column; columns that receive
/// no write are padded with nulls at commit time.
///
/// If a [`Writer`] is dropped without calling [`Writer::commit`], the [`MutableBatch`] will be
/// truncated to the original number of rows, and the statistics not updated
#[derive(Debug)]
pub struct Writer<'a> {
    /// The mutable batch that is being mutated
    batch: &'a mut MutableBatch,
    /// A list of column index paired with Statistics
    ///
    /// Statistics updates are deferred to commit time so that a dropped
    /// (rolled back) writer leaves the batch statistics untouched
    statistics: Vec<(usize, Statistics)>,
    /// The initial number of rows in the MutableBatch
    initial_rows: usize,
    /// The number of rows to insert
    to_insert: usize,
    /// If this Writer committed successfully
    success: bool,
}
impl<'a> Writer<'a> {
    /// Create a [`Writer`] for inserting `to_insert` rows to the provided `batch`
    ///
    /// If the writer is dropped without calling commit all changes will be rolled back
    pub fn new(batch: &'a mut MutableBatch, to_insert: usize) -> Self {
        let initial_rows = batch.rows();
        Self {
            batch,
            statistics: vec![],
            initial_rows,
            to_insert,
            success: false,
        }
    }

    /// Write the f64 typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_f64<I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = f64>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) =
            self.column_mut(name, InfluxColumnType::Field(InfluxFieldType::Float))?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::F64(col_data, _) => {
                // Pre-size with a placeholder; only valid positions are overwritten
                col_data.resize(initial_rows + to_insert, 0_f64);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    col_data[initial_rows + idx] = value;
                    stats.update(&value);
                }
            }
            x => unreachable!("expected f64 got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::F64(stats)));

        Ok(())
    }

    /// Write the i64 typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_i64<I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = i64>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) =
            self.column_mut(name, InfluxColumnType::Field(InfluxFieldType::Integer))?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::I64(col_data, _) => {
                col_data.resize(initial_rows + to_insert, 0_i64);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    col_data[initial_rows + idx] = value;
                    stats.update(&value);
                }
            }
            x => unreachable!("expected i64 got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::I64(stats)));

        Ok(())
    }

    /// Write the u64 typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_u64<I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = u64>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) =
            self.column_mut(name, InfluxColumnType::Field(InfluxFieldType::UInteger))?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::U64(col_data, _) => {
                col_data.resize(initial_rows + to_insert, 0_u64);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    col_data[initial_rows + idx] = value;
                    stats.update(&value);
                }
            }
            x => unreachable!("expected u64 got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::U64(stats)));

        Ok(())
    }

    /// Write the boolean typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_bool<I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = bool>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) =
            self.column_mut(name, InfluxColumnType::Field(InfluxFieldType::Boolean))?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::Bool(col_data, _) => {
                // Booleans are stored in a bitset: append unset bits then set the true rows
                col_data.append_unset(to_insert);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    if value {
                        col_data.set(initial_rows + idx);
                    }
                    stats.update(&value);
                }
            }
            x => unreachable!("expected bool got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::Bool(stats)));

        Ok(())
    }

    /// Write the string field typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_string<'s, I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = &'s str>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) =
            self.column_mut(name, InfluxColumnType::Field(InfluxFieldType::String))?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::String(col_data, _) => {
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    // Pad any skipped (null) rows with empty entries before appending
                    col_data.extend(initial_rows + idx - col_data.len());
                    col_data.append(value);
                    stats.update(value);
                }
            }
            // NOTE: previously said "expected tag" — this arm matches String
            x => unreachable!("expected String got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::String(stats)));

        Ok(())
    }

    /// Write the tag typed column identified by `name`
    ///
    /// For each set bit in `valid_mask` a value from `values` is inserted at the
    /// corresponding index in the column. Nulls are inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_tag<'s, I>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut values: I,
    ) -> Result<()>
    where
        I: Iterator<Item = &'s str>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) = self.column_mut(name, InfluxColumnType::Tag)?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::Tag(col_data, dict, _) => {
                col_data.resize(initial_rows + to_insert, INVALID_DID);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    col_data[initial_rows + idx] = dict.lookup_value_or_insert(value);
                    stats.update(value);
                }
            }
            x => unreachable!("expected tag got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::String(stats)));

        Ok(())
    }

    /// Write the dictionary-encoded tag column identified by `name`
    ///
    /// For each set bit in `valid_mask` a key from `keys` is inserted at the
    /// corresponding index in the column, resolved against `values`. Nulls are
    /// inserted for the other rows
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_tag_dict<'s, K, V>(
        &mut self,
        name: &str,
        valid_mask: Option<&[u8]>,
        mut keys: K,
        values: V,
    ) -> Result<()>
    where
        K: Iterator<Item = usize>,
        V: Iterator<Item = &'s str>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) = self.column_mut(name, InfluxColumnType::Tag)?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::Tag(col_data, dict, _) => {
                // Lazily compute mappings to handle dictionaries with unused mappings
                let mut mapping: Vec<_> = values.map(|value| (value, None)).collect();

                col_data.resize(initial_rows + to_insert, INVALID_DID);
                for idx in set_position_iterator(valid_mask, to_insert) {
                    let key = keys.next().ok_or(Error::InsufficientValues)?;
                    let (value, maybe_did) =
                        mapping.get_mut(key).ok_or(Error::KeyNotFound { key })?;

                    match maybe_did {
                        Some(did) => col_data[initial_rows + idx] = *did,
                        None => {
                            // First use of this key: intern the value and cache the id
                            let did = dict.lookup_value_or_insert(value);
                            *maybe_did = Some(did);
                            col_data[initial_rows + idx] = did
                        }
                    }
                    stats.update(*value);
                }
            }
            x => unreachable!("expected tag got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, valid_mask, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::String(stats)));

        Ok(())
    }

    /// Write the time typed column identified by `name`
    ///
    /// A value from `values` is inserted for every row; the time column is
    /// never nullable and so takes no validity mask
    ///
    /// # Panic
    ///
    /// - panics if this column has already been written to by this `Writer`
    ///
    pub fn write_time<I>(&mut self, name: &str, mut values: I) -> Result<()>
    where
        I: Iterator<Item = i64>,
    {
        let initial_rows = self.initial_rows;
        let to_insert = self.to_insert;

        let (col_idx, col) = self.column_mut(name, InfluxColumnType::Timestamp)?;

        let mut stats = StatValues::new_empty();
        match &mut col.data {
            ColumnData::I64(col_data, _) => {
                col_data.resize(initial_rows + to_insert, 0_i64);
                for idx in 0..to_insert {
                    let value = values.next().ok_or(Error::InsufficientValues)?;
                    col_data[initial_rows + idx] = value;
                    stats.update(&value)
                }
            }
            x => unreachable!("expected i64 got {} for column \"{}\"", x, name),
        }

        append_valid_mask(col, None, to_insert);

        stats.update_for_nulls(to_insert as u64 - stats.total_count);
        self.statistics.push((col_idx, Statistics::I64(stats)));

        Ok(())
    }

    /// Returns the index of, and a mutable reference to, the column `name`,
    /// creating it with `influx_type` if it does not already exist
    ///
    /// Returns [`Error::TypeMismatch`] if an existing column has a different type,
    /// and panics if the column has already been written to by this `Writer`
    fn column_mut(
        &mut self,
        name: &str,
        influx_type: InfluxColumnType,
    ) -> Result<(usize, &mut Column)> {
        let columns_len = self.batch.columns.len();

        // raw_entry_mut avoids allocating the name String on the lookup-hit path
        let column_idx = *self
            .batch
            .column_names
            .raw_entry_mut()
            .from_key(name)
            .or_insert_with(|| (name.to_string(), columns_len))
            .1;

        if columns_len == column_idx {
            // Name was newly inserted => create the backing column
            self.batch
                .columns
                .push(Column::new(self.initial_rows, influx_type))
        }

        let col = &mut self.batch.columns[column_idx];

        if col.influx_type != influx_type {
            return Err(Error::TypeMismatch {
                existing: col.influx_type,
                inserted: influx_type,
            });
        }

        // A second write to the same column would have grown the validity bitset
        assert_eq!(
            col.valid.len(),
            self.initial_rows,
            "expected {} rows in column \"{}\" got {} when performing write of {} rows",
            self.initial_rows,
            name,
            col.valid.len(),
            self.to_insert
        );

        Ok((column_idx, col))
    }

    /// Commits the writes performed on this [`Writer`]. This will update the statistics
    /// and pad any unwritten columns with nulls
    pub fn commit(mut self) {
        let initial_rows = self.initial_rows;
        let final_rows = initial_rows + self.to_insert;

        // Statistics were recorded in write order; sort by column index so they
        // can be zipped against the columns below
        self.statistics
            .sort_unstable_by_key(|(col_idx, _)| *col_idx);
        let mut statistics = self.statistics.iter();

        for (col_idx, col) in self.batch.columns.iter_mut().enumerate() {
            // All columns should either have received a write and have statistics or not
            if col.valid.len() == initial_rows {
                col.push_nulls_to_len(final_rows);
            } else {
                assert_eq!(
                    col.valid.len(),
                    final_rows,
                    "expected {} rows in column index {} got {} when performing write of {} rows",
                    final_rows,
                    col_idx,
                    col.valid.len(),
                    self.to_insert
                );

                let (stats_col_idx, stats) = statistics.next().unwrap();
                assert_eq!(*stats_col_idx, col_idx);

                match (&mut col.data, stats) {
                    (ColumnData::F64(_, stats), Statistics::F64(new)) => {
                        stats.update_from(new);
                    }
                    (ColumnData::I64(_, stats), Statistics::I64(new)) => {
                        stats.update_from(new);
                    }
                    (ColumnData::U64(_, stats), Statistics::U64(new)) => {
                        stats.update_from(new);
                    }
                    (ColumnData::String(_, stats), Statistics::String(new)) => {
                        stats.update_from(new);
                    }
                    (ColumnData::Bool(_, stats), Statistics::Bool(new)) => {
                        stats.update_from(new);
                    }
                    (ColumnData::Tag(_, dict, stats), Statistics::String(new)) => {
                        stats.update_from(new);
                        // Tag distinct count is the dictionary size, plus one for
                        // NULL if any row in the column is null
                        stats.distinct_count = match stats.null_count {
                            0 => NonZeroU64::new(dict.values().len() as u64),
                            _ => NonZeroU64::new(dict.values().len() as u64 + 1),
                        }
                    }
                    _ => unreachable!("column: {}, statistics: {}", col.data, stats.type_name()),
                }
            }
        }
        self.batch.row_count = final_rows;
        self.success = true;
    }
}
/// Returns an iterator over the row offsets (relative to the start of the write)
/// that should receive a value: the set bit positions of `valid_mask` below
/// `to_insert`, or every offset in `0..to_insert` when no mask is provided
fn set_position_iterator(
    valid_mask: Option<&[u8]>,
    to_insert: usize,
) -> impl Iterator<Item = usize> + '_ {
    if let Some(mask) = valid_mask {
        let set_bits = iter_set_positions(mask).take_while(move |&position| position < to_insert);
        itertools::Either::Left(set_bits)
    } else {
        itertools::Either::Right(0..to_insert)
    }
}
/// Appends `to_insert` bits to `column`'s validity bitset — the bits of
/// `valid_mask` when provided, otherwise all set (no nulls)
fn append_valid_mask(column: &mut Column, valid_mask: Option<&[u8]>, to_insert: usize) {
    if let Some(mask) = valid_mask {
        column.valid.append_bits(to_insert, mask);
    } else {
        column.valid.append_set(to_insert);
    }
}
impl<'a> Drop for Writer<'a> {
    // Rolls back any uncommitted writes: if `commit` was never called, every
    // column is truncated back to the row count the Writer was created with.
    // Statistics were deferred to commit time, so they need no rollback.
    fn drop(&mut self) {
        if !self.success {
            let initial_rows = self.initial_rows;
            for col in &mut self.batch.columns {
                col.valid.truncate(initial_rows);
                match &mut col.data {
                    ColumnData::F64(col_data, _) => col_data.truncate(initial_rows),
                    ColumnData::I64(col_data, _) => col_data.truncate(initial_rows),
                    ColumnData::U64(col_data, _) => col_data.truncate(initial_rows),
                    ColumnData::String(col_data, _) => col_data.truncate(initial_rows),
                    ColumnData::Bool(col_data, _) => col_data.truncate(initial_rows),
                    ColumnData::Tag(col_data, dict, _) => {
                        col_data.truncate(initial_rows);
                        // Shrink the dictionary to drop entries interned only by the
                        // rolled-back rows. NOTE(review): this relies on dictionary ids
                        // being assigned in insertion order and on the semantics of
                        // StringDictionary::truncate(max) — confirm against arrow_util
                        match col_data.iter().max() {
                            Some(max) => dict.truncate(*max),
                            // No surviving rows reference the dictionary at all
                            None => dict.clear(),
                        }
                    }
                }
            }
        }
    }
}

View File

@ -0,0 +1,336 @@
use arrow_util::assert_batches_eq;
use data_types::partition_metadata::{StatValues, Statistics};
use mutable_batch::writer::Writer;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
use std::num::NonZeroU64;
/// Collects `(column name, statistics)` pairs from `batch`, sorted by column
/// name so test expectations have a deterministic order
fn get_stats(batch: &MutableBatch) -> Vec<(&str, Statistics)> {
    let mut collected = Vec::new();
    for (name, column) in batch.columns() {
        collected.push((name.as_str(), column.stats()));
    }
    collected.sort_unstable_by(|left, right| left.0.cmp(right.0));
    collected
}
#[test]
fn test_basic() {
    // End-to-end exercise of the Writer API: a committed write, a rolled-back
    // (dropped) write, type-mismatch errors, and a second committed write that
    // must merge statistics with the first.
    let mut batch = MutableBatch::new();

    // --- First write: 5 rows across every supported column type ---
    let mut writer = Writer::new(&mut batch, 5);
    writer
        .write_bool(
            "b1",
            None,
            vec![true, true, false, false, false].into_iter(),
        )
        .unwrap();
    writer
        .write_bool(
            "b2",
            Some(&[0b00011101]),
            vec![true, false, false, true].into_iter(),
        )
        .unwrap();
    writer
        .write_f64(
            "f64",
            Some(&[0b00011011]),
            vec![343.3, 443., 477., -24.].into_iter(),
        )
        .unwrap();
    writer
        .write_i64("i64", None, vec![234, 6, 2, 6, -3].into_iter())
        .unwrap();
    writer
        .write_i64("i64_2", Some(&[0b00000001]), vec![-8].into_iter())
        .unwrap();
    writer
        .write_u64("u64", Some(&[0b00001001]), vec![23, 5].into_iter())
        .unwrap();
    writer
        .write_time("time", vec![7, 5, 7, 3, 5].into_iter())
        .unwrap();
    writer
        .write_tag("tag1", None, vec!["v1", "v1", "v2", "v2", "v1"].into_iter())
        .unwrap();
    writer
        .write_tag(
            "tag2",
            Some(&[0b00001011]),
            vec!["v1", "v2", "v2"].into_iter(),
        )
        .unwrap();
    writer
        .write_tag_dict(
            "tag3",
            Some(&[0b00011011]),
            vec![1, 0, 0, 1].into_iter(),
            vec!["v1", "v2"].into_iter(),
        )
        .unwrap();
    writer.commit();

    let stats: Vec<_> = get_stats(&batch);

    // Expected contents and per-column statistics after the first commit
    let expected_data = &[
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
        "| b1 | b2 | f64 | i64 | i64_2 | tag1 | tag2 | tag3 | time | u64 |",
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
        "| true | true | 343.3 | 234 | -8 | v1 | v1 | v2 | 1970-01-01T00:00:00.000000007Z | 23 |",
        "| true | | 443 | 6 | | v1 | v2 | v1 | 1970-01-01T00:00:00.000000005Z | |",
        "| false | false | | 2 | | v2 | | | 1970-01-01T00:00:00.000000007Z | |",
        "| false | false | 477 | 6 | | v2 | v2 | v1 | 1970-01-01T00:00:00.000000003Z | 5 |",
        "| false | true | -24 | -3 | | v1 | | v2 | 1970-01-01T00:00:00.000000005Z | |",
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
    ];

    let expected_stats = vec![
        (
            "b1",
            Statistics::Bool(StatValues::new(Some(false), Some(true), 5, 0)),
        ),
        (
            "b2",
            Statistics::Bool(StatValues::new(Some(false), Some(true), 5, 1)),
        ),
        (
            "f64",
            Statistics::F64(StatValues::new(Some(-24.), Some(477.), 5, 1)),
        ),
        (
            "i64",
            Statistics::I64(StatValues::new(Some(-3), Some(234), 5, 0)),
        ),
        (
            "i64_2",
            Statistics::I64(StatValues::new(Some(-8), Some(-8), 5, 4)),
        ),
        (
            "tag1",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v2".to_string()),
                5,
                0,
                Some(NonZeroU64::new(2).unwrap()),
            )),
        ),
        (
            "tag2",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v2".to_string()),
                5,
                2,
                Some(NonZeroU64::new(3).unwrap()),
            )),
        ),
        (
            "tag3",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v2".to_string()),
                5,
                1,
                Some(NonZeroU64::new(3).unwrap()),
            )),
        ),
        (
            "time",
            Statistics::I64(StatValues::new(Some(3), Some(7), 5, 0)),
        ),
        (
            "u64",
            Statistics::U64(StatValues::new(Some(5), Some(23), 5, 3)),
        ),
    ];

    assert_batches_eq!(expected_data, &[batch.to_arrow(Selection::All).unwrap()]);
    assert_eq!(stats, expected_stats);

    // --- Second write is dropped without commit => must roll back fully ---
    let mut writer = Writer::new(&mut batch, 4);

    writer
        .write_time("time", vec![4, 6, 21, 7].into_iter())
        .unwrap();

    writer
        .write_tag("tag1", None, vec!["v6", "v7", "v8", "v4"].into_iter())
        .unwrap();

    std::mem::drop(writer);

    let stats: Vec<_> = get_stats(&batch);

    // Writer dropped, should not impact stats or data
    assert_batches_eq!(expected_data, &[batch.to_arrow(Selection::All).unwrap()]);
    assert_eq!(stats, expected_stats);

    // --- Type-mismatch and bad-key writes must error and leave no trace ---
    let err = Writer::new(&mut batch, 1)
        .write_tag("b1", None, vec!["err"].into_iter())
        .unwrap_err()
        .to_string();
    assert_eq!(err.as_str(), "Unable to insert iox::column_type::tag type into a column of iox::column_type::field::boolean");

    let err = Writer::new(&mut batch, 1)
        .write_i64("f64", None, vec![3].into_iter())
        .unwrap_err()
        .to_string();
    assert_eq!(err.as_str(), "Unable to insert iox::column_type::field::integer type into a column of iox::column_type::field::float");

    let err = Writer::new(&mut batch, 1)
        .write_string("tag3", None, vec!["sd"].into_iter())
        .unwrap_err()
        .to_string();
    assert_eq!(err.as_str(), "Unable to insert iox::column_type::field::string type into a column of iox::column_type::tag");

    let err = Writer::new(&mut batch, 1)
        .write_tag_dict("tag3", None, vec![1].into_iter(), vec!["v1"].into_iter())
        .unwrap_err()
        .to_string();
    assert_eq!(err.as_str(), "Key not found in dictionary: 1");

    let stats: Vec<_> = get_stats(&batch);

    // Writer not committed, should not impact stats or data
    assert_batches_eq!(expected_data, &[batch.to_arrow(Selection::All).unwrap()]);
    assert_eq!(stats, expected_stats);

    // --- Third write: 17 more rows, committed; unwritten columns pad with
    // nulls and statistics merge with those from the first commit ---
    let mut writer = Writer::new(&mut batch, 17);

    writer.write_time("time", (0..17).into_iter()).unwrap();

    writer
        .write_f64(
            "f64",
            Some(&[0b01000010, 0b00100100, 0b00000001]),
            vec![4., 945., -222., 4., 7.].into_iter(),
        )
        .unwrap();

    writer
        .write_tag("tag3", None, std::iter::repeat("v2"))
        .unwrap();

    writer
        .write_tag_dict(
            "tag2",
            Some(&[0b11011111, 0b11011101, 0b00000000]),
            vec![0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1].into_iter(),
            vec!["v4", "v1", "v7"].into_iter(), // Intentional extra key
        )
        .unwrap();

    writer.commit();

    let stats: Vec<_> = get_stats(&batch);

    let expected_data = &[
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
        "| b1 | b2 | f64 | i64 | i64_2 | tag1 | tag2 | tag3 | time | u64 |",
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
        "| true | true | 343.3 | 234 | -8 | v1 | v1 | v2 | 1970-01-01T00:00:00.000000007Z | 23 |",
        "| true | | 443 | 6 | | v1 | v2 | v1 | 1970-01-01T00:00:00.000000005Z | |",
        "| false | false | | 2 | | v2 | | | 1970-01-01T00:00:00.000000007Z | |",
        "| false | false | 477 | 6 | | v2 | v2 | v1 | 1970-01-01T00:00:00.000000003Z | 5 |",
        "| false | true | -24 | -3 | | v1 | | v2 | 1970-01-01T00:00:00.000000005Z | |",
        "| | | | | | | v4 | v2 | 1970-01-01T00:00:00Z | |",
        "| | | 4 | | | | v1 | v2 | 1970-01-01T00:00:00.000000001Z | |",
        "| | | | | | | v1 | v2 | 1970-01-01T00:00:00.000000002Z | |",
        "| | | | | | | v4 | v2 | 1970-01-01T00:00:00.000000003Z | |",
        "| | | | | | | v1 | v2 | 1970-01-01T00:00:00.000000004Z | |",
        "| | | | | | | | v2 | 1970-01-01T00:00:00.000000005Z | |",
        "| | | 945 | | | | v1 | v2 | 1970-01-01T00:00:00.000000006Z | |",
        "| | | | | | | v1 | v2 | 1970-01-01T00:00:00.000000007Z | |",
        "| | | | | | | v4 | v2 | 1970-01-01T00:00:00.000000008Z | |",
        "| | | | | | | | v2 | 1970-01-01T00:00:00.000000009Z | |",
        "| | | -222 | | | | v4 | v2 | 1970-01-01T00:00:00.000000010Z | |",
        "| | | | | | | v4 | v2 | 1970-01-01T00:00:00.000000011Z | |",
        "| | | | | | | v4 | v2 | 1970-01-01T00:00:00.000000012Z | |",
        "| | | 4 | | | | | v2 | 1970-01-01T00:00:00.000000013Z | |",
        "| | | | | | | v1 | v2 | 1970-01-01T00:00:00.000000014Z | |",
        "| | | | | | | v1 | v2 | 1970-01-01T00:00:00.000000015Z | |",
        "| | | 7 | | | | | v2 | 1970-01-01T00:00:00.000000016Z | |",
        "+-------+-------+-------+-----+-------+------+------+------+--------------------------------+-----+",
    ];

    let expected_stats = vec![
        (
            "b1",
            Statistics::Bool(StatValues::new(Some(false), Some(true), 22, 17)),
        ),
        (
            "b2",
            Statistics::Bool(StatValues::new(Some(false), Some(true), 22, 18)),
        ),
        (
            "f64",
            Statistics::F64(StatValues::new(Some(-222.), Some(945.), 22, 13)),
        ),
        (
            "i64",
            Statistics::I64(StatValues::new(Some(-3), Some(234), 22, 17)),
        ),
        (
            "i64_2",
            Statistics::I64(StatValues::new(Some(-8), Some(-8), 22, 21)),
        ),
        (
            "tag1",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v2".to_string()),
                22,
                17,
                Some(NonZeroU64::new(3).unwrap()),
            )),
        ),
        (
            "tag2",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v4".to_string()),
                22,
                6,
                Some(NonZeroU64::new(4).unwrap()),
            )),
        ),
        (
            "tag3",
            Statistics::String(StatValues::new_with_distinct(
                Some("v1".to_string()),
                Some("v2".to_string()),
                22,
                1,
                Some(NonZeroU64::new(3).unwrap()),
            )),
        ),
        (
            "time",
            Statistics::I64(StatValues::new(Some(0), Some(16), 22, 0)),
        ),
        (
            "u64",
            Statistics::U64(StatValues::new(Some(5), Some(23), 22, 20)),
        ),
    ];

    assert_batches_eq!(expected_data, &[batch.to_arrow(Selection::All).unwrap()]);
    assert_eq!(stats, expected_stats);
}

View File

@ -183,8 +183,8 @@ async fn sql_select_from_system_chunks() {
"+---------------+------------+-------------------+--------------+-----------+",
"| partition_key | table_name | storage | memory_bytes | row_count |",
"+---------------+------------+-------------------+--------------+-----------+",
"| 1970-01-01T00 | h2o | OpenMutableBuffer | 1639 | 3 |",
"| 1970-01-01T00 | o2 | OpenMutableBuffer | 1635 | 2 |",
"| 1970-01-01T00 | h2o | OpenMutableBuffer | 1671 | 3 |",
"| 1970-01-01T00 | o2 | OpenMutableBuffer | 1667 | 2 |",
"+---------------+------------+-------------------+--------------+-----------+",
];
run_sql_test_case(

View File

@ -1738,7 +1738,7 @@ mod tests {
assert_storage_gauge(registry, "catalog_loaded_rows", "object_store", 0);
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 700);
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 732);
// write into same chunk again.
time.inc(Duration::from_secs(1));
@ -1754,7 +1754,7 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=5 50").await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 764);
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 796);
// Still only one chunk open
assert_storage_gauge(registry, "catalog_loaded_chunks", "mutable_buffer", 1);
@ -2605,7 +2605,7 @@ mod tests {
id: ChunkId::new_test(0),
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action: None,
memory_bytes: 1006, // memory_size
memory_bytes: 1038, // memory_size
object_store_bytes: 0, // os_size
row_count: 1,
time_of_last_access: None,
@ -2864,7 +2864,7 @@ mod tests {
id: chunk_summaries[2].id,
storage: ChunkStorage::OpenMutableBuffer,
lifecycle_action,
memory_bytes: 1303,
memory_bytes: 1335,
object_store_bytes: 0, // no OS chunks
row_count: 1,
time_of_last_access: None,
@ -2885,7 +2885,7 @@ mod tests {
);
}
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1303);
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 1335);
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2550);
assert_eq!(db.catalog.metrics().memory().object_store(), 1529);
}

View File

@ -527,7 +527,7 @@ async fn test_chunk_get() {
id: ChunkId::new_test(0).into(),
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
memory_bytes: 1016,
memory_bytes: 1048,
object_store_bytes: 0,
row_count: 2,
time_of_last_access: None,
@ -541,7 +541,7 @@ async fn test_chunk_get() {
id: ChunkId::new_test(0).into(),
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action,
memory_bytes: 1018,
memory_bytes: 1050,
object_store_bytes: 0,
row_count: 1,
time_of_last_access: None,
@ -712,7 +712,7 @@ async fn test_list_partition_chunks() {
id: ChunkId::new_test(0).into(),
storage: ChunkStorage::OpenMutableBuffer.into(),
lifecycle_action: ChunkLifecycleAction::Unspecified.into(),
memory_bytes: 1016,
memory_bytes: 1048,
object_store_bytes: 0,
row_count: 2,
time_of_last_access: None,

View File

@ -482,7 +482,7 @@ async fn test_get_chunks() {
.and(predicate::str::contains(
r#""storage": "CHUNK_STORAGE_OPEN_MUTABLE_BUFFER","#,
))
.and(predicate::str::contains(r#""memoryBytes": "1016""#))
.and(predicate::str::contains(r#""memoryBytes": "1048""#))
// Check for a non empty timestamp such as
// "time_of_first_write": "2021-03-30T17:11:10.723866Z",
.and(predicate::str::contains(r#""timeOfFirstWrite": "20"#));