Merge branch 'main' into debuginfo2

pull/24376/head
Marko Mikulicic 2021-06-03 15:59:19 +02:00 committed by GitHub
commit 3e2b4bf7ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 527 additions and 123 deletions

View File

@ -979,44 +979,44 @@ mod test {
String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
vec![
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 108"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXED",log_data_type="f64"} 144"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 1152"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64"} 1032"#,
r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 750"#,
"# HELP read_buffer_column_raw_bytes The number of bytes used by all columns if they were uncompressed in the Read Buffer",
"# TYPE read_buffer_column_raw_bytes gauge",
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 144"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 144"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 81"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64",null="false"} 120"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64",null="true"} 24"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="false"} 324"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"# HELP read_buffer_column_total The number of columns within the Read Buffer",
"# TYPE read_buffer_column_total gauge",
r#"read_buffer_column_total{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXED",log_data_type="f64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXEDN",log_data_type="bool"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXEDN",log_data_type="f64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="RLE",log_data_type="string"} 3"#,
"# HELP read_buffer_column_values The number of values within columns in the Read Buffer",
"# TYPE read_buffer_column_values gauge",
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="f64",null="false"} 6"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="f64",null="true"} 3"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 108"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 1032"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXED",log_data_type="f64"} 144"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 1152"#,
r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 750"#,
"# HELP read_buffer_column_raw_bytes The number of bytes used by all columns if they were uncompressed in the Read Buffer",
"# TYPE read_buffer_column_raw_bytes gauge",
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 144"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="false"} 120"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="true"} 24"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 144"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 81"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="false"} 324"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"# HELP read_buffer_column_total The number of columns within the Read Buffer",
"# TYPE read_buffer_column_total gauge",
r#"read_buffer_column_total{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXED",log_data_type="f64"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXEDN",log_data_type="bool"} 3"#,
r#"read_buffer_column_total{db="mydb",encoding="RLE",log_data_type="string"} 3"#,
"# HELP read_buffer_column_values The number of values within columns in the Read Buffer",
"# TYPE read_buffer_column_values gauge",
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="false"} 6"#,
r#"read_buffer_column_values{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="true"} 3"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 9"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"",
]
.join("\n")
);
@ -1029,39 +1029,39 @@ mod test {
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 0"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 0"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXED",log_data_type="f64"} 0"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool"} 0"#,
r#"read_buffer_column_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64"} 0"#,
r#"read_buffer_column_bytes{db="mydb",encoding="RLE",log_data_type="string"} 0"#,
"# HELP read_buffer_column_raw_bytes The number of bytes used by all columns if they were uncompressed in the Read Buffer",
"# TYPE read_buffer_column_raw_bytes gauge",
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="FIXEDN",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="false"} 0"#,
r#"read_buffer_column_raw_bytes{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"# HELP read_buffer_column_total The number of columns within the Read Buffer",
"# TYPE read_buffer_column_total gauge",
r#"read_buffer_column_total{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64"} 0"#,
r#"read_buffer_column_total{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64"} 0"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXED",log_data_type="f64"} 0"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXEDN",log_data_type="bool"} 0"#,
r#"read_buffer_column_total{db="mydb",encoding="FIXEDN",log_data_type="f64"} 0"#,
r#"read_buffer_column_total{db="mydb",encoding="RLE",log_data_type="string"} 0"#,
"# HELP read_buffer_column_values The number of values within columns in the Read Buffer",
"# TYPE read_buffer_column_values gauge",
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="BT_U32-FIXED",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FBT_U8-FIXEDN",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXED",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 0"#,
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
"",

View File

@ -74,12 +74,46 @@ impl Display for ByteTrimmer {
}
}
/// An encoding that forcefully converts logical `f64` values into other integer
/// types. It is the caller's responsibility to ensure that conversion can take
/// place without loss of precision.
#[derive(Debug)]
pub struct FloatByteTrimmer {}

