From c795fc7f9d79b102d2decd8015f74573ca6e2bd5 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Tue, 17 Aug 2021 12:55:11 +0100 Subject: [PATCH] feat: add metric to track total row groups --- read_buffer/src/chunk.rs | 29 +++++++++++++++++++++++++---- server/src/db.rs | 14 +++++++------- 2 files changed, 32 insertions(+), 11 deletions(-) diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index de34e62752..3f347be6ca 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -301,8 +301,14 @@ impl std::fmt::Debug for Chunk { } } +/// The collection of metrics exposed by the Read Buffer. Note: several of these +/// would be better represented as distributions, but the histogram story in IOx is not +/// yet figured out. #[derive(Debug)] pub struct ChunkMetrics { + /// The total number of row groups in the chunk. + row_groups_total: Gauge, + /// This metric tracks the total number of columns in read buffer. columns_total: Gauge, @@ -328,6 +334,11 @@ pub struct ChunkMetrics { impl ChunkMetrics { pub fn new(domain: &metrics::Domain) -> Self { Self { + row_groups_total: domain.register_gauge_metric( + "row_group", + Some("total"), + "The number of row groups within the Read Buffer", + ), columns_total: domain.register_gauge_metric( "column", Some("total"), @@ -362,6 +373,7 @@ impl ChunkMetrics { /// created on a metrics domain, and vice versa pub fn new_unregistered() -> Self { Self { + row_groups_total: Gauge::new_unregistered(), columns_total: Gauge::new_unregistered(), column_values_total: Gauge::new_unregistered(), column_allocated_bytes_total: Gauge::new_unregistered(), @@ -372,6 +384,9 @@ impl ChunkMetrics { // Updates column storage statistics for the Read Buffer. fn update_column_storage_statistics(&mut self, statistics: &[Statistics]) { + // increase number of row groups in chunk. 
+ self.row_groups_total.inc(1, &[]); + for stat in statistics { let labels = &[ KeyValue::new("encoding", stat.enc_type.clone()), @@ -718,11 +733,14 @@ mod test { r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#, r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 6"#, r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#, + "# HELP read_buffer_row_group_total The number of row groups within the Read Buffer", + "# TYPE read_buffer_row_group_total gauge", + r#"read_buffer_row_group_total{db="mydb"} 2"#, "", - ]; + ]; - for (actual_line, &expected_line) in actual_lines.zip(expected_lines.iter()) { - assert_eq!(actual_line, expected_line); + for (actual_line, &expected_line) in actual_lines.clone().zip(expected_lines.iter()) { + assert_eq!(actual_line, expected_line, "{:?}", actual_lines); } // when the chunk is dropped the metrics are all correctly decreased @@ -775,7 +793,10 @@ mod test { r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#, r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 0"#, r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#, - "", + "# HELP read_buffer_row_group_total The number of row groups within the Read Buffer", + "# TYPE read_buffer_row_group_total gauge", + r#"read_buffer_row_group_total{db="mydb"} 0"#, + "", ]; for (actual_line, &expected_line) in actual_lines.zip(expected_lines.iter()) { diff --git a/server/src/db.rs b/server/src/db.rs index ed61786cfd..275f83e07e 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2131,7 +2131,7 @@ mod tests { // verify chunk size updated (chunk moved from closing to moving to moved) catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap(); - let expected_read_buffer_size = 1820; + let expected_read_buffer_size = 
1916; catalog_chunk_size_bytes_metric_eq( &test_db.metric_registry, "read_buffer", @@ -2382,7 +2382,7 @@ mod tests { .unwrap(); // verify chunk size updated (chunk moved from moved to writing to written) - catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1820).unwrap(); + catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916).unwrap(); // drop, the chunk from the read buffer db.drop_chunk("cpu", partition_key, mb_chunk.id()) @@ -2532,7 +2532,7 @@ mod tests { ("svr_id", "1"), ]) .histogram() - .sample_sum_eq(3532.0) + .sample_sum_eq(3628.0) .unwrap(); let rb = collect_read_filter(&rb_chunk).await; @@ -2642,7 +2642,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(2499.0) + .sample_sum_eq(2595.0) .unwrap(); // while MB and RB chunk are identical, the PQ chunk is a new one (split off) @@ -2763,7 +2763,7 @@ mod tests { ("svr_id", "10"), ]) .histogram() - .sample_sum_eq(2499.0) + .sample_sum_eq(2595.0) .unwrap(); // Unload RB chunk but keep it in OS @@ -3403,7 +3403,7 @@ mod tests { id: 2, storage: ChunkStorage::ReadBufferAndObjectStore, lifecycle_action, - memory_bytes: 3544, // size of RB and OS chunks + memory_bytes: 3640, // size of RB and OS chunks object_store_bytes: 1577, // size of parquet file row_count: 2, time_of_last_access: None, @@ -3454,7 +3454,7 @@ mod tests { } assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87); - assert_eq!(db.catalog.metrics().memory().read_buffer(), 2670); + assert_eq!(db.catalog.metrics().memory().read_buffer(), 2766); assert_eq!(db.catalog.metrics().memory().object_store(), 874); }