Merge pull request #1468 from influxdata/er/feat/rb_metrics

feat: add in-depth column encoding metrics
kodiakhq[bot] 2021-05-11 12:45:08 +00:00 committed by GitHub
commit be74eab3fd
17 changed files with 501 additions and 52 deletions
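In short: every Read Buffer `Chunk` now takes a `metrics::MetricRegistry` at construction and registers three gauges (`read_buffer_column_total`, `read_buffer_column_values`, `read_buffer_column_bytes`) tracking column counts, value counts split by nullness, and byte sizes, labelled by encoding and logical data type. A minimal sketch of the new surface, reusing the pieces from the chunk tests in this diff (`gen_recordbatch` is the test helper that produces a sample `RecordBatch`):

use std::sync::Arc;

let reg = metrics::TestMetricRegistry::new(Arc::new(metrics::MetricRegistry::new()));
let chunk = Chunk::new(22, &reg.registry());
chunk.upsert_table("a_table", gen_recordbatch());

// The three read_buffer_column_* gauges are now populated; dropping the chunk
// decrements them back to zero (see the add_remove_tables test below).
println!("{}", String::from_utf8(reg.registry().metrics_as_text()).unwrap());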

Cargo.lock generated
View File

@ -2963,6 +2963,7 @@ dependencies = [
"hashbrown 0.11.2",
"internal_types",
"itertools 0.9.0",
"metrics",
"observability_deps",
"packers",
"parking_lot",

View File

@ -19,6 +19,7 @@ either = "1.6.1"
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
itertools = "0.9.0"
metrics = { path = "../metrics" }
observability_deps = { path = "../observability_deps" }
packers = { path = "../packers" }
parking_lot = "0.11"

View File

@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000;
fn table_names(c: &mut Criterion) {
let rb = generate_row_group(500_000);
let chunk = Chunk::new(0);
let chunk = Chunk::new(0, &metrics::MetricRegistry::new());
chunk.upsert_table("table_a", rb);
// no predicate - return all the tables

View File

@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
};
use metrics::{KeyValue, MetricRegistry};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
@ -12,11 +13,11 @@ use internal_types::{schema::builder::Error as SchemaError, schema::Schema, sele
use observability_deps::tracing::info;
use tracker::{MemRegistry, MemTracker};
use crate::row_group::RowGroup;
use crate::row_group::{ColumnName, Predicate};
use crate::schema::{AggregateType, ResultSchema};
use crate::table;
use crate::table::Table;
use crate::{column::Statistics, row_group::RowGroup};
type TableName = String;
@ -49,6 +50,9 @@ pub struct Chunk {
// The unique identifier for this chunk.
id: u32,
// All metrics for the chunk.
metrics: ColumnMetrics,
// A chunk's data is held in a collection of mutable tables and
// mutable meta data (`TableData`).
//
@ -115,22 +119,28 @@ impl TableData {
impl Chunk {
/// Initialises a new `Chunk` with the associated chunk ID.
pub fn new(id: u32) -> Self {
pub fn new(id: u32, metrics_registry: &MetricRegistry) -> Self {
Self {
id,
chunk_data: RwLock::new(TableData::default()),
metrics: ColumnMetrics::new(metrics_registry),
}
}
/// Initialises a new `Chunk` with the associated chunk ID. The returned
/// `Chunk` will be tracked according to the provided memory tracker
/// registry, and internal metrics will be registered on the provided metrics
/// registry.
pub fn new_with_memory_tracker(id: u32, registry: &MemRegistry) -> Self {
let chunk = Self::new(id);
pub fn new_with_registries(
id: u32,
mem_registry: &MemRegistry,
metrics_registry: &MetricRegistry,
) -> Self {
let chunk = Self::new(id, metrics_registry);
{
let mut chunk_data = chunk.chunk_data.write();
chunk_data.tracker = registry.register();
chunk_data.tracker = mem_registry.register();
let size = Self::base_size() + chunk_data.size();
chunk_data.tracker.set_bytes(size);
}
@ -150,6 +160,7 @@ impl Chunk {
data: vec![(table.name().to_owned(), table)].into_iter().collect(),
tracker: MemRegistry::new().register(),
}),
metrics: ColumnMetrics::new(&metrics::MetricRegistry::new()),
}
}
@ -277,6 +288,9 @@ impl Chunk {
chunk_data.rows += row_group.rows() as u64;
chunk_data.row_groups += 1;
// track new row group statistics to update column-based metrics.
let storage_statistics = row_group.column_storage_statistics();
// create a new table if one doesn't exist, or add the table data to
// the existing table.
match chunk_data.data.entry(table_name.clone()) {
@ -293,6 +307,10 @@ impl Chunk {
// Get and set new size of chunk on memory tracker
let size = Self::base_size() + chunk_data.size();
chunk_data.tracker.set_bytes(size);
// update column metrics associated with column storage
std::mem::drop(chunk_data); // drop write lock
self.update_column_storage_statistics(&storage_statistics, false);
}
/// Removes the table specified by `name` along with all of its contained
@ -589,6 +607,51 @@ impl Chunk {
.fail(),
}
}
// Updates column storage statistics for the Read Buffer.
// `drop` indicates whether to decrease the metrics (because the chunk is
// being dropped) or to increase them (because it's being created).
fn update_column_storage_statistics(&self, statistics: &[Statistics], drop: bool) {
// whether to increase/decrease the metrics
let sign = if drop { -1.0 } else { 1.0 };
for stat in statistics {
let labels = &[
KeyValue::new("encoding", stat.enc_type),
KeyValue::new("log_data_type", stat.log_data_type),
];
// update number of columns
self.metrics
.columns_total
.add_with_labels(1.0 * sign, labels);
// update bytes associated with columns
self.metrics
.column_bytes_total
.add_with_labels(stat.bytes as f64 * sign, labels);
// update number of NULL values
self.metrics.column_values_total.add_with_labels(
stat.nulls as f64 * sign,
&[
KeyValue::new("encoding", stat.enc_type),
KeyValue::new("log_data_type", stat.log_data_type),
KeyValue::new("null", "true"),
],
);
// update number of non-NULL values
self.metrics.column_values_total.add_with_labels(
(stat.values - stat.nulls) as f64 * sign,
&[
KeyValue::new("encoding", stat.enc_type),
KeyValue::new("log_data_type", stat.log_data_type),
KeyValue::new("null", "false"),
],
);
}
}
}
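The symmetry in `update_column_storage_statistics` is what keeps these gauges balanced: the same `Statistics` captured when a row group is upserted are replayed with `sign = -1.0` when the chunk is dropped. A toy illustration of the net effect, assuming only the `Gauge::add_with_labels` behaviour used above (`bytes_gauge` is a hypothetical stand-in for `metrics.column_bytes_total`):

let labels = &[
    KeyValue::new("encoding", "RLE"),
    KeyValue::new("log_data_type", "string"),
];
bytes_gauge.add_with_labels(1014.0, labels);  // chunk created: sign = 1.0
bytes_gauge.add_with_labels(-1014.0, labels); // chunk dropped: sign = -1.0, net 0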
impl std::fmt::Debug for Chunk {
@ -597,6 +660,58 @@ impl std::fmt::Debug for Chunk {
}
}
struct ColumnMetrics {
// This metric tracks the total number of columns in read buffer.
columns_total: metrics::Gauge,
// This metric tracks the total number of values stored in read buffer
// column encodings further segmented by nullness.
column_values_total: metrics::Gauge,
// This metric tracks the total number of bytes used by read buffer columns
column_bytes_total: metrics::Gauge,
}
impl ColumnMetrics {
pub fn new(registry: &MetricRegistry) -> Self {
let domain = registry.register_domain("read_buffer");
Self {
columns_total: domain.register_gauge_metric(
"column",
Some("total"),
"The number of columns within the Read Buffer",
),
column_values_total: domain.register_gauge_metric(
"column",
Some("values"),
"The number of values within columns in the Read Buffer",
),
column_bytes_total: domain.register_gauge_metric(
"column",
Some("bytes"),
"The number of bytes used by all columns in the Read Buffer",
),
}
}
}
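For reference, the domain name and the name/unit pair passed to `register_gauge_metric` combine into the `read_buffer_column_*` metric names asserted in the test further down. A small sketch of that naming, using only calls that appear in this diff:

let registry = metrics::MetricRegistry::new();
let domain = registry.register_domain("read_buffer");
let columns = domain.register_gauge_metric("column", Some("total"), "column count");
columns.add_with_labels(1.0, &[KeyValue::new("encoding", "RLE")]);

// metrics_as_text() now contains a line of the form:
//   read_buffer_column_total{encoding="RLE"} 1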
// When a chunk is dropped from the Read Buffer we need to adjust the metrics
// associated with column storage.
impl Drop for Chunk {
fn drop(&mut self) {
let storage_statistics = {
let chunk_data = self.chunk_data.read();
chunk_data
.data
.values()
.flat_map(|table| table.column_storage_statistics())
.collect::<Vec<_>>()
};
self.update_column_storage_statistics(&storage_statistics, true);
}
}
#[cfg(test)]
mod test {
use std::sync::Arc;
@ -775,7 +890,8 @@ mod test {
#[test]
fn add_remove_tables() {
let chunk = Chunk::new(22);
let reg = metrics::TestMetricRegistry::new(Arc::new(metrics::MetricRegistry::new()));
let chunk = Chunk::new(22, &reg.registry());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -831,11 +947,73 @@ mod test {
assert_eq!(table.rows(), 6);
assert_eq!(table.row_groups(), 2);
}
assert_eq!(
String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
vec![
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 108"#,
r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 1152"#,
r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 1176"#,
r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 1014"#,
r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
r#"# TYPE read_buffer_column_total gauge"#,
r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 3"#,
r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 3"#,
r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 6"#,
r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 3"#,
r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
r#"# TYPE read_buffer_column_values gauge"#,
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 9"#,
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 9"#,
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 15"#,
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 3"#,
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 9"#,
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#,
"",
]
.join("\n")
);
// when the chunk is dropped the metrics are all correctly decreased
std::mem::drop(chunk);
assert_eq!(
String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
vec![
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
"# TYPE read_buffer_column_bytes gauge",
r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 0"#,
r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 0"#,
r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 0"#,
r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 0"#,
r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
r#"# TYPE read_buffer_column_total gauge"#,
r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 0"#,
r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 0"#,
r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 0"#,
r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 0"#,
r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
r#"# TYPE read_buffer_column_values gauge"#,
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 0"#,
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 0"#,
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 0"#,
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 0"#,
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#,
"",
]
.join("\n")
);
}
#[test]
fn read_filter_table_schema() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -879,7 +1057,7 @@ mod test {
#[test]
fn has_table() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -889,7 +1067,7 @@ mod test {
#[test]
fn table_summaries() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("env")
@ -1003,7 +1181,7 @@ mod test {
#[test]
fn read_filter() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a bunch of row groups to a single table in a single chunk
for &i in &[100, 200, 300] {
@ -1102,7 +1280,7 @@ mod test {
#[test]
fn could_pass_predicate() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -1228,7 +1406,7 @@ mod test {
#[test]
fn column_names() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("region")
@ -1302,7 +1480,7 @@ mod test {
#[test]
fn column_values() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("region")

View File

@ -153,6 +153,18 @@ impl Column {
}
}
// Returns statistics about the physical layout of this column
pub(crate) fn storage_stats(&self) -> Statistics {
match &self {
Self::String(_, data) => data.storage_stats(),
Self::Float(_, data) => data.storage_stats(),
Self::Integer(_, data) => data.storage_stats(),
Self::Unsigned(_, data) => data.storage_stats(),
Self::Bool(_, data) => data.storage_stats(),
Self::ByteArray(_, data) => data.storage_stats(),
}
}
pub fn properties(&self) -> &ColumnProperties {
match &self {
Self::String(meta, _) => &meta.properties,
@ -1309,6 +1321,15 @@ impl Iterator for RowIDsIterator<'_> {
}
}
// Statistics about the composition of a column
pub(crate) struct Statistics {
pub enc_type: &'static str,
pub log_data_type: &'static str,
pub values: u32,
pub nulls: u32,
pub bytes: usize,
}
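To make the fields concrete, here is a hand-built value of the kind an RLE string column with a single NULL might report (the numbers are illustrative, not taken from a real encoding):

let stats = Statistics {
    enc_type: "RLE",         // physical encoding name
    log_data_type: "string", // logical data type of the column
    values: 10,              // total logical values (rows)
    nulls: 1,                // NULL values among them
    bytes: 1014,             // estimated size of the encoding
};

// chunk.rs derives the non-NULL count for the values gauge from these fields:
assert_eq!(stats.values - stats.nulls, 9);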
#[cfg(test)]
mod test {
use super::*;

View File

@ -1,5 +1,5 @@
use super::cmp;
use super::encoding::bool::Bool;
use super::{cmp, Statistics};
use crate::column::{RowIDs, Value, Values};
/// Encodings for boolean values.
@ -22,6 +22,17 @@ impl BooleanEncoding {
}
}
// Returns statistics about the physical layout of this column
pub(crate) fn storage_stats(&self) -> Statistics {
Statistics {
enc_type: self.name(),
log_data_type: "bool",
values: self.num_rows(),
nulls: self.null_count(),
bytes: self.size(),
}
}
/// Determines if the column contains a NULL value.
pub fn contains_null(&self) -> bool {
match self {
@ -29,6 +40,13 @@ impl BooleanEncoding {
}
}
/// The total number of NULL values in the column.
pub fn null_count(&self) -> u32 {
match self {
Self::BooleanNull(enc) => enc.null_count(),
}
}
/// Determines if the column contains a non-null value.
pub fn has_any_non_null_value(&self) -> bool {
match self {
@ -106,6 +124,13 @@ impl BooleanEncoding {
Self::BooleanNull(c) => c.count(row_ids),
}
}
/// The name of this encoding.
pub fn name(&self) -> &'static str {
match &self {
Self::BooleanNull(_) => "None",
}
}
}
impl std::fmt::Display for BooleanEncoding {

View File

@ -36,6 +36,10 @@ impl Bool {
self.arr.null_count() > 0
}
pub fn null_count(&self) -> u32 {
self.arr.null_count() as u32
}
/// Returns an estimation of the total size in bytes used by this column
/// encoding.
pub fn size(&self) -> usize {

View File

@ -16,6 +16,7 @@ use arrow::array::{Array, StringArray};
use crate::column::dictionary::NULL_ID;
use crate::column::{cmp, RowIDs};
pub const ENCODING_NAME: &str = "DICT";
pub struct Plain {
// The sorted set of logical values that are contained within this column
// encoding. Entries always contains None, which is used to reserve the
@ -87,6 +88,16 @@ impl Plain {
}
}
/// The number of NULL values in this column.
///
/// TODO(edd): this can be made O(1) by storing null_count on self.
pub fn null_count(&self) -> u32 {
self.encoded_data
.iter()
.filter(|&&id| id == NULL_ID)
.count() as u32
}
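A sketch of the O(1) variant the TODO describes, assuming a hypothetical `null_count: u32` field on `Plain` and that `push_none` appends `NULL_ID` to `encoded_data` (neither is shown in this hunk; the string encoding has a matching TODO later in this diff):

// Hypothetical bookkeeping, not part of this commit:
pub fn push_none(&mut self) {
    self.encoded_data.push(NULL_ID);
    self.null_count += 1; // maintain the count incrementally
}

pub fn null_count(&self) -> u32 {
    self.null_count // O(1), no scan of encoded_data
}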
/// Adds the provided string value to the encoded data. It is the caller's
/// responsibility to ensure that the dictionary encoding remains sorted.
pub fn push(&mut self, v: String) {
@ -799,7 +810,8 @@ impl std::fmt::Display for Plain {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[Dictionary] size: {:?} rows: {:?} cardinality: {}",
"[{}] size: {:?} rows: {:?} cardinality: {}",
ENCODING_NAME,
self.size(),
self.num_rows(),
self.cardinality(),
@ -864,6 +876,23 @@ mod test {
);
}
#[test]
fn null_count() {
let mut enc = Plain::default();
enc.push_additional(Some("east".to_string()), 3);
assert_eq!(enc.null_count(), 0);
enc.push_additional(Some("west".to_string()), 1);
assert_eq!(enc.null_count(), 0);
enc.push_none();
assert_eq!(enc.null_count(), 1);
enc.push_none();
enc.push_none();
assert_eq!(enc.null_count(), 3);
}
#[test]
#[should_panic]
fn push_wrong_order() {

View File

@ -10,6 +10,8 @@ use arrow::array::{Array, StringArray};
use crate::column::dictionary::NULL_ID;
use crate::column::{cmp, RowIDs};
pub const ENCODING_NAME: &str = "RLE";
// `RLE` is a run-length encoding for dictionary columns, where all dictionary
// entries are valid UTF-8 strings.
#[allow(clippy::upper_case_acronyms)] // this looks weird as `Rle`
@ -116,6 +118,13 @@ impl RLE {
}
}
/// The number of NULL values in this column.
pub fn null_count(&self) -> u32 {
self.index_row_ids
.get(&NULL_ID)
.map_or(0, |rows| rows.len() as u32)
}
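Unlike the plain dictionary encoding, this is already cheap: `index_row_ids` keeps the set of row IDs covered by each dictionary ID, so counting NULLs is one map lookup rather than a scan. For intuition, the runs produced by the calls used in the tests below:

let mut enc = RLE::default();
enc.push_additional(Some("east".to_string()), 3); // run ("east", 3), rows 0..=2
enc.push_additional(Some("west".to_string()), 1); // run ("west", 1), row 3
enc.push_none();                                  // run (NULL_ID, 1), row 4

// index_row_ids maps NULL_ID to {4}, hence:
assert_eq!(enc.null_count(), 1);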
/// Adds the provided string value to the encoded data. It is the caller's
/// responsibility to ensure that the dictionary encoding remains sorted.
pub fn push(&mut self, v: String) {
@ -925,10 +934,12 @@ impl std::fmt::Display for RLE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"[RLE] size: {:?} rows: {:?} cardinality: {}, runs: {} ",
"[{}] size: {:?} rows: {:?} cardinality: {}, nulls: {} runs: {} ",
ENCODING_NAME,
self.size(),
self.num_rows,
self.cardinality(),
self.null_count(),
self.run_lengths.len()
)
}
@ -996,6 +1007,23 @@ mod test {
assert_eq!(enc.size(), 473);
}
#[test]
fn null_count() {
let mut enc = RLE::default();
enc.push_additional(Some("east".to_string()), 3);
assert_eq!(enc.null_count(), 0);
enc.push_additional(Some("west".to_string()), 1);
assert_eq!(enc.null_count(), 0);
enc.push_none();
assert_eq!(enc.null_count(), 1);
enc.push_none();
enc.push_none();
assert_eq!(enc.null_count(), 3);
}
#[test]
#[should_panic]
fn push_wrong_order() {

View File

@ -67,6 +67,10 @@ where
self.arr.null_count() > 0
}
pub fn null_count(&self) -> u32 {
self.arr.null_count() as u32
}
/// Returns an estimation of the total size in bytes used by this column
/// encoding.
pub fn size(&self) -> usize {

View File

@ -1,7 +1,7 @@
use arrow::{self, array::Array};
use super::cmp;
use super::encoding::{fixed::Fixed, fixed_null::FixedNull};
use super::{cmp, Statistics};
use crate::column::{RowIDs, Scalar, Value, Values};
pub enum FloatEncoding {
@ -26,6 +26,17 @@ impl FloatEncoding {
}
}
// Returns statistics about the physical layout of this column
pub(crate) fn storage_stats(&self) -> Statistics {
Statistics {
enc_type: self.name(),
log_data_type: self.logical_datatype(),
values: self.num_rows(),
nulls: self.null_count(),
bytes: self.size(),
}
}
/// Determines if the column contains a NULL value.
pub fn contains_null(&self) -> bool {
match self {
@ -34,6 +45,14 @@ impl FloatEncoding {
}
}
/// The total number of NULL values in the column.
pub fn null_count(&self) -> u32 {
match self {
Self::Fixed64(_) => 0,
Self::FixedNull64(enc) => enc.null_count(),
}
}
/// Determines if the column contains a non-null value.
pub fn has_any_non_null_value(&self) -> bool {
match self {
@ -149,13 +168,30 @@ impl FloatEncoding {
Self::FixedNull64(c) => c.count(row_ids),
}
}
/// The name of this encoding.
pub fn name(&self) -> &'static str {
match &self {
Self::Fixed64(_) => "None",
Self::FixedNull64(_) => "None",
}
}
/// The logical datatype of this encoding.
pub fn logical_datatype(&self) -> &'static str {
match &self {
Self::Fixed64(_) => "f64",
Self::FixedNull64(_) => "f64",
}
}
}
impl std::fmt::Display for FloatEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = self.name();
match self {
Self::Fixed64(enc) => enc.fmt(f),
Self::FixedNull64(enc) => enc.fmt(f),
Self::Fixed64(enc) => write!(f, "[{}]: {}", name, enc),
Self::FixedNull64(enc) => write!(f, "[{}]: {}", name, enc),
}
}
}

View File

@ -1,7 +1,7 @@
use arrow::{self, array::Array};
use super::cmp;
use super::encoding::{fixed::Fixed, fixed_null::FixedNull};
use super::{cmp, Statistics};
use crate::column::{EncodedValues, RowIDs, Scalar, Value, Values};
pub enum IntegerEncoding {
@ -62,6 +62,17 @@ impl IntegerEncoding {
}
}
// Returns statistics about the physical layout of this column
pub(crate) fn storage_stats(&self) -> Statistics {
Statistics {
enc_type: self.name(),
log_data_type: self.logical_datatype(),
values: self.num_rows(),
nulls: self.null_count(),
bytes: self.size(),
}
}
/// Determines if the column contains a NULL value.
pub fn contains_null(&self) -> bool {
match self {
@ -71,6 +82,25 @@ impl IntegerEncoding {
}
}
/// The total number of NULL values in the column.
pub fn null_count(&self) -> u32 {
match self {
Self::I64I64(_) => 0,
Self::I64I32(_) => 0,
Self::I64U32(_) => 0,
Self::I64I16(_) => 0,
Self::I64U16(_) => 0,
Self::I64I8(_) => 0,
Self::I64U8(_) => 0,
Self::U64U64(_) => 0,
Self::U64U32(_) => 0,
Self::U64U16(_) => 0,
Self::U64U8(_) => 0,
Self::I64I64N(enc) => enc.null_count(),
Self::U64U64N(enc) => enc.null_count(),
}
}
/// Determines if the column contains a non-null value.
pub fn has_any_non_null_value(&self) -> bool {
match self {
@ -382,24 +412,63 @@ impl IntegerEncoding {
Self::U64U64N(c) => c.count(row_ids),
}
}
/// The name of this encoding.
pub fn name(&self) -> &'static str {
match &self {
Self::I64I64(_) => "None",
Self::I64I32(_) => "BT_I32",
Self::I64U32(_) => "BT_U32",
Self::I64I16(_) => "BT_I16",
Self::I64U16(_) => "BT_U16",
Self::I64I8(_) => "BT_I8",
Self::I64U8(_) => "BT_U8",
Self::U64U64(_) => "None",
Self::U64U32(_) => "BT_U32",
Self::U64U16(_) => "BT_U16",
Self::U64U8(_) => "BT_U8",
Self::I64I64N(_) => "None",
Self::U64U64N(_) => "None",
}
}
/// The logical datatype of this encoding.
pub fn logical_datatype(&self) -> &'static str {
match &self {
Self::I64I64(_) => "i64",
Self::I64I32(_) => "i64",
Self::I64U32(_) => "i64",
Self::I64I16(_) => "i64",
Self::I64U16(_) => "i64",
Self::I64I8(_) => "i64",
Self::I64U8(_) => "i64",
Self::U64U64(_) => "u64",
Self::U64U32(_) => "u64",
Self::U64U16(_) => "u64",
Self::U64U8(_) => "u64",
Self::I64I64N(_) => "i64",
Self::U64U64N(_) => "u64",
}
}
}
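Taken together, `name()` and `logical_datatype()` supply exactly the label pair the chunk metrics are keyed on; the test timestamp column, an i64 physically narrowed to u32 (`I64U32`), is what surfaces as `encoding="BT_U32",log_data_type="i64"` in the asserted output. A small sketch of the pairing (the helper itself is hypothetical):

// Hypothetical helper mirroring the labels built in chunk.rs.
fn storage_labels(enc: &IntegerEncoding) -> [KeyValue; 2] {
    [
        KeyValue::new("encoding", enc.name()),                  // e.g. "BT_U32"
        KeyValue::new("log_data_type", enc.logical_datatype()), // e.g. "i64"
    ]
}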
impl std::fmt::Display for IntegerEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = self.name();
match self {
Self::I64I64(enc) => write!(f, "phys i64: {}", enc),
Self::I64I32(enc) => write!(f, "phys i32: {}", enc),
Self::I64U32(enc) => write!(f, "phys u32: {}", enc),
Self::I64I16(enc) => write!(f, "phys i16: {}", enc),
Self::I64U16(enc) => write!(f, "phys u16: {}", enc),
Self::I64I8(enc) => write!(f, "phys i8: {}", enc),
Self::I64U8(enc) => write!(f, "phys u8: {}", enc),
Self::U64U64(enc) => write!(f, "phys u64: {}", enc),
Self::U64U32(enc) => write!(f, "phys u32: {}", enc),
Self::U64U16(enc) => write!(f, "phys u16: {}", enc),
Self::U64U8(enc) => write!(f, "phys u8: {}", enc),
Self::I64I64N(enc) => write!(f, "phys i64: {}", enc),
Self::U64U64N(enc) => write!(f, "phys u64: {}", enc),
Self::I64I64(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I32(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U32(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I16(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U16(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I8(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64U8(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U64(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U32(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U16(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U8(enc) => write!(f, "[{}]: {}", name, enc),
Self::I64I64N(enc) => write!(f, "[{}]: {}", name, enc),
Self::U64U64N(enc) => write!(f, "[{}]: {}", name, enc),
}
}
}

View File

@ -4,8 +4,9 @@ use arrow::{self, array::Array};
use either::Either;
use super::cmp;
use super::encoding::dictionary::{plain, rle};
use super::encoding::dictionary::{Encoding, Plain, RLE};
use crate::column::{RowIDs, Value, Values};
use crate::column::{RowIDs, Statistics, Value, Values};
// Edd's totally made up magic constant. This determines whether we would use
// a run-length encoded dictionary encoding or just a plain dictionary encoding.
@ -58,6 +59,20 @@ impl StringEncoding {
}
}
// Returns statistics about the physical layout of this column
pub(crate) fn storage_stats(&self) -> Statistics {
Statistics {
enc_type: match self {
Self::RleDictionary(_) => rle::ENCODING_NAME,
Self::Dictionary(_) => plain::ENCODING_NAME,
},
log_data_type: "string",
values: self.num_rows(),
nulls: self.null_count(),
bytes: self.size(),
}
}
/// Determines if the column contains a NULL value.
pub fn contains_null(&self) -> bool {
match self {
@ -66,6 +81,17 @@ impl StringEncoding {
}
}
/// Returns the number of null values in the column.
///
/// TODO(edd): store this on encodings and then it's O(1) and `contains_null`
/// can be replaced.
pub fn null_count(&self) -> u32 {
match self {
Self::RleDictionary(enc) => enc.null_count(),
Self::Dictionary(enc) => enc.null_count(),
}
}
/// Returns true if encoding can return row ID sets for logical values.
pub fn has_pre_computed_row_id_sets(&self) -> bool {
match &self {

View File

@ -12,7 +12,7 @@ use hashbrown::{hash_map, HashMap};
use itertools::Itertools;
use snafu::{ResultExt, Snafu};
use crate::column::{cmp::Operator, Column, RowIDs, RowIDsOption};
use crate::column::{self, cmp::Operator, Column, RowIDs, RowIDsOption};
use crate::schema;
use crate::schema::{AggregateType, LogicalDataType, ResultSchema};
use crate::value::{
@ -1059,6 +1059,10 @@ impl RowGroup {
dst
}
pub(crate) fn column_storage_statistics(&self) -> Vec<column::Statistics> {
self.columns.iter().map(|c| c.storage_stats()).collect()
}
}
impl std::fmt::Display for &RowGroup {

View File

@ -12,9 +12,12 @@ use arrow::record_batch::RecordBatch;
use data_types::{chunk::ChunkColumnSummary, partition_metadata::TableSummary};
use internal_types::selection::Selection;
use crate::row_group::{self, ColumnName, Predicate, RowGroup};
use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema};
use crate::value::{OwnedValue, Scalar, Value};
use crate::{
column,
row_group::{self, ColumnName, Predicate, RowGroup},
};
#[derive(Debug, Snafu)]
pub enum Error {
@ -513,6 +516,16 @@ impl Table {
.iter()
.any(|row_group| row_group.satisfies_predicate(predicate))
}
pub(crate) fn column_storage_statistics(&self) -> Vec<column::Statistics> {
let table_data = self.table_data.read();
table_data
.data
.iter()
.flat_map(|rg| rg.column_storage_statistics())
.collect()
}
}
// TODO(edd): reduce owned strings here by, e.g., using references as keys.

View File

@ -297,6 +297,9 @@ pub struct Db {
/// All of the metrics for this Db.
metrics: DbMetrics,
// The metrics registry to inject into created components in the Db.
metrics_registry: Arc<metrics::MetricRegistry>,
/// Memory registries used for tracking memory usage by this Db
memory_registries: MemoryRegistries,
@ -474,6 +477,7 @@ impl Db {
write_buffer,
jobs,
metrics: db_metrics,
metrics_registry: metrics,
system_tables,
memory_registries,
sequence: AtomicU64::new(STARTING_SEQUENCE),
@ -672,8 +676,11 @@ impl Db {
let table_stats = mb_chunk.table_summaries();
// create a new read buffer chunk with memory tracking
let rb_chunk =
ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer);
let rb_chunk = ReadBufferChunk::new_with_registries(
chunk_id,
&self.memory_registries.read_buffer,
&self.metrics_registry,
);
// load tables into the new chunk one by one.
for stats in table_stats {
@ -1496,7 +1503,7 @@ mod tests {
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "closed", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moving", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0)
.await
@ -1515,8 +1522,8 @@ mod tests {
.eq(1.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap(); // now also in OS
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap(); // now also in OS
db.unload_read_buffer("1970-01-01T00", "cpu", 0)
.await
@ -1532,7 +1539,7 @@ mod tests {
.unwrap();
// verify chunk size not increased for OS (it was in OS before unload)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap();
// verify chunk size for RB has decreased
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 0).unwrap();
}
@ -1674,7 +1681,7 @@ mod tests {
.unwrap();
// verify chunk size updated (chunk moved from moved to writing to written)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
// drop the chunk from the read buffer
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
@ -1753,7 +1760,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(4291.0)
.sample_sum_eq(4387.0)
.unwrap();
let rb = collect_read_filter(&rb_chunk, "cpu").await;
@ -1857,7 +1864,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(1913.0)
.sample_sum_eq(2009.0)
.unwrap();
// it should be the same chunk!
@ -1981,7 +1988,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(1913.0)
.sample_sum_eq(2009.0)
.unwrap();
// Unload RB chunk but keep it in OS
@ -2380,7 +2387,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ReadBufferAndObjectStore,
1904, // size of RB and OS chunks
2000, // size of RB and OS chunks
1,
),
ChunkSummary::new_without_timestamps(
@ -2416,7 +2423,7 @@ mod tests {
);
assert_eq!(db.memory_registries.mutable_buffer.bytes(), 100 + 129 + 131);
assert_eq!(db.memory_registries.read_buffer.bytes(), 1213);
assert_eq!(db.memory_registries.read_buffer.bytes(), 1309);
assert_eq!(db.memory_registries.parquet.bytes(), 89); // TODO: This 89 must be replaced with 675. Ticket #1311
}

View File

@ -740,9 +740,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![new_chunk(0, Some(0), Some(0))];
@ -784,9 +785,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![new_chunk(0, Some(0), Some(0))];
@ -838,9 +840,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![