// Implements `Transcoder<$type, f64>` for `FloatByteTrimmer`, mapping logical
// `f64` values to/from the smaller physical integer type `$type`.
macro_rules! make_float_trimmer {
    ($type:ty) => {
        #[allow(clippy::float_cmp)]
        impl Transcoder<$type, f64> for FloatByteTrimmer {
            fn encode(&self, v: f64) -> $type {
                // shouldn't be too expensive as only called during column
                // creation and when passing in single literals for
                // predicate evaluation.
                //
                // Include the offending value in the panic message so that a
                // lossy conversion is easy to diagnose.
                assert!(
                    v == (v as $type) as f64,
                    "cannot losslessly convert {:?} to {}",
                    v,
                    stringify!($type)
                );
                v as $type
            }

            fn decode(&self, v: $type) -> f64 {
                v.into()
            }
        }
    };
}

make_float_trimmer!(u8);
make_float_trimmer!(i8);
make_float_trimmer!(u16);
make_float_trimmer!(i16);
make_float_trimmer!(u32);
make_float_trimmer!(i32);

impl Display for FloatByteTrimmer {
    // "FBT" is the prefix used when composing encoding names, e.g. "FBT_U8-RLE".
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "FBT")
    }
}
//
// NOTE(edd): `FloatByteTrimmer` (coercing `f64` values into signed and
// unsigned integers) is implemented above.
//
// TODO(edd): soon to be adding the following
//
// * FrameOfReferenceTranscoder: a transcoder that will apply a transformation
//   to logical values and then optionally apply a byte trimming to the
//   result.

View File

