Merge pull request #2320 from influxdata/er/feat/read_buffer/row_group_metrics
feat: add metric to track total row groupspull/24376/head
commit
f93be2bae4
|
@ -301,8 +301,14 @@ impl std::fmt::Debug for Chunk {
|
|||
}
|
||||
}
|
||||
|
||||
/// The collection of metrics exposed by the Read Buffer. Note: several of these
|
||||
/// be better represented as distributions, but the histogram story in IOx is not
|
||||
/// yet figured out.
|
||||
#[derive(Debug)]
|
||||
pub struct ChunkMetrics {
|
||||
/// The total number of row groups in the chunk.
|
||||
row_groups_total: Gauge,
|
||||
|
||||
/// This metric tracks the total number of columns in read buffer.
|
||||
columns_total: Gauge,
|
||||
|
||||
|
@ -328,6 +334,11 @@ pub struct ChunkMetrics {
|
|||
impl ChunkMetrics {
|
||||
pub fn new(domain: &metrics::Domain) -> Self {
|
||||
Self {
|
||||
row_groups_total: domain.register_gauge_metric(
|
||||
"row_group",
|
||||
Some("total"),
|
||||
"The number of row groups within the Read Buffer",
|
||||
),
|
||||
columns_total: domain.register_gauge_metric(
|
||||
"column",
|
||||
Some("total"),
|
||||
|
@ -362,6 +373,7 @@ impl ChunkMetrics {
|
|||
/// created on a metrics domain, and vice versa
|
||||
pub fn new_unregistered() -> Self {
|
||||
Self {
|
||||
row_groups_total: Gauge::new_unregistered(),
|
||||
columns_total: Gauge::new_unregistered(),
|
||||
column_values_total: Gauge::new_unregistered(),
|
||||
column_allocated_bytes_total: Gauge::new_unregistered(),
|
||||
|
@ -372,6 +384,9 @@ impl ChunkMetrics {
|
|||
|
||||
// Updates column storage statistics for the Read Buffer.
|
||||
fn update_column_storage_statistics(&mut self, statistics: &[Statistics]) {
|
||||
// increase number of row groups in chunk.
|
||||
self.row_groups_total.inc(1, &[]);
|
||||
|
||||
for stat in statistics {
|
||||
let labels = &[
|
||||
KeyValue::new("encoding", stat.enc_type.clone()),
|
||||
|
@ -718,11 +733,14 @@ mod test {
|
|||
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 6"#,
|
||||
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
|
||||
"# HELP read_buffer_row_group_total The number of row groups within the Read Buffer",
|
||||
"# TYPE read_buffer_row_group_total gauge",
|
||||
r#"read_buffer_row_group_total{db="mydb"} 2"#,
|
||||
"",
|
||||
];
|
||||
|
||||
for (actual_line, &expected_line) in actual_lines.zip(expected_lines.iter()) {
|
||||
assert_eq!(actual_line, expected_line);
|
||||
for (actual_line, &expected_line) in actual_lines.clone().zip(expected_lines.iter()) {
|
||||
assert_eq!(actual_line, expected_line, "{:?}", actual_lines);
|
||||
}
|
||||
|
||||
// when the chunk is dropped the metrics are all correctly decreased
|
||||
|
@ -775,6 +793,9 @@ mod test {
|
|||
r#"read_buffer_column_values{db="mydb",encoding="FIXEDN",log_data_type="bool",null="true"} 0"#,
|
||||
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="false"} 0"#,
|
||||
r#"read_buffer_column_values{db="mydb",encoding="RLE",log_data_type="string",null="true"} 0"#,
|
||||
"# HELP read_buffer_row_group_total The number of row groups within the Read Buffer",
|
||||
"# TYPE read_buffer_row_group_total gauge",
|
||||
r#"read_buffer_row_group_total{db="mydb"} 0"#,
|
||||
"",
|
||||
];
|
||||
|
||||
|
|
|
@ -2150,7 +2150,7 @@ mod tests {
|
|||
|
||||
// verify chunk size updated (chunk moved from closing to moving to moved)
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
|
||||
let expected_read_buffer_size = 1820;
|
||||
let expected_read_buffer_size = 1916;
|
||||
catalog_chunk_size_bytes_metric_eq(
|
||||
&test_db.metric_registry,
|
||||
"read_buffer",
|
||||
|
@ -2401,7 +2401,7 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// verify chunk size updated (chunk moved from moved to writing to written)
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1820).unwrap();
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916).unwrap();
|
||||
|
||||
// drop, the chunk from the read buffer
|
||||
db.drop_chunk("cpu", partition_key, mb_chunk.id())
|
||||
|
@ -2551,7 +2551,7 @@ mod tests {
|
|||
("svr_id", "1"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(3532.0)
|
||||
.sample_sum_eq(3628.0)
|
||||
.unwrap();
|
||||
|
||||
let rb = collect_read_filter(&rb_chunk).await;
|
||||
|
@ -2661,7 +2661,7 @@ mod tests {
|
|||
("svr_id", "10"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(2499.0)
|
||||
.sample_sum_eq(2595.0)
|
||||
.unwrap();
|
||||
|
||||
// while MB and RB chunk are identical, the PQ chunk is a new one (split off)
|
||||
|
@ -2782,7 +2782,7 @@ mod tests {
|
|||
("svr_id", "10"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(2499.0)
|
||||
.sample_sum_eq(2595.0)
|
||||
.unwrap();
|
||||
|
||||
// Unload RB chunk but keep it in OS
|
||||
|
@ -3422,7 +3422,7 @@ mod tests {
|
|||
id: 2,
|
||||
storage: ChunkStorage::ReadBufferAndObjectStore,
|
||||
lifecycle_action,
|
||||
memory_bytes: 3544, // size of RB and OS chunks
|
||||
memory_bytes: 3640, // size of RB and OS chunks
|
||||
object_store_bytes: 1577, // size of parquet file
|
||||
row_count: 2,
|
||||
time_of_last_access: None,
|
||||
|
@ -3473,7 +3473,7 @@ mod tests {
|
|||
}
|
||||
|
||||
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2486 + 87);
|
||||
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2670);
|
||||
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2766);
|
||||
assert_eq!(db.catalog.metrics().memory().object_store(), 874);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue