feat: wire up stats to metrics
parent
c4987028fb
commit
32abe2e777
|
@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000;
|
|||
|
||||
fn table_names(c: &mut Criterion) {
|
||||
let rb = generate_row_group(500_000);
|
||||
let chunk = Chunk::new(0);
|
||||
let chunk = Chunk::new(0, &metrics::MetricRegistry::new());
|
||||
chunk.upsert_table("table_a", rb);
|
||||
|
||||
// no predicate - return all the tables
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::{
|
|||
convert::TryFrom,
|
||||
};
|
||||
|
||||
use metrics::MetricRegistry;
|
||||
use metrics::{KeyValue, MetricRegistry};
|
||||
use parking_lot::RwLock;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
|
||||
|
@ -13,11 +13,11 @@ use internal_types::{schema::builder::Error as SchemaError, schema::Schema, sele
|
|||
use observability_deps::tracing::info;
|
||||
use tracker::{MemRegistry, MemTracker};
|
||||
|
||||
use crate::row_group::RowGroup;
|
||||
use crate::row_group::{ColumnName, Predicate};
|
||||
use crate::schema::{AggregateType, ResultSchema};
|
||||
use crate::table;
|
||||
use crate::table::Table;
|
||||
use crate::{column::Statistics, row_group::RowGroup};
|
||||
|
||||
type TableName = String;
|
||||
|
||||
|
@ -288,6 +288,9 @@ impl Chunk {
|
|||
chunk_data.rows += row_group.rows() as u64;
|
||||
chunk_data.row_groups += 1;
|
||||
|
||||
// track new row group statistics to update column-based metrics.
|
||||
let storage_statistics = row_group.column_storage_statistics();
|
||||
|
||||
// create a new table if one doesn't exist, or add the table data to
|
||||
// the existing table.
|
||||
match chunk_data.data.entry(table_name.clone()) {
|
||||
|
@ -304,6 +307,10 @@ impl Chunk {
|
|||
// Get and set new size of chunk on memory tracker
|
||||
let size = Self::base_size() + chunk_data.size();
|
||||
chunk_data.tracker.set_bytes(size);
|
||||
|
||||
// update column metrics associated with column storage
|
||||
std::mem::drop(chunk_data); // drop write lock
|
||||
self.update_column_storage_statistics(&storage_statistics, false);
|
||||
}
|
||||
|
||||
/// Removes the table specified by `name` along with all of its contained
|
||||
|
@ -600,6 +607,51 @@ impl Chunk {
|
|||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
// Updates column storage statistics for the Read Buffer.
|
||||
// `drop` indicates whether to decrease the metrics (because the chunk is
|
||||
// being dropped), or to increase the metrics because it's being created.
|
||||
fn update_column_storage_statistics(&self, statistics: &[Statistics], drop: bool) {
|
||||
// whether to increase/decrease the metrics
|
||||
let sign = if drop { -1.0 } else { 1.0 };
|
||||
|
||||
for stat in statistics {
|
||||
let labels = &[
|
||||
KeyValue::new("encoding", stat.enc_type),
|
||||
KeyValue::new("log_data_type", stat.log_data_type),
|
||||
];
|
||||
|
||||
// update number of columns
|
||||
self.metrics
|
||||
.columns_total
|
||||
.add_with_labels(1.0 * sign, labels);
|
||||
|
||||
// update bytes associated with columns
|
||||
self.metrics
|
||||
.column_bytes_total
|
||||
.add_with_labels(stat.bytes as f64 * sign, labels);
|
||||
|
||||
// update number of NULL values
|
||||
self.metrics.column_values_total.add_with_labels(
|
||||
stat.nulls as f64 * sign,
|
||||
&[
|
||||
KeyValue::new("encoding", stat.enc_type),
|
||||
KeyValue::new("log_data_type", stat.log_data_type),
|
||||
KeyValue::new("null", "true"),
|
||||
],
|
||||
);
|
||||
|
||||
// update number of non-NULL values
|
||||
self.metrics.column_values_total.add_with_labels(
|
||||
(stat.values - stat.nulls) as f64 * sign,
|
||||
&[
|
||||
KeyValue::new("encoding", stat.enc_type),
|
||||
KeyValue::new("log_data_type", stat.log_data_type),
|
||||
KeyValue::new("null", "false"),
|
||||
],
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Chunk {
|
||||
|
@ -616,8 +668,7 @@ struct ColumnMetrics {
|
|||
// column encodings further segmented by nullness.
|
||||
column_values_total: metrics::Gauge,
|
||||
|
||||
// This metric tracks the total number of bytes used by read buffer column
|
||||
// encodings further segmented by nullness.
|
||||
// This metric tracks the total number of bytes used by read buffer columns
|
||||
column_bytes_total: metrics::Gauge,
|
||||
}
|
||||
|
||||
|
@ -644,6 +695,23 @@ impl ColumnMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
// When a chunk is dropped from the Read Buffer we need to adjust the metrics
|
||||
// associated with column storage.
|
||||
impl Drop for Chunk {
|
||||
fn drop(&mut self) {
|
||||
let storage_statistics = {
|
||||
let chunk_data = self.chunk_data.read();
|
||||
chunk_data
|
||||
.data
|
||||
.values()
|
||||
.map(|table| table.column_storage_statistics())
|
||||
.flatten()
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
self.update_column_storage_statistics(&storage_statistics, true);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::sync::Arc;
|
||||
|
@ -822,7 +890,8 @@ mod test {
|
|||
|
||||
#[test]
|
||||
fn add_remove_tables() {
|
||||
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
|
||||
let reg = metrics::TestMetricRegistry::new(Arc::new(metrics::MetricRegistry::new()));
|
||||
let chunk = Chunk::new(22, ®.registry());
|
||||
|
||||
// Add a new table to the chunk.
|
||||
chunk.upsert_table("a_table", gen_recordbatch());
|
||||
|
@ -878,6 +947,68 @@ mod test {
|
|||
assert_eq!(table.rows(), 6);
|
||||
assert_eq!(table.row_groups(), 2);
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
|
||||
vec![
|
||||
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
|
||||
"# TYPE read_buffer_column_bytes gauge",
|
||||
r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 108"#,
|
||||
r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 1152"#,
|
||||
r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 1176"#,
|
||||
r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 1014"#,
|
||||
r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
|
||||
r#"# TYPE read_buffer_column_total gauge"#,
|
||||
r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 3"#,
|
||||
r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 3"#,
|
||||
r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 6"#,
|
||||
r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 3"#,
|
||||
r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
|
||||
r#"# TYPE read_buffer_column_values gauge"#,
|
||||
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 9"#,
|
||||
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 9"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 15"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 3"#,
|
||||
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 9"#,
|
||||
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#,
|
||||
"",
|
||||
]
|
||||
.join("\n")
|
||||
);
|
||||
|
||||
// when the chunk is dropped the metics are all correctly decreased
|
||||
std::mem::drop(chunk);
|
||||
assert_eq!(
|
||||
String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
|
||||
vec![
|
||||
"# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
|
||||
"# TYPE read_buffer_column_bytes gauge",
|
||||
r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 0"#,
|
||||
r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 0"#,
|
||||
r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 0"#,
|
||||
r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 0"#,
|
||||
r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
|
||||
r#"# TYPE read_buffer_column_total gauge"#,
|
||||
r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 0"#,
|
||||
r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 0"#,
|
||||
r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 0"#,
|
||||
r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 0"#,
|
||||
r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
|
||||
r#"# TYPE read_buffer_column_values gauge"#,
|
||||
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 0"#,
|
||||
r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#,
|
||||
"",
|
||||
]
|
||||
.join("\n")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -1323,11 +1323,11 @@ impl Iterator for RowIDsIterator<'_> {
|
|||
|
||||
// Statistics about the composition of a column
|
||||
pub(crate) struct Statistics {
|
||||
enc_type: &'static str,
|
||||
log_data_type: &'static str,
|
||||
values: u32,
|
||||
nulls: u32,
|
||||
bytes: usize,
|
||||
pub enc_type: &'static str,
|
||||
pub log_data_type: &'static str,
|
||||
pub values: u32,
|
||||
pub nulls: u32,
|
||||
pub bytes: usize,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
Loading…
Reference in New Issue