From 3622a92c8bafdfc389c21915d67c7ec0b3573e23 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 10 May 2021 22:22:59 +0100 Subject: [PATCH 1/5] feat: wire in rb column metrics --- Cargo.lock | 1 + read_buffer/Cargo.toml | 1 + read_buffer/src/chunk.rs | 71 +++++++++++++++++++++++++++++++------- server/src/db.rs | 31 ++++++++++------- server/src/db/lifecycle.rs | 9 +++-- 5 files changed, 86 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b813f62cb6..4eb083a320 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,6 +2963,7 @@ dependencies = [ "hashbrown 0.11.2", "internal_types", "itertools 0.9.0", + "metrics", "observability_deps", "packers", "parking_lot", diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index ba9e9a5b9f..131d3cd511 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -19,6 +19,7 @@ either = "1.6.1" hashbrown = "0.11" internal_types = { path = "../internal_types" } itertools = "0.9.0" +metrics = { path = "../metrics" } observability_deps = { path = "../observability_deps" } packers = { path = "../packers" } parking_lot = "0.11" diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 44cfc6ab8c..0a06b6f452 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -3,6 +3,7 @@ use std::{ convert::TryFrom, }; +use metrics::MetricRegistry; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; @@ -49,6 +50,9 @@ pub struct Chunk { // The unique identifier for this chunk. id: u32, + // All metrics for the chunk. + metrics: ColumnMetrics, + // A chunk's data is held in a collection of mutable tables and // mutable meta data (`TableData`). // @@ -115,22 +119,28 @@ impl TableData { impl Chunk { /// Initialises a new `Chunk` with the associated chunk ID. 
- pub fn new(id: u32) -> Self { + pub fn new(id: u32, metrics_registry: &MetricRegistry) -> Self { Self { id, chunk_data: RwLock::new(TableData::default()), + metrics: ColumnMetrics::new(metrics_registry), } } /// Initialises a new `Chunk` with the associated chunk ID. The returned /// `Chunk` will be tracked according to the provided memory tracker + /// registry and internal metrics will be registered on the provided metrics /// registry. - pub fn new_with_memory_tracker(id: u32, registry: &MemRegistry) -> Self { - let chunk = Self::new(id); + pub fn new_with_registries( + id: u32, + mem_registry: &MemRegistry, + metrics_registry: &MetricRegistry, + ) -> Self { + let chunk = Self::new(id, metrics_registry); { let mut chunk_data = chunk.chunk_data.write(); - chunk_data.tracker = registry.register(); + chunk_data.tracker = mem_registry.register(); let size = Self::base_size() + chunk_data.size(); chunk_data.tracker.set_bytes(size); } @@ -150,6 +160,7 @@ impl Chunk { data: vec![(table.name().to_owned(), table)].into_iter().collect(), tracker: MemRegistry::new().register(), }), + metrics: ColumnMetrics::new(&metrics::MetricRegistry::new()), } } @@ -597,6 +608,42 @@ impl std::fmt::Debug for Chunk { } } +struct ColumnMetrics { + // This metric tracks the total number of columns in read buffer. + columns_total: metrics::Gauge, + + // This metric tracks the total number of values stored in read buffer + // column encodings further segmented by nullness. + column_values_total: metrics::Gauge, + + // This metric tracks the total number of bytes used by read buffer column + // encodings further segmented by nullness. 
+ column_bytes_total: metrics::Gauge, +} + +impl ColumnMetrics { + pub fn new(registry: &MetricRegistry) -> Self { + let domain = registry.register_domain("read_buffer"); + Self { + columns_total: domain.register_gauge_metric( + "column", + Some("total"), + "The number of columns within the Read Buffer", + ), + column_values_total: domain.register_gauge_metric( + "column", + Some("values"), + "The number of values within columns in the Read Buffer", + ), + column_bytes_total: domain.register_gauge_metric( + "column", + Some("bytes"), + "The number of bytes used by all columns in the Read Buffer", + ), + } + } +} + #[cfg(test)] mod test { use std::sync::Arc; @@ -775,7 +822,7 @@ mod test { #[test] fn add_remove_tables() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); // Add a new table to the chunk. chunk.upsert_table("a_table", gen_recordbatch()); @@ -835,7 +882,7 @@ mod test { #[test] fn read_filter_table_schema() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); // Add a new table to the chunk. chunk.upsert_table("a_table", gen_recordbatch()); @@ -879,7 +926,7 @@ mod test { #[test] fn has_table() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); // Add a new table to the chunk. 
chunk.upsert_table("a_table", gen_recordbatch()); @@ -889,7 +936,7 @@ mod test { #[test] fn table_summaries() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); let schema = SchemaBuilder::new() .non_null_tag("env") @@ -1003,7 +1050,7 @@ mod test { #[test] fn read_filter() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); // Add a bunch of row groups to a single table in a single chunk for &i in &[100, 200, 300] { @@ -1102,7 +1149,7 @@ mod test { #[test] fn could_pass_predicate() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); // Add a new table to the chunk. chunk.upsert_table("a_table", gen_recordbatch()); @@ -1228,7 +1275,7 @@ mod test { #[test] fn column_names() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); let schema = SchemaBuilder::new() .non_null_tag("region") @@ -1302,7 +1349,7 @@ mod test { #[test] fn column_values() { - let chunk = Chunk::new(22); + let chunk = Chunk::new(22, &metrics::MetricRegistry::new()); let schema = SchemaBuilder::new() .non_null_tag("region") diff --git a/server/src/db.rs b/server/src/db.rs index c3226da351..69931a5093 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -297,6 +297,9 @@ pub struct Db { /// All of the metrics for this Db. metrics: DbMetrics, + // The metrics registry to inject into created components in the Db. 
+ metrics_registry: Arc, + /// Memory registries used for tracking memory usage by this Db memory_registries: MemoryRegistries, @@ -474,6 +477,7 @@ impl Db { write_buffer, jobs, metrics: db_metrics, + metrics_registry: metrics, system_tables, memory_registries, sequence: AtomicU64::new(STARTING_SEQUENCE), @@ -672,8 +676,11 @@ impl Db { let table_stats = mb_chunk.table_summaries(); // create a new read buffer chunk with memory tracking - let rb_chunk = - ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer); + let rb_chunk = ReadBufferChunk::new_with_registries( + chunk_id, + &self.memory_registries.read_buffer, + &self.metrics_registry, + ); // load tables into the new chunk one by one. for stats in table_stats { @@ -1496,7 +1503,7 @@ mod tests { // verify chunk size updated (chunk moved from closing to moving to moved) catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "closed", 0).unwrap(); catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moving", 0).unwrap(); - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap(); db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0) .await @@ -1515,8 +1522,8 @@ mod tests { .eq(1.0) .unwrap(); - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap(); - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap(); // now also in OS + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap(); // now also in OS db.unload_read_buffer("1970-01-01T00", "cpu", 0) .await @@ -1532,7 +1539,7 @@ mod tests { .unwrap(); // verify chunk size not increased for OS (it was in OS before unload) - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap(); + 
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap(); // verify chunk size for RB has decreased catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 0).unwrap(); } @@ -1674,7 +1681,7 @@ mod tests { .unwrap(); // verify chunk size updated (chunk moved from moved to writing to written) - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap(); // drop, the chunk from the read buffer db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap(); @@ -1753,7 +1760,7 @@ mod tests { ("svr_id", "1"), ]) .histogram() - .sample_sum_eq(4291.0) + .sample_sum_eq(4387.0) .unwrap(); let rb = collect_read_filter(&rb_chunk, "cpu").await; @@ -1857,7 +1864,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(1913.0) + .sample_sum_eq(2009.0) .unwrap(); // it should be the same chunk! @@ -1981,7 +1988,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(1913.0) + .sample_sum_eq(2009.0) .unwrap(); // Unload RB chunk but keep it in OS @@ -2380,7 +2387,7 @@ mod tests { Arc::from("cpu"), 0, ChunkStorage::ReadBufferAndObjectStore, - 1904, // size of RB and OS chunks + 2000, // size of RB and OS chunks 1, ), ChunkSummary::new_without_timestamps( @@ -2416,7 +2423,7 @@ mod tests { ); assert_eq!(db.memory_registries.mutable_buffer.bytes(), 100 + 129 + 131); - assert_eq!(db.memory_registries.read_buffer.bytes(), 1213); + assert_eq!(db.memory_registries.read_buffer.bytes(), 1309); assert_eq!(db.memory_registries.parquet.bytes(), 89); // TODO: This 89 must be replaced with 675. 
Ticket #1311 } diff --git a/server/src/db/lifecycle.rs b/server/src/db/lifecycle.rs index c1d96ec6f1..6b138c3761 100644 --- a/server/src/db/lifecycle.rs +++ b/server/src/db/lifecycle.rs @@ -740,9 +740,10 @@ mod tests { ..Default::default() }; - let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker( + let rb = Arc::new(read_buffer::Chunk::new_with_registries( 22, &tracker::MemRegistry::new(), + &metrics::MetricRegistry::new(), )); let chunks = vec![new_chunk(0, Some(0), Some(0))]; @@ -784,9 +785,10 @@ mod tests { ..Default::default() }; - let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker( + let rb = Arc::new(read_buffer::Chunk::new_with_registries( 22, &tracker::MemRegistry::new(), + &metrics::MetricRegistry::new(), )); let chunks = vec![new_chunk(0, Some(0), Some(0))]; @@ -838,9 +840,10 @@ mod tests { ..Default::default() }; - let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker( + let rb = Arc::new(read_buffer::Chunk::new_with_registries( 22, &tracker::MemRegistry::new(), + &metrics::MetricRegistry::new(), )); let chunks = vec![ From ef2eda04efac6b306c5d8627eda28566ab4dba88 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 11 May 2021 10:20:17 +0100 Subject: [PATCH 2/5] feat: add string encoder statistics --- .../src/column/encoding/dictionary/plain.rs | 31 ++++++++++++++++++- .../src/column/encoding/dictionary/rle.rs | 30 +++++++++++++++++- read_buffer/src/column/string.rs | 28 ++++++++++++++++- 3 files changed, 86 insertions(+), 3 deletions(-) diff --git a/read_buffer/src/column/encoding/dictionary/plain.rs b/read_buffer/src/column/encoding/dictionary/plain.rs index fa31f8999c..061c0c9304 100644 --- a/read_buffer/src/column/encoding/dictionary/plain.rs +++ b/read_buffer/src/column/encoding/dictionary/plain.rs @@ -16,6 +16,7 @@ use arrow::array::{Array, StringArray}; use crate::column::dictionary::NULL_ID; use crate::column::{cmp, RowIDs}; +pub const ENCODING_NAME: &str = "DICT"; pub struct Plain { // The sorted set of logical 
values that are contained within this column // encoding. Entries always contains None, which is used to reserve the @@ -87,6 +88,16 @@ impl Plain { } } + /// The number of NULL values in this column. + /// + /// TODO(edd): this can be made O(1) by storing null_count on self. + pub fn null_count(&self) -> u32 { + self.encoded_data + .iter() + .filter(|&&id| id == NULL_ID) + .count() as u32 + } + /// Adds the provided string value to the encoded data. It is the caller's /// responsibility to ensure that the dictionary encoded remains sorted. pub fn push(&mut self, v: String) { @@ -799,7 +810,8 @@ impl std::fmt::Display for Plain { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "[Dictionary] size: {:?} rows: {:?} cardinality: {}", + "[{}] size: {:?} rows: {:?} cardinality: {}", + ENCODING_NAME, self.size(), self.num_rows(), self.cardinality(), @@ -864,6 +876,23 @@ mod test { ); } + #[test] + fn null_count() { + let mut enc = Plain::default(); + enc.push_additional(Some("east".to_string()), 3); + assert_eq!(enc.null_count(), 0); + + enc.push_additional(Some("west".to_string()), 1); + assert_eq!(enc.null_count(), 0); + + enc.push_none(); + assert_eq!(enc.null_count(), 1); + + enc.push_none(); + enc.push_none(); + assert_eq!(enc.null_count(), 3); + } + #[test] #[should_panic] fn push_wrong_order() { diff --git a/read_buffer/src/column/encoding/dictionary/rle.rs b/read_buffer/src/column/encoding/dictionary/rle.rs index 1ee9ec9f25..55848ae295 100644 --- a/read_buffer/src/column/encoding/dictionary/rle.rs +++ b/read_buffer/src/column/encoding/dictionary/rle.rs @@ -10,6 +10,8 @@ use arrow::array::{Array, StringArray}; use crate::column::dictionary::NULL_ID; use crate::column::{cmp, RowIDs}; +pub const ENCODING_NAME: &str = "RLE"; + // `RLE` is a run-length encoding for dictionary columns, where all dictionary // entries are utf-8 valid strings. 
#[allow(clippy::upper_case_acronyms)] // this looks weird as `Rle` @@ -116,6 +118,13 @@ impl RLE { } } + /// The number of NULL values in this column. + pub fn null_count(&self) -> u32 { + self.index_row_ids + .get(&NULL_ID) + .map_or(0, |rows| rows.len() as u32) + } + /// Adds the provided string value to the encoded data. It is the caller's /// responsibility to ensure that the dictionary encoded remains sorted. pub fn push(&mut self, v: String) { @@ -925,10 +934,12 @@ impl std::fmt::Display for RLE { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "[RLE] size: {:?} rows: {:?} cardinality: {}, runs: {} ", + "[{}] size: {:?} rows: {:?} cardinality: {}, nulls: {} runs: {} ", + ENCODING_NAME, self.size(), self.num_rows, self.cardinality(), + self.null_count(), self.run_lengths.len() ) } @@ -996,6 +1007,23 @@ mod test { assert_eq!(enc.size(), 473); } + #[test] + fn null_count() { + let mut enc = RLE::default(); + enc.push_additional(Some("east".to_string()), 3); + assert_eq!(enc.null_count(), 0); + + enc.push_additional(Some("west".to_string()), 1); + assert_eq!(enc.null_count(), 0); + + enc.push_none(); + assert_eq!(enc.null_count(), 1); + + enc.push_none(); + enc.push_none(); + assert_eq!(enc.null_count(), 3); + } + #[test] #[should_panic] fn push_wrong_order() { diff --git a/read_buffer/src/column/string.rs b/read_buffer/src/column/string.rs index 04d652567f..1681e8e836 100644 --- a/read_buffer/src/column/string.rs +++ b/read_buffer/src/column/string.rs @@ -4,8 +4,9 @@ use arrow::{self, array::Array}; use either::Either; use super::cmp; +use super::encoding::dictionary::{plain, rle}; use super::encoding::dictionary::{Encoding, Plain, RLE}; -use crate::column::{RowIDs, Value, Values}; +use crate::column::{RowIDs, Statistics, Value, Values}; // Edd's totally made up magic constant. This determines whether we would use // a run-length encoded dictionary encoding or just a plain dictionary encoding. 
@@ -58,6 +59,20 @@ impl StringEncoding { } } + // Returns statistics about the physical layout of columns + pub(crate) fn storage_stats(&self) -> Statistics { + Statistics { + enc_type: match self { + Self::RleDictionary(_) => rle::ENCODING_NAME, + Self::Dictionary(_) => plain::ENCODING_NAME, + }, + log_data_type: "string", + values: self.num_rows(), + nulls: self.null_count(), + bytes: self.size(), + } + } + /// Determines if the column contains a NULL value. pub fn contains_null(&self) -> bool { match self { @@ -66,6 +81,17 @@ impl StringEncoding { } } + /// Returns the number of null values in the column. + /// + /// TODO(edd): store this on encodings and then it's O(1) and `contain_null` + /// can be replaced. + pub fn null_count(&self) -> u32 { + match self { + Self::RleDictionary(enc) => enc.null_count(), + Self::Dictionary(enc) => enc.null_count(), + } + } + /// Returns true if encoding can return row ID sets for logical values. pub fn has_pre_computed_row_id_sets(&self) -> bool { match &self { From 88ed58aa8ad8b9e972c6655cbad5001c3ac236e2 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 11 May 2021 10:46:07 +0100 Subject: [PATCH 3/5] feat: column statistics for int/float --- read_buffer/src/column/encoding/fixed_null.rs | 4 + read_buffer/src/column/float.rs | 42 +++++++- read_buffer/src/column/integer.rs | 97 ++++++++++++++++--- 3 files changed, 126 insertions(+), 17 deletions(-) diff --git a/read_buffer/src/column/encoding/fixed_null.rs b/read_buffer/src/column/encoding/fixed_null.rs index 44297a5b70..3c478eda73 100644 --- a/read_buffer/src/column/encoding/fixed_null.rs +++ b/read_buffer/src/column/encoding/fixed_null.rs @@ -67,6 +67,10 @@ where self.arr.null_count() > 0 } + pub fn null_count(&self) -> u32 { + self.arr.null_count() as u32 + } + /// Returns an estimation of the total size in bytes used by this column /// encoding. 
pub fn size(&self) -> usize { diff --git a/read_buffer/src/column/float.rs b/read_buffer/src/column/float.rs index d1eab207b7..7eb4a5603d 100644 --- a/read_buffer/src/column/float.rs +++ b/read_buffer/src/column/float.rs @@ -1,7 +1,7 @@ use arrow::{self, array::Array}; -use super::cmp; use super::encoding::{fixed::Fixed, fixed_null::FixedNull}; +use super::{cmp, Statistics}; use crate::column::{RowIDs, Scalar, Value, Values}; pub enum FloatEncoding { @@ -26,6 +26,17 @@ impl FloatEncoding { } } + // Returns statistics about the physical layout of columns + pub(crate) fn storage_stats(&self) -> Statistics { + Statistics { + enc_type: self.name(), + log_data_type: self.logical_datatype(), + values: self.num_rows(), + nulls: self.null_count(), + bytes: self.size(), + } + } + /// Determines if the column contains a NULL value. pub fn contains_null(&self) -> bool { match self { @@ -34,6 +45,14 @@ impl FloatEncoding { } } + /// The total number of rows in the column. + pub fn null_count(&self) -> u32 { + match self { + Self::Fixed64(_) => 0, + Self::FixedNull64(enc) => enc.null_count(), + } + } + /// Determines if the column contains a non-null value. pub fn has_any_non_null_value(&self) -> bool { match self { @@ -149,13 +168,30 @@ impl FloatEncoding { Self::FixedNull64(c) => c.count(row_ids), } } + + /// The name of this encoding. + pub fn name(&self) -> &'static str { + match &self { + Self::Fixed64(_) => "None", + Self::FixedNull64(_) => "None", + } + } + + /// The logical datatype of this encoding. 
+ pub fn logical_datatype(&self) -> &'static str { + match &self { + Self::Fixed64(_) => "f64", + Self::FixedNull64(_) => "f64", + } + } } impl std::fmt::Display for FloatEncoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name = self.name(); match self { - Self::Fixed64(enc) => enc.fmt(f), - Self::FixedNull64(enc) => enc.fmt(f), + Self::Fixed64(enc) => write!(f, "[{}]: {}", name, enc), + Self::FixedNull64(enc) => write!(f, "[{}]: {}", name, enc), } } } diff --git a/read_buffer/src/column/integer.rs b/read_buffer/src/column/integer.rs index a7f5cff2ad..8b8522e3ef 100644 --- a/read_buffer/src/column/integer.rs +++ b/read_buffer/src/column/integer.rs @@ -1,7 +1,7 @@ use arrow::{self, array::Array}; -use super::cmp; use super::encoding::{fixed::Fixed, fixed_null::FixedNull}; +use super::{cmp, Statistics}; use crate::column::{EncodedValues, RowIDs, Scalar, Value, Values}; pub enum IntegerEncoding { @@ -62,6 +62,17 @@ impl IntegerEncoding { } } + // Returns statistics about the physical layout of columns + pub(crate) fn storage_stats(&self) -> Statistics { + Statistics { + enc_type: self.name(), + log_data_type: self.logical_datatype(), + values: self.num_rows(), + nulls: self.null_count(), + bytes: self.size(), + } + } + /// Determines if the column contains a NULL value. pub fn contains_null(&self) -> bool { match self { @@ -71,6 +82,25 @@ impl IntegerEncoding { } } + /// The total number of rows in the column. + pub fn null_count(&self) -> u32 { + match self { + Self::I64I64(_) => 0, + Self::I64I32(_) => 0, + Self::I64U32(_) => 0, + Self::I64I16(_) => 0, + Self::I64U16(_) => 0, + Self::I64I8(_) => 0, + Self::I64U8(_) => 0, + Self::U64U64(_) => 0, + Self::U64U32(_) => 0, + Self::U64U16(_) => 0, + Self::U64U8(_) => 0, + Self::I64I64N(enc) => enc.null_count(), + Self::U64U64N(enc) => enc.null_count(), + } + } + /// Determines if the column contains a non-null value. 
pub fn has_any_non_null_value(&self) -> bool { match self { @@ -382,24 +412,63 @@ impl IntegerEncoding { Self::U64U64N(c) => c.count(row_ids), } } + + /// The name of this encoding. + pub fn name(&self) -> &'static str { + match &self { + Self::I64I64(_) => "None", + Self::I64I32(_) => "BT_I32", + Self::I64U32(_) => "BT_U32", + Self::I64I16(_) => "BT_I16", + Self::I64U16(_) => "BT_U16", + Self::I64I8(_) => "BT_I8", + Self::I64U8(_) => "BT_U8", + Self::U64U64(_) => "None", + Self::U64U32(_) => "BT_U32", + Self::U64U16(_) => "BT_U16", + Self::U64U8(_) => "BT_U8", + Self::I64I64N(_) => "None", + Self::U64U64N(_) => "None", + } + } + + /// The logical datatype of this encoding. + pub fn logical_datatype(&self) -> &'static str { + match &self { + Self::I64I64(_) => "i64", + Self::I64I32(_) => "i64", + Self::I64U32(_) => "i64", + Self::I64I16(_) => "i64", + Self::I64U16(_) => "i64", + Self::I64I8(_) => "i64", + Self::I64U8(_) => "i64", + Self::U64U64(_) => "u64", + Self::U64U32(_) => "u64", + Self::U64U16(_) => "u64", + Self::U64U8(_) => "u64", + Self::I64I64N(_) => "i64", + Self::U64U64N(_) => "u64", + } + } } impl std::fmt::Display for IntegerEncoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name = self.name(); match self { - Self::I64I64(enc) => write!(f, "phys i64: {}", enc), - Self::I64I32(enc) => write!(f, "phys i32: {}", enc), - Self::I64U32(enc) => write!(f, "phys u32: {}", enc), - Self::I64I16(enc) => write!(f, "phys i16: {}", enc), - Self::I64U16(enc) => write!(f, "phys u16: {}", enc), - Self::I64I8(enc) => write!(f, "phys i8: {}", enc), - Self::I64U8(enc) => write!(f, "phys u8: {}", enc), - Self::U64U64(enc) => write!(f, "phys u64: {}", enc), - Self::U64U32(enc) => write!(f, "phys u32: {}", enc), - Self::U64U16(enc) => write!(f, "phys u16: {}", enc), - Self::U64U8(enc) => write!(f, "phys u8: {}", enc), - Self::I64I64N(enc) => write!(f, "phys i64: {}", enc), - Self::U64U64N(enc) => write!(f, "phys u64: {}", enc), + 
Self::I64I64(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64I32(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64U32(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64I16(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64U16(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64I8(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64U8(enc) => write!(f, "[{}]: {}", name, enc), + Self::U64U64(enc) => write!(f, "[{}]: {}", name, enc), + Self::U64U32(enc) => write!(f, "[{}]: {}", name, enc), + Self::U64U16(enc) => write!(f, "[{}]: {}", name, enc), + Self::U64U8(enc) => write!(f, "[{}]: {}", name, enc), + Self::I64I64N(enc) => write!(f, "[{}]: {}", name, enc), + Self::U64U64N(enc) => write!(f, "[{}]: {}", name, enc), } } } From c4987028fb337b96c2fd3f3d96f65f0adcde867e Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 11 May 2021 11:09:12 +0100 Subject: [PATCH 4/5] feat: expose all column stats --- read_buffer/src/column.rs | 21 +++++++++++++++++++ read_buffer/src/column/boolean.rs | 27 ++++++++++++++++++++++++- read_buffer/src/column/encoding/bool.rs | 4 ++++ read_buffer/src/row_group.rs | 6 +++++- read_buffer/src/table.rs | 15 +++++++++++++- 5 files changed, 70 insertions(+), 3 deletions(-) diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 5a0551c485..9b498dbc47 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -153,6 +153,18 @@ impl Column { } } + // Returns statistics about the physical layout of columns + pub(crate) fn storage_stats(&self) -> Statistics { + match &self { + Self::String(_, data) => data.storage_stats(), + Self::Float(_, data) => data.storage_stats(), + Self::Integer(_, data) => data.storage_stats(), + Self::Unsigned(_, data) => data.storage_stats(), + Self::Bool(_, data) => data.storage_stats(), + Self::ByteArray(_, data) => data.storage_stats(), + } + } + pub fn properties(&self) -> &ColumnProperties { match &self { Self::String(meta, _) => &meta.properties, @@ -1309,6 +1321,15 @@ 
impl Iterator for RowIDsIterator<'_> { } } +// Statistics about the composition of a column +pub(crate) struct Statistics { + enc_type: &'static str, + log_data_type: &'static str, + values: u32, + nulls: u32, + bytes: usize, +} + #[cfg(test)] mod test { use super::*; diff --git a/read_buffer/src/column/boolean.rs b/read_buffer/src/column/boolean.rs index a6da69f111..064025ea42 100644 --- a/read_buffer/src/column/boolean.rs +++ b/read_buffer/src/column/boolean.rs @@ -1,5 +1,5 @@ -use super::cmp; use super::encoding::bool::Bool; +use super::{cmp, Statistics}; use crate::column::{RowIDs, Value, Values}; /// Encodings for boolean values. @@ -22,6 +22,17 @@ impl BooleanEncoding { } } + // Returns statistics about the physical layout of columns + pub(crate) fn storage_stats(&self) -> Statistics { + Statistics { + enc_type: self.name(), + log_data_type: "bool", + values: self.num_rows(), + nulls: self.null_count(), + bytes: self.size(), + } + } + /// Determines if the column contains a NULL value. pub fn contains_null(&self) -> bool { match self { @@ -29,6 +40,13 @@ impl BooleanEncoding { } } + /// The total number of rows in the column. + pub fn null_count(&self) -> u32 { + match self { + Self::BooleanNull(enc) => enc.null_count(), + } + } + /// Determines if the column contains a non-null value. pub fn has_any_non_null_value(&self) -> bool { match self { @@ -106,6 +124,13 @@ impl BooleanEncoding { Self::BooleanNull(c) => c.count(row_ids), } } + + /// The name of this encoding. 
+ pub fn name(&self) -> &'static str { + match &self { + Self::BooleanNull(_) => "None", + } + } } impl std::fmt::Display for BooleanEncoding { diff --git a/read_buffer/src/column/encoding/bool.rs b/read_buffer/src/column/encoding/bool.rs index b9d6b58669..0752535d84 100644 --- a/read_buffer/src/column/encoding/bool.rs +++ b/read_buffer/src/column/encoding/bool.rs @@ -36,6 +36,10 @@ impl Bool { self.arr.null_count() > 0 } + pub fn null_count(&self) -> u32 { + self.arr.null_count() as u32 + } + /// Returns an estimation of the total size in bytes used by this column /// encoding. pub fn size(&self) -> usize { diff --git a/read_buffer/src/row_group.rs b/read_buffer/src/row_group.rs index 60f18c8f8c..693870aad4 100644 --- a/read_buffer/src/row_group.rs +++ b/read_buffer/src/row_group.rs @@ -12,7 +12,7 @@ use hashbrown::{hash_map, HashMap}; use itertools::Itertools; use snafu::{ResultExt, Snafu}; -use crate::column::{cmp::Operator, Column, RowIDs, RowIDsOption}; +use crate::column::{self, cmp::Operator, Column, RowIDs, RowIDsOption}; use crate::schema; use crate::schema::{AggregateType, LogicalDataType, ResultSchema}; use crate::value::{ @@ -1059,6 +1059,10 @@ impl RowGroup { dst } + + pub(crate) fn column_storage_statistics(&self) -> Vec { + self.columns.iter().map(|c| c.storage_stats()).collect() + } } impl std::fmt::Display for &RowGroup { diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs index 713a430e2a..fa9bc3c216 100644 --- a/read_buffer/src/table.rs +++ b/read_buffer/src/table.rs @@ -12,9 +12,12 @@ use arrow::record_batch::RecordBatch; use data_types::{chunk::ChunkColumnSummary, partition_metadata::TableSummary}; use internal_types::selection::Selection; -use crate::row_group::{self, ColumnName, Predicate, RowGroup}; use crate::schema::{AggregateType, ColumnType, LogicalDataType, ResultSchema}; use crate::value::{OwnedValue, Scalar, Value}; +use crate::{ + column, + row_group::{self, ColumnName, Predicate, RowGroup}, +}; #[derive(Debug, Snafu)] 
pub enum Error { @@ -513,6 +516,16 @@ impl Table { .iter() .any(|row_group| row_group.satisfies_predicate(predicate)) } + + pub(crate) fn column_storage_statistics(&self) -> Vec { + let table_data = self.table_data.read(); + table_data + .data + .iter() + .map(|rg| rg.column_storage_statistics()) + .flatten() + .collect() + } } // TODO(edd): reduce owned strings here by, e.g., using references as keys. From 32abe2e777dfdb9d42af2fffc196c870430d0ab7 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 11 May 2021 12:18:11 +0100 Subject: [PATCH 5/5] feat: wire up stats to metrics --- read_buffer/benches/database.rs | 2 +- read_buffer/src/chunk.rs | 141 ++++++++++++++++++++++++++++++-- read_buffer/src/column.rs | 10 +-- 3 files changed, 142 insertions(+), 11 deletions(-) diff --git a/read_buffer/benches/database.rs b/read_buffer/benches/database.rs index 32f75f4b38..af6918300c 100644 --- a/read_buffer/benches/database.rs +++ b/read_buffer/benches/database.rs @@ -14,7 +14,7 @@ const ONE_MS: i64 = 1_000_000; fn table_names(c: &mut Criterion) { let rb = generate_row_group(500_000); - let chunk = Chunk::new(0); + let chunk = Chunk::new(0, &metrics::MetricRegistry::new()); chunk.upsert_table("table_a", rb); // no predicate - return all the tables diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 0a06b6f452..70b796bd9f 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -3,7 +3,7 @@ use std::{ convert::TryFrom, }; -use metrics::MetricRegistry; +use metrics::{KeyValue, MetricRegistry}; use parking_lot::RwLock; use snafu::{OptionExt, ResultExt, Snafu}; @@ -13,11 +13,11 @@ use internal_types::{schema::builder::Error as SchemaError, schema::Schema, sele use observability_deps::tracing::info; use tracker::{MemRegistry, MemTracker}; -use crate::row_group::RowGroup; use crate::row_group::{ColumnName, Predicate}; use crate::schema::{AggregateType, ResultSchema}; use crate::table; use crate::table::Table; +use crate::{column::Statistics, 
row_group::RowGroup}; type TableName = String; @@ -288,6 +288,9 @@ impl Chunk { chunk_data.rows += row_group.rows() as u64; chunk_data.row_groups += 1; + // track new row group statistics to update column-based metrics. + let storage_statistics = row_group.column_storage_statistics(); + // create a new table if one doesn't exist, or add the table data to // the existing table. match chunk_data.data.entry(table_name.clone()) { @@ -304,6 +307,10 @@ impl Chunk { // Get and set new size of chunk on memory tracker let size = Self::base_size() + chunk_data.size(); chunk_data.tracker.set_bytes(size); + + // update column metrics associated with column storage + std::mem::drop(chunk_data); // drop write lock + self.update_column_storage_statistics(&storage_statistics, false); } /// Removes the table specified by `name` along with all of its contained @@ -600,6 +607,51 @@ impl Chunk { .fail(), } } + + // Updates column storage statistics for the Read Buffer. + // `drop` indicates whether to decrease the metrics (because the chunk is + // being dropped), or to increase the metrics because it's being created. 
+ fn update_column_storage_statistics(&self, statistics: &[Statistics], drop: bool) { + // whether to increase/decrease the metrics + let sign = if drop { -1.0 } else { 1.0 }; + + for stat in statistics { + let labels = &[ + KeyValue::new("encoding", stat.enc_type), + KeyValue::new("log_data_type", stat.log_data_type), + ]; + + // update number of columns + self.metrics + .columns_total + .add_with_labels(1.0 * sign, labels); + + // update bytes associated with columns + self.metrics + .column_bytes_total + .add_with_labels(stat.bytes as f64 * sign, labels); + + // update number of NULL values + self.metrics.column_values_total.add_with_labels( + stat.nulls as f64 * sign, + &[ + KeyValue::new("encoding", stat.enc_type), + KeyValue::new("log_data_type", stat.log_data_type), + KeyValue::new("null", "true"), + ], + ); + + // update number of non-NULL values + self.metrics.column_values_total.add_with_labels( + (stat.values - stat.nulls) as f64 * sign, + &[ + KeyValue::new("encoding", stat.enc_type), + KeyValue::new("log_data_type", stat.log_data_type), + KeyValue::new("null", "false"), + ], + ); + } + } } impl std::fmt::Debug for Chunk { @@ -616,8 +668,7 @@ struct ColumnMetrics { // column encodings further segmented by nullness. column_values_total: metrics::Gauge, - // This metric tracks the total number of bytes used by read buffer column - // encodings further segmented by nullness. + // This metric tracks the total number of bytes used by read buffer columns column_bytes_total: metrics::Gauge, } @@ -644,6 +695,23 @@ impl ColumnMetrics { } } +// When a chunk is dropped from the Read Buffer we need to adjust the metrics +// associated with column storage. 
+impl Drop for Chunk {
+    fn drop(&mut self) {
+        let storage_statistics = {
+            let chunk_data = self.chunk_data.read();
+            chunk_data
+                .data
+                .values()
+                .map(|table| table.column_storage_statistics())
+                .flatten()
+                .collect::<Vec<_>>()
+        };
+        self.update_column_storage_statistics(&storage_statistics, true);
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
@@ -822,7 +890,8 @@ mod test {

     #[test]
     fn add_remove_tables() {
-        let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
+        let reg = metrics::TestMetricRegistry::new(Arc::new(metrics::MetricRegistry::new()));
+        let chunk = Chunk::new(22, &reg.registry());

         // Add a new table to the chunk.
         chunk.upsert_table("a_table", gen_recordbatch());
@@ -878,6 +947,68 @@ mod test {
         assert_eq!(table.rows(), 6);
         assert_eq!(table.row_groups(), 2);
     }
+
+        assert_eq!(
+            String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
+            vec![
+                "# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
+                "# TYPE read_buffer_column_bytes gauge",
+                r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 108"#,
+                r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 1152"#,
+                r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 1176"#,
+                r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 1014"#,
+                r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
+                r#"# TYPE read_buffer_column_total gauge"#,
+                r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 3"#,
+                r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 3"#,
+                r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 6"#,
+                r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 3"#,
+                r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
+                r#"# TYPE read_buffer_column_values gauge"#,
+                r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 9"#,
+                
r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
+                r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 9"#,
+                r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#,
+                r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 15"#,
+                r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 3"#,
+                r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 9"#,
+                r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#,
+                "",
+            ]
+            .join("\n")
+        );
+
+        // when the chunk is dropped the metrics are all correctly decreased
+        std::mem::drop(chunk);
+        assert_eq!(
+            String::from_utf8(reg.registry().metrics_as_text()).unwrap(),
+            vec![
+                "# HELP read_buffer_column_bytes The number of bytes used by all columns in the Read Buffer",
+                "# TYPE read_buffer_column_bytes gauge",
+                r#"read_buffer_column_bytes{encoding="BT_U32",log_data_type="i64"} 0"#,
+                r#"read_buffer_column_bytes{encoding="None",log_data_type="bool"} 0"#,
+                r#"read_buffer_column_bytes{encoding="None",log_data_type="f64"} 0"#,
+                r#"read_buffer_column_bytes{encoding="RLE",log_data_type="string"} 0"#,
+                r#"# HELP read_buffer_column_total The number of columns within the Read Buffer"#,
+                r#"# TYPE read_buffer_column_total gauge"#,
+                r#"read_buffer_column_total{encoding="BT_U32",log_data_type="i64"} 0"#,
+                r#"read_buffer_column_total{encoding="None",log_data_type="bool"} 0"#,
+                r#"read_buffer_column_total{encoding="None",log_data_type="f64"} 0"#,
+                r#"read_buffer_column_total{encoding="RLE",log_data_type="string"} 0"#,
+                r#"# HELP read_buffer_column_values The number of values within columns in the Read Buffer"#,
+                r#"# TYPE read_buffer_column_values gauge"#,
+                r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="false"} 0"#,
+                r#"read_buffer_column_values{encoding="BT_U32",log_data_type="i64",null="true"} 0"#,
+                
r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="false"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="bool",null="true"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="false"} 0"#, + r#"read_buffer_column_values{encoding="None",log_data_type="f64",null="true"} 0"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="false"} 0"#, + r#"read_buffer_column_values{encoding="RLE",log_data_type="string",null="true"} 0"#, + "", + ] + .join("\n") + ); } #[test] diff --git a/read_buffer/src/column.rs b/read_buffer/src/column.rs index 9b498dbc47..ce0838cd2c 100644 --- a/read_buffer/src/column.rs +++ b/read_buffer/src/column.rs @@ -1323,11 +1323,11 @@ impl Iterator for RowIDsIterator<'_> { // Statistics about the composition of a column pub(crate) struct Statistics { - enc_type: &'static str, - log_data_type: &'static str, - values: u32, - nulls: u32, - bytes: usize, + pub enc_type: &'static str, + pub log_data_type: &'static str, + pub values: u32, + pub nulls: u32, + pub bytes: usize, } #[cfg(test)]