From 32abe2e777dfdb9d42af2fffc196c870430d0ab7 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Tue, 11 May 2021 12:18:11 +0100
Subject: [PATCH] feat: wire up stats to metrics

---
 read_buffer/benches/database.rs |   2 +-
 read_buffer/src/chunk.rs        | 141 ++++++++++++++++++++++++++++++--
 read_buffer/src/column.rs       |  10 +--
 3 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs
index 32f75f4b38..af6918300c 100644
--- a/read_buffer/benches/database.rs
+++ b/read_buffer/benches/database.rs
@@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000;
 
 fn table_names(c: &mut Criterion) {
     let rb = generate_row_group(500_000);
-    let chunk = Chunk::new(0);
+    let chunk = Chunk::new(0, &metrics::MetricRegistry::new());
     chunk.upsert_table("table_a", rb);
 
     // no predicate - return all the tables
diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 0a06b6f452..70b796bd9f 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -3,7 +3,7 @@ use std::{
     convert::TryFrom,
 };
 
-use metrics::MetricRegistry;
+use metrics::{KeyValue, MetricRegistry};
 use parking_lot::RwLock;
 use snafu::{OptionExt, ResultExt, Snafu};
 
@@ -13,11 +13,11 @@ use internal_types::{schema::builder::Error as SchemaError, schema::Schema, sele
 use observability_deps::tracing::info;
 use tracker::{MemRegistry, MemTracker};
 
-use crate::row_group::RowGroup;
 use crate::row_group::{ColumnName, Predicate};
 use crate::schema::{AggregateType, ResultSchema};
 use crate::table;
 use crate::table::Table;
+use crate::{column::Statistics, row_group::RowGroup};
 
 type TableName = String;
 
@@ -288,6 +288,9 @@ impl Chunk {
         chunk_data.rows += row_group.rows() as u64;
         chunk_data.row_groups += 1;
 
+        // track new row group statistics to update column-based metrics.
+        let storage_statistics = row_group.column_storage_statistics();
+
         // create a new table if one doesn't exist, or add the table data to
         // the existing table.
         match chunk_data.data.entry(table_name.clone()) {
@@ -304,6 +307,10 @@ impl Chunk {
         // Get and set new size of chunk on memory tracker
         let size = Self::base_size() + chunk_data.size();
         chunk_data.tracker.set_bytes(size);
+
+        // update column metrics associated with column storage
+        std::mem::drop(chunk_data); // drop write lock
+        self.update_column_storage_statistics(&storage_statistics, false);
     }
 
     /// Removes the table specified by `name` along with all of its contained
@@ -600,6 +607,51 @@ impl Chunk {
             .fail(),
         }
     }
+
+    // Updates column storage statistics for the Read Buffer.
+    // `drop` indicates whether to decrease the metrics (because the chunk is
+    // being dropped), or to increase the metrics because it's being created.
+    fn update_column_storage_statistics(&self, statistics: &[Statistics], drop: bool) {
+        // whether to increase/decrease the metrics
+        let sign = if drop { -1.0 } else { 1.0 };
+
+        for stat in statistics {
+            let labels = &[
+                KeyValue::new("encoding", stat.enc_type),
+                KeyValue::new("log_data_type", stat.log_data_type),
+            ];
+
+            // update number of columns
+            self.metrics
+                .columns_total
+                .add_with_labels(1.0 * sign, labels);
+
+            // update bytes associated with columns
+            self.metrics
+                .column_bytes_total
+                .add_with_labels(stat.bytes as f64 * sign, labels);
+
+            // update number of NULL values
+            self.metrics.column_values_total.add_with_labels(
+                stat.nulls as f64 * sign,
+                &[
+                    KeyValue::new("encoding", stat.enc_type),
+                    KeyValue::new("log_data_type", stat.log_data_type),
+                    KeyValue::new("null", "true"),
+                ],
+            );
+
+            // update number of non-NULL values
+            self.metrics.column_values_total.add_with_labels(
+                (stat.values - stat.nulls) as f64 * sign,
+                &[
+                    KeyValue::new("encoding", stat.enc_type),
+                    KeyValue::new("log_data_type", stat.log_data_type),
+                    KeyValue::new("null", "false"),
+                ],
+            );
+        }
+    }
 }
 
 impl std::fmt::Debug for Chunk {
@@ -616,8 +668,7 @@ struct ColumnMetrics {
     // column encodings further segmented by nullness.
     column_values_total: metrics::Gauge,
 
-    // This metric tracks the total number of bytes used by read buffer column
-    // encodings further segmented by nullness.
+    // This metric tracks the total number of bytes used by read buffer columns
     column_bytes_total: metrics::Gauge,
 }
 
@@ -644,6 +695,23 @@ impl ColumnMetrics {
     }
 }
 
+// When a chunk is dropped from the Read Buffer we need to adjust the metrics
+// associated with column storage.
+impl Drop for Chunk {
+    fn drop(&mut self) {
+        let storage_statistics = {
+            let chunk_data = self.chunk_data.read();
+            chunk_data
+                .data
+                .values()
+                .map(|table| table.column_storage_statistics())
+                .flatten()
+                .collect::<Vec<_>>()
+        };
+        self.update_column_storage_statistics(&storage_statistics, true);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
@@ -822,7 +890,8 @@ mod test {
 
     #[test]
     fn add_remove_tables() {
-        let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
+        let reg = metrics::TestMetricRegistry::new(Arc::new(metrics::MetricRegistry::new()));
+        let chunk = Chunk::new(22, &reg.registry());
 
         // Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch()); @@ -878,6 +947,68 @@ mod test { assert_eq!(table.rows(), 6); assert_eq!(table.row_groups(), 2); } + + assert_eq!( + String::from_utf8(reg.registry().metrics_as_text()).unwrap(), + vec![ + "# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer", + "# TYPE read_buffer_column_bytes gauge", + r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 108"#, + r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 1152"#, + r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 1176"#, + r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 1014"#, + r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#, + r#"# TYPE read_buffer_column_total gauge"#, + r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 3"#, + r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 3"#, + r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 6"#, + r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 3"#, + r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#, + r#"# TYPE read_buffer_column_values gauge"#, + r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 9"#, + r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 9"#, + r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 15"#, + r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 3"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 9"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#, + "", + ] + .join("\n") + ); + + // when the chunk is dropped the metics are all correctly decreased + std::mem::drop(chunk); + assert_eq!( + String::from_utf8(reg.registry().metrics_as_text()).unwrap(), + vec![ + "# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer", + "# TYPE read_buffer_column_bytes gauge", + r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 0"#, + r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 0"#, + r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 0"#, + r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 0"#, + r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#, + r#"# TYPE read_buffer_column_total gauge"#, + r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 0"#, + r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 0"#, + r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 0"#, + r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 0"#, + r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#, + r#"# TYPE read_buffer_column_values gauge"#, + r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 0"#, + r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#, + 
r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 0"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 0"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#, + "", + ] + .join("\n") + ); } #[test] diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 9b498dbc47..ce0838cd2c 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -1323,11 +1323,11 @@ impl Iterator for RowIDsIterator<'_> { // Statistics about the composition of a column pub(crate) struct Statistics { - enc_type: &'static str, - log_data_type: &'static str, - values: u32, - nulls: u32, - bytes: usize, + pub enc_type: &'static str, + pub log_data_type: &'static str, + pub values: u32, + pub nulls: u32, + pub bytes: usize, } #[cfg(test)]