@ -1,10 +1,17 @@
use arrow::array::Array;
use arrow::array::Float64Array;
use arrow::array::PrimitiveArray;
use arrow::datatypes::Float64Type;
use arrow::datatypes::Int16Type;
use arrow::datatypes::Int32Type;
use arrow::datatypes::Int8Type;
use arrow::datatypes::UInt16Type;
use arrow::datatypes::UInt32Type;
use arrow::datatypes::UInt8Type;
use std::iter::FromIterator;
use std::mem::size_of;
use super::encoding::scalar::rle;
use super::encoding::scalar::transcoders::NoOpTranscoder;
use super::encoding::scalar::ScalarEncoding;
use super::encoding::scalar::{rle, transcoders::*, ScalarEncoding};
use super::encoding::{
scalar::Fixed,
scalar::{rle::RLE, FixedNull},
@ -19,14 +26,14 @@ use crate::column::{RowIDs, Scalar, Value, Values};
/// Note: an enum to make supporting a logical `f32` type in the future a bit
/// simpler.
pub enum FloatEncoding {
F64(Box<dyn ScalarEncoding<f64>>),
F64(Box<dyn ScalarEncoding<f64>>, String),
}
impl FloatEncoding {
/// The total size in bytes of to store columnar data in memory.
pub fn size(&self) -> usize {
match self {
Self::F64(enc) => enc.size(),
Self::F64(enc, _) => enc.size(),
}
}
@ -35,14 +42,14 @@ impl FloatEncoding {
/// will effectively size each NULL value as 8b if `true`.
pub fn size_raw(&self, include_nulls: bool) -> usize {
match self {
Self::F64(enc) => enc.size_raw(include_nulls),
Self::F64(enc, _) => enc.size_raw(include_nulls),
}
}
/// The total number of rows in the column.
pub fn num_rows(&self) -> u32 {
match self {
Self::F64(enc) => enc.num_rows(),
Self::F64(enc, _) => enc.num_rows(),
}
}
@ -67,14 +74,14 @@ impl FloatEncoding {
/// The total number of NULL values in the column.
pub fn null_count(&self) -> u32 {
match self {
Self::F64(enc) => enc.null_count(),
Self::F64(enc, _) => enc.null_count(),
}
}
/// Determines if the column contains a non-null value.
pub fn has_any_non_null_value(&self) -> bool {
match self {
Self::F64(enc) => enc.has_any_non_null_value(),
Self::F64(enc, _) => enc.has_any_non_null_value(),
}
}
@ -82,14 +89,14 @@ impl FloatEncoding {
/// provided ordinal offsets.
pub fn has_non_null_value(&self, row_ids: &[u32]) -> bool {
match self {
Self::F64(enc) => enc.has_non_null_value(row_ids),
Self::F64(enc, _) => enc.has_non_null_value(row_ids),
}
}
/// Returns the logical value found at the provided ordinal offset.
pub fn value(&self, row_id: u32) -> Value<'_> {
match self {
Self::F64(enc) => match enc.value(row_id) {
Self::F64(enc, _) => match enc.value(row_id) {
Some(v) => Value::Scalar(Scalar::F64(v)),
None => Value::Null,
},
@ -99,7 +106,7 @@ impl FloatEncoding {
/// Returns the logical values found at the provided ordinal offsets.
pub fn values(&self, row_ids: &[u32]) -> Values<'_> {
match self {
Self::F64(enc) => match enc.values(row_ids) {
Self::F64(enc, _) => match enc.values(row_ids) {
either::Either::Left(values) => Values::F64(values),
either::Either::Right(values) => Values::F64N(values),
},
@ -109,7 +116,7 @@ impl FloatEncoding {
/// Returns all logical values in the column.
pub fn all_values(&self) -> Values<'_> {
match self {
Self::F64(enc) => match enc.all_values() {
Self::F64(enc, _) => match enc.all_values() {
either::Either::Left(values) => Values::F64(values),
either::Either::Right(values) => Values::F64N(values),
},
@ -123,7 +130,7 @@ impl FloatEncoding {
/// `row_ids_filter` will panic if this invariant is broken.
pub fn row_ids_filter(&self, op: &cmp::Operator, value: &Scalar, dst: RowIDs) -> RowIDs {
match self {
Self::F64(enc) => enc.row_ids_filter(value.as_f64(), op, dst),
Self::F64(enc, _) => enc.row_ids_filter(value.as_f64(), op, dst),
}
}
@ -139,7 +146,7 @@ impl FloatEncoding {
dst: RowIDs,
) -> RowIDs {
match self {
Self::F64(enc) => {
Self::F64(enc, _) => {
let left = (low.1.as_f64(), low.0);
let right = (high.1.as_f64(), high.0);
enc.row_ids_filter_range(left, right, dst)
@ -149,7 +156,7 @@ impl FloatEncoding {
pub fn min(&self, row_ids: &[u32]) -> Value<'_> {
match self {
Self::F64(enc) => match enc.min(row_ids) {
Self::F64(enc, _) => match enc.min(row_ids) {
Some(min) => Value::Scalar(Scalar::F64(min)),
None => Value::Null,
},
@ -158,7 +165,7 @@ impl FloatEncoding {
pub fn max(&self, row_ids: &[u32]) -> Value<'_> {
match self {
Self::F64(enc) => match enc.max(row_ids) {
Self::F64(enc, _) => match enc.max(row_ids) {
Some(max) => Value::Scalar(Scalar::F64(max)),
None => Value::Null,
},
@ -167,7 +174,7 @@ impl FloatEncoding {
pub fn sum(&self, row_ids: &[u32]) -> Scalar {
match self {
Self::F64(enc) => match enc.sum(row_ids) {
Self::F64(enc, _) => match enc.sum(row_ids) {
Some(sum) => Scalar::F64(sum),
None => Scalar::Null,
},
@ -176,21 +183,21 @@ impl FloatEncoding {
pub fn count(&self, row_ids: &[u32]) -> u32 {
match self {
Self::F64(enc) => enc.count(row_ids),
Self::F64(enc, _) => enc.count(row_ids),
}
}
/// The name of this encoding.
pub fn name(&self) -> &'static str {
pub fn name(&self) -> String {
match self {
Self::F64(enc) => enc.name(),
Self::F64(_, name) => name.clone(),
}
}
/// The logical datatype of this encoding.
pub fn logical_datatype(&self) -> &'static str {
match self {
Self::F64(_) => "f64",
Self::F64(_, _) => "f64",
}
}
}
@ -198,7 +205,7 @@ impl FloatEncoding {
impl std::fmt::Display for FloatEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::F64(enc) => write!(f, "[Float]: {}", enc),
Self::F64(enc, _) => write!(f, "[Float]: {}", enc),
}
}
}
@ -208,58 +215,249 @@ impl std::fmt::Display for FloatEncoding {
/// be reduced by 10%
pub const MIN_RLE_SIZE_REDUCTION: f64 = 0.1; // 10%
#[allow(clippy::float_cmp)]
fn is_natural_number(v: f64) -> bool {
    // If `v` survives a round trip through `i64` without loss of precision
    // then it can _potentially_ be stored safely in 32 bits or fewer.
    // Note: `as` saturates, so values outside the `i64` range compare unequal
    // and return false; NaN also compares unequal to everything.
    let round_tripped = (v as i64) as f64;
    round_tripped == v
}
/// Converts a slice of `f64` values into a `FloatEncoding`.
///
/// There are two possible encodings for &[f64]:
/// * "None": effectively store the slice in a vector;
/// * "RLE": for slices that have a sufficiently low cardinality they may
/// benefit from being run-length encoded.
/// There are four possible encodings for &[f64]:
///
/// * "FIXED": Effectively store the slice in a vector as f64.
/// * "BT_X-FIXED": Store floats as integers and trim them to a smaller
/// physical size (X).
/// * "RLE": If the data has sufficiently low cardinality they may
/// benefit from being run-length encoded.
/// * "BT_X-RLE": Convert to byte trimmed integers and then also RLE.
///
/// The encoding is chosen based on the heuristics in the `From` implementation
impl From<&[f64]> for FloatEncoding {
fn from(arr: &[f64]) -> Self {
// The number of rows we would reduce the column by if we encoded it
// as RLE.
let base_size = arr.len() * size_of::<f64>();
let rle_size = rle::estimate_rle_size(arr.iter().map(Some)); // size of a run length
if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
// Are:
// * all the values natural numbers?
// * all the values able to be represented in 32-bits or less?
//
// Yes to the above means we can convert the data to integers and then
// trim them, potentially applying RLE afterwards.
let mut min = arr[0];
let mut max = arr[0];
let all_z = arr.iter().all(|&v| {
min = min.min(v);
max = max.max(v);
is_natural_number(v)
});
// check they are all natural numbers that can be stored in 32 bits or
// less.
if all_z
&& ((min >= 0.0 && max <= u32::MAX as f64)
|| (min >= i32::MIN as f64 && max <= i32::MAX as f64))
{
return from_slice_with_byte_trimming(arr, (min, max));
}
// Store as f64, potentially with RLE.
// Check if we gain space savings by encoding as RLE.
if should_rle_from_iter(arr.len(), arr.iter().map(Some)) {
let enc = Box::new(RLE::new_from_iter(
arr.iter().cloned(),
NoOpTranscoder {}, // No transcoding of values (store as physical type f64)
));
return Self::F64(enc);
let name = enc.name();
return Self::F64(enc, name.to_string());
}
// Don't apply a compression encoding to the column
let enc = Box::new(Fixed::<f64, f64, _>::new(arr.to_vec(), NoOpTranscoder {}));
Self::F64(enc)
let name = enc.name();
Self::F64(enc, name.to_owned())
}
}
/// Converts an Arrow Float array into a `FloatEncoding`.
// Applies a heuristic to decide whether the input data should be encoded using
// run-length encoding: RLE is worthwhile only when the estimated run-length
// representation saves at least `MIN_RLE_SIZE_REDUCTION` of the raw size.
fn should_rle_from_iter<T: PartialOrd>(len: usize, iter: impl Iterator<Item = Option<T>>) -> bool {
    let raw_size = len * size_of::<T>();
    let rle_size = rle::estimate_rle_size(iter);
    // NOTE: an empty input yields 0/0 == NaN, which compares false — no RLE.
    let saving = (raw_size as f64 - rle_size as f64) / raw_size as f64;
    saving >= MIN_RLE_SIZE_REDUCTION
}
// Convert float data to byte-trimmed integers and potentially RLE. It is the
// caller's responsibility to ensure all data are natural numbers that can be
// round tripped from float to int and back without loss of precision.
fn from_slice_with_byte_trimming(arr: &[f64], range: (f64, f64)) -> FloatEncoding {
// determine min and max values.
let min = range.0;
let max = range.1;
// If true then use RLE after byte trimming.
let rle = should_rle_from_iter(arr.len(), arr.iter().map(Some));
// This match is carefully ordered. It prioritises smaller physical
// datatypes that can correctly represent the provided logical data.
let transcoder = FloatByteTrimmer {};
let transcoder_name = format!("{}", &transcoder);
let (enc, name) = match (min, max) {
// encode as u8 values
(min, max) if min >= 0.0 && max <= u8::MAX as f64 => {
let arr = arr
.iter()
.map::<u8, _>(|v| transcoder.encode(*v))
.collect::<Vec<_>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U8-{}", transcoder_name, name))
}
// encode as i8 values
(min, max) if min >= i8::MIN as f64 && max <= i8::MAX as f64 => {
let arr = arr
.iter()
.map(|v| transcoder.encode(*v))
.collect::<Vec<i8>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I8-{}", transcoder_name, name))
}
// encode as u16 values
(min, max) if min >= 0.0 && max <= u16::MAX as f64 => {
let arr = arr
.iter()
.map::<u16, _>(|v| transcoder.encode(*v))
.collect::<Vec<u16>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U16-{}", transcoder_name, name))
}
// encode as i16 values
(min, max) if min >= i16::MIN as f64 && max <= i16::MAX as f64 => {
let arr = arr
.iter()
.map(|v| transcoder.encode(*v))
.collect::<Vec<i16>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I16-{}", transcoder_name, name))
}
// encode as u32 values
(min, max) if min >= 0.0 && max <= u32::MAX as f64 => {
let arr = arr
.iter()
.map(|v| transcoder.encode(*v))
.collect::<Vec<u32>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U32-{}", transcoder_name, name))
}
// encode as i32 values
(min, max) if min >= i32::MIN as f64 && max <= i32::MAX as f64 => {
let arr = arr
.iter()
.map(|v| transcoder.encode(*v))
.collect::<Vec<i32>>();
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter(arr.into_iter(), transcoder))
} else {
Box::new(Fixed::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I32-{}", transcoder_name, name))
}
(_, _) => unreachable!(
"float column not byte trimmable: range [{:?}, {:?}]",
min, max
),
};
FloatEncoding::F64(enc, name)
}
/// Converts an Arrow `Float64Array` into a `FloatEncoding`.
///
/// There are two possible encodings for an Arrow array:
/// * "None": effectively keep the data in its Arrow array;
/// * "RLE": for arrays that have a sufficiently large number of NULL values
/// they may benefit from being run-length encoded.
/// There are four possible encodings for `Float64Array`:
///
/// * "FIXEDN": Effectively store in the `Float64Array`.
/// * "BT_X-FIXEDN": Store floats as integers and trim them to a smaller
/// physical size (X). Backed by Arrow array.
/// * "RLE": If the data has sufficiently low cardinality they may
/// benefit from being run-length encoded.
/// * "BT_X-RLE": Convert to byte trimmed integers and then also RLE.
///
/// The encoding is chosen based on the heuristics in the `From` implementation
impl From<arrow::array::Float64Array> for FloatEncoding {
fn from(arr: arrow::array::Float64Array) -> Self {
impl From<Float64Array> for FloatEncoding {
fn from(arr: Float64Array) -> Self {
if arr.null_count() == 0 {
return Self::from(arr.values());
}
// Are:
// * all the values natural numbers?
// * all the values able to be represented in 32-bits or less?
//
// Yes to the above means we can convert the data to integers and then
// trim them, potentially applying RLE afterwards.
let min = arrow::compute::kernels::aggregate::min(&arr);
let max = arrow::compute::kernels::aggregate::max(&arr);
let all_z = arr.iter().all(|v| match v {
Some(v) => is_natural_number(v),
None => true,
});
// Column is all NULL - encode as RLE u8
if min.is_none() {
let arr: PrimitiveArray<UInt8Type> =
PrimitiveArray::from_iter(arr.iter().map::<Option<u8>, _>(|_| None));
let enc: Box<dyn ScalarEncoding<f64>> =
Box::new(RLE::new_from_iter_opt(arr.iter(), FloatByteTrimmer {}));
let name = enc.name();
return Self::F64(enc, name.to_owned());
}
let min = min.unwrap();
let max = max.unwrap();
// check they are all natural numbers that can be stored in 32 bits or
// less.
if all_z
&& ((min >= 0.0 && max <= u32::MAX as f64)
|| (min >= i32::MIN as f64 && max <= i32::MAX as f64))
{
return from_array_with_byte_trimming(arr, (min, max));
}
// Store as f64, potentially with RLE.
// The number of rows we would reduce the column by if we encoded it
// as RLE.
let base_size = arr.len() * size_of::<f64>();
let rle_size = rle::estimate_rle_size(arr.iter()); // size of a run length
if (base_size as f64 - rle_size as f64) / base_size as f64 >= MIN_RLE_SIZE_REDUCTION {
if should_rle_from_iter(arr.len(), arr.iter()) {
let enc = Box::new(RLE::new_from_iter_opt(
arr.iter(),
NoOpTranscoder {}, // No transcoding of values (store as physical type f64)
));
return Self::F64(enc);
let name = enc.name();
return Self::F64(enc, name.to_owned());
}
// Just store as nullable vector.
@ -267,46 +465,218 @@ impl From<arrow::array::Float64Array> for FloatEncoding {
arr,
NoOpTranscoder {},
));
Self::F64(enc)
let name = enc.name();
Self::F64(enc, name.to_owned())
}
}
// Convert float data to byte-trimmed integers and potentially RLE. It is the
// caller's responsibility to ensure all data are natural numbers that can be
// round tripped from float to int and back without loss of precision.
fn from_array_with_byte_trimming(arr: Float64Array, range: (f64, f64)) -> FloatEncoding {
// determine min and max values.
let min = range.0;
let max = range.1;
// If true then use RLE after byte trimming.
let rle = should_rle_from_iter(arr.len(), arr.iter());
// This match is carefully ordered. It prioritises smaller physical
// datatypes that can correctly represent the provided logical data.
let transcoder = FloatByteTrimmer {};
let transcoder_name = format!("{}", &transcoder);
let (enc, name) = match (min, max) {
// encode as u8 values
(min, max) if min >= 0.0 && max <= u8::MAX as f64 => {
let arr: PrimitiveArray<UInt8Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U8-{}", transcoder_name, name))
}
// encode as i8 values
(min, max) if min >= i8::MIN as f64 && max <= i8::MAX as f64 => {
let arr: PrimitiveArray<Int8Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I8-{}", transcoder_name, name))
}
// encode as u16 values
(min, max) if min >= 0.0 && max <= u16::MAX as f64 => {
let arr: PrimitiveArray<UInt16Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U16-{}", transcoder_name, name))
}
// encode as i16 values
(min, max) if min >= i16::MIN as f64 && max <= i16::MAX as f64 => {
let arr: PrimitiveArray<Int16Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I16-{}", transcoder_name, name))
}
// encode as u32 values
(min, max) if min >= 0.0 && max <= u32::MAX as f64 => {
let arr: PrimitiveArray<UInt32Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_U32-{}", transcoder_name, name))
}
// encode as i32 values
(min, max) if min >= i32::MIN as f64 && max <= i32::MAX as f64 => {
let arr: PrimitiveArray<Int32Type> =
PrimitiveArray::from_iter(arr.into_iter().map(|v| v.map(|v| transcoder.encode(v))));
let enc: Box<dyn ScalarEncoding<f64>> = if rle {
Box::new(RLE::new_from_iter_opt(arr.iter(), transcoder))
} else {
Box::new(FixedNull::new(arr, transcoder))
};
let name = enc.name();
(enc, format!("{}_I32-{}", transcoder_name, name))
}
(_, _) => unreachable!(
"float column not byte trimmable: range [{:?}, {:?}]",
min, max
),
};
FloatEncoding::F64(enc, name)
}
#[cfg(test)]
mod test {
use std::iter;
use arrow::array::{Float64Array, PrimitiveArray};
use super::*;
use crate::column::encoding::scalar::{fixed, fixed_null, rle};
use cmp::Operator;
#[test]
// Tests that input data with natural numbers gets byte trimmed correctly.
fn from_slice_f64() {
    // Each input slice is paired with the encoding name it should select.
    let cases = vec![
        (vec![0.0, 2.0, 245.0, 3.0], "FBT_U8-FIXED"),              // u8 fixed array
        (vec![0.0, -120.0, 127.0, 3.0], "FBT_I8-FIXED"),           // i8 fixed array
        (vec![399.0, 2.0, 2452.0, 3.0], "FBT_U16-FIXED"),          // u16 fixed array
        (vec![-399.0, 2.0, 2452.0, 3.0], "FBT_I16-FIXED"),         // i16 fixed array
        (vec![u32::MAX as f64, 2.0, 245.0, 3.0], "FBT_U32-FIXED"), // u32 fixed array
        (vec![u32::MAX as f64, 0.0], "FBT_U32-FIXED"),             // u32 fixed array
        (vec![i32::MIN as f64, i32::MAX as f64], "FBT_I32-FIXED"), // i32 fixed array
        (vec![i32::MIN as f64, 2.0, 245.0, 3.0], "FBT_I32-FIXED"), // i32 fixed array
        (vec![u32::MAX as f64 + 1.0, 2.0, 245.0, 3.0], "FIXED"),   // f64 fixed array
        (vec![u32::MAX as f64, -1.0], "FIXED"),       // can't byte trim due to range
        (vec![i32::MAX as f64 + 1.0, -1.0], "FIXED"), // can't byte trim due to range
        (vec![i32::MIN as f64 - 1.0, -1.0], "FIXED"), // can't byte trim due to range
    ];

    for (input, expected) in cases.into_iter() {
        let enc = FloatEncoding::from(input.as_slice());
        assert_eq!(enc.name(), expected, "failed: {:?}", enc);
    }

    // Low-cardinality natural numbers should be byte trimmed AND RLE'd.
    let input = iter::repeat(1_f64)
        .take(1000)
        .chain(iter::repeat(2_f64).take(1000)) // 1,1,1,1,1,2,2,2,2,2,2....
        .collect::<Vec<f64>>();
    let enc = FloatEncoding::from(input.as_slice());
    assert_eq!(enc.name(), "FBT_U8-RLE", "failed: {:?}", enc);

    // Low-cardinality values outside the trimmable range still get RLE only.
    let input = iter::repeat(f64::MAX)
        .take(1000)
        .chain(iter::repeat(f64::MIN).take(1000))
        .collect::<Vec<f64>>();
    let enc = FloatEncoding::from(input.as_slice());
    assert_eq!(enc.name(), "RLE", "failed: {:?}", enc);
}
#[test]
// Exercises `FloatEncoding::from(Float64Array)`: checks which encoding
// (fixed, fixed-null, byte-trimmed variants, RLE) is chosen for various
// Arrow arrays with and without NULLs.
//
// NOTE(review): this body appears to contain interleaved pre/post-merge
// lines (several consecutive asserts expect different names for the same
// `enc`, and two `input`/`arr` bindings are shadowed unused). Confirm
// against the merged tree before relying on this rendering.
fn from_arrow_array() {
// Rows not reduced
let input: Vec<Option<f64>> = vec![Some(33.2), Some(1.2), Some(2.2), None, Some(3.2)];
let arr = Float64Array::from(input);
// NOTE(review): `arr` above is immediately shadowed below and never used —
// looks like a leftover line from the pre-merge version of this test.
// Verify that implementation falls back to "fixed" encodings when there
// are no NULL values present.
let arr = Float64Array::from(vec![0.0, 2.0, 245.0, 3.0]);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), fixed_null::ENCODING_NAME);
// NOTE(review): the assert above and the one below expect different names
// for the same `enc`; the `fixed_null` expectation looks stale — confirm.
assert_eq!(enc.name(), "FBT_U8-FIXED", "failed: {:?}", enc);
// Rows not reduced and no nulls so can go in `Fixed64`.
let input: Vec<Option<f64>> = vec![Some(33.2), Some(1.2), Some(2.2), Some(3.2)];
let arr = Float64Array::from(input);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), fixed::ENCODING_NAME);
// Verify that byte trimming works on Arrow arrays.
// Each case pairs a nullable input with the expected encoding name.
let cases = vec![
(vec![Some(0.0), Some(2.0), None], "FBT_U8-FIXEDN"), // u8 fixed array
(vec![Some(0.0), Some(-120.0), None], "FBT_I8-FIXEDN"), // i8 fixed array
(vec![Some(399.0), None, Some(2452.0)], "FBT_U16-FIXEDN"), // u16 fixed array
(vec![Some(-399.0), Some(2452.0), None], "FBT_I16-FIXEDN"), // i16 fixed array
(vec![Some(u32::MAX as f64), None], "FBT_U32-FIXEDN"), // u32 fixed array
// i32 fixed array
(
vec![Some(i32::MIN as f64), None, Some(i32::MAX as f64)],
"FBT_I32-FIXEDN",
),
// i32 fixed array
(
vec![Some(i32::MIN as f64), Some(2.0), None],
"FBT_I32-FIXEDN",
),
(vec![Some(u32::MAX as f64 + 1.0), Some(2.0), None], "FIXEDN"), // f64 fixed array
// can't byte trim due to range
(
vec![Some(u32::MAX as f64 - 1.0), Some(-1.0), None],
"FIXEDN",
),
// can't byte trim due to range
(
vec![Some(i32::MAX as f64 + 1.0), None, Some(-1.0)],
"FIXEDN",
),
// can't byte trim due to range
(
vec![Some(i32::MIN as f64 - 1.0), None, Some(-1.0)],
"FIXEDN",
),
];
// NOTE(review): this RLE check sits between building `cases` and the loop
// that consumes them; it is duplicated further below and looks like a
// remnant of the pre-merge ordering — confirm.
// Goldilocks - encode as RLE
let input: Vec<Option<f64>> = vec![Some(33.2); 10];
let arr = Float64Array::from(input);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), rle::ENCODING_NAME);
for (case, name) in cases.into_iter() {
let arr = Float64Array::from(case);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), name, "failed: {:?}", enc);
}
// Goldilocks - encode as RLE
// NOTE(review): this `mut input` is built, extended, then shadowed below
// without ever being encoded — presumably a leftover from the pre-merge
// version (would also trigger an unused-result warning); confirm.
let mut input: Vec<Option<f64>> = vec![Some(33.2); 10];
input.extend(iter::repeat(None).take(10));
// Verify RLE conversion
let input = vec![1_f64; 1000]
.into_iter()
.chain(vec![2_f64; 1000]) // 1,1,1,1,1,2,2,2,2,2,2....
.collect::<Vec<f64>>();
let arr = Float64Array::from(input);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), rle::ENCODING_NAME);
// NOTE(review): as above, two asserts on the same `enc` expect different
// names; the bare `rle::ENCODING_NAME` expectation looks stale — confirm.
assert_eq!(enc.name(), "FBT_U8-RLE", "failed: {:?}", enc);
// Repeated values outside the byte-trimmable range still get plain RLE.
let input = vec![f64::MAX; 1000]
.into_iter()
.chain(vec![f64::MIN; 1000])
.collect::<Vec<f64>>();
let arr = Float64Array::from(input);
let enc = FloatEncoding::from(arr);
assert_eq!(enc.name(), "RLE", "failed: {:?}", enc);
}
#[test]

View File

@ -1473,7 +1473,7 @@ mod tests {
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0, &Default::default())
.await
@ -1493,7 +1493,7 @@ mod tests {
.unwrap();
let expected_parquet_size = 759;
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
// now also in OS
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
@ -1663,7 +1663,7 @@ mod tests {
.unwrap();
// verify chunk size updated (chunk moved from moved to writing to written)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
// drop, the chunk from the read buffer
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
@ -1673,7 +1673,7 @@ mod tests {
);
// verify size is reported until chunk dropped
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1616).unwrap();
std::mem::drop(rb_chunk);
// verify chunk size updated (chunk dropped from moved state)
@ -1746,7 +1746,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(3231.0)
.sample_sum_eq(3189.0)
.unwrap();
let rb = collect_read_filter(&rb_chunk).await;
@ -1848,7 +1848,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2389.0)
.sample_sum_eq(2375.0)
.unwrap();
// it should be the same chunk!
@ -1956,7 +1956,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(2389.0)
.sample_sum_eq(2375.0)
.unwrap();
// Unload RB chunk but keep it in OS
@ -2356,7 +2356,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ReadBufferAndObjectStore,
2380, // size of RB and OS chunks
2373, // size of RB and OS chunks
1,
),
ChunkSummary::new_without_timestamps(
@ -2407,7 +2407,7 @@ mod tests {
.memory()
.read_buffer()
.get_total(),
1621
1614
);
assert_eq!(
db.catalog.state().metrics().memory().parquet().get_total(),