feat: wire in rb column metrics

pull/24376/head
Edd Robinson 2021-05-10 22:22:59 +01:00
parent 767d6b66f6
commit 3622a92c8b
5 changed files with 86 additions and 27 deletions

1
Cargo.lock generated
View File

@ -2963,6 +2963,7 @@ dependencies = [
"hashbrown 0.11.2",
"internal_types",
"itertools 0.9.0",
"metrics",
"observability_deps",
"packers",
"parking_lot",

View File

@ -19,6 +19,7 @@ either = "1.6.1"
hashbrown = "0.11"
internal_types = { path = "../internal_types" }
itertools = "0.9.0"
metrics = { path = "../metrics" }
observability_deps = { path = "../observability_deps" }
packers = { path = "../packers" }
parking_lot = "0.11"

View File

@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
};
use metrics::MetricRegistry;
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
@ -49,6 +50,9 @@ pub struct Chunk {
// The unique identifier for this chunk.
id: u32,
// All metrics for the chunk.
metrics: ColumnMetrics,
// A chunk's data is held in a collection of mutable tables and
// mutable meta data (`TableData`).
//
@ -115,22 +119,28 @@ impl TableData {
impl Chunk {
/// Initialises a new `Chunk` with the associated chunk ID.
pub fn new(id: u32) -> Self {
pub fn new(id: u32, metrics_registry: &MetricRegistry) -> Self {
Self {
id,
chunk_data: RwLock::new(TableData::default()),
metrics: ColumnMetrics::new(metrics_registry),
}
}
/// Initialises a new `Chunk` with the associated chunk ID. The returned
/// `Chunk` will be tracked according to the provided memory tracker
/// registry and internal metrics will be registered on the provided metrics
/// registry.
pub fn new_with_memory_tracker(id: u32, registry: &MemRegistry) -> Self {
let chunk = Self::new(id);
pub fn new_with_registries(
id: u32,
mem_registry: &MemRegistry,
metrics_registry: &MetricRegistry,
) -> Self {
let chunk = Self::new(id, metrics_registry);
{
let mut chunk_data = chunk.chunk_data.write();
chunk_data.tracker = registry.register();
chunk_data.tracker = mem_registry.register();
let size = Self::base_size() + chunk_data.size();
chunk_data.tracker.set_bytes(size);
}
@ -150,6 +160,7 @@ impl Chunk {
data: vec![(table.name().to_owned(), table)].into_iter().collect(),
tracker: MemRegistry::new().register(),
}),
metrics: ColumnMetrics::new(&metrics::MetricRegistry::new()),
}
}
@ -597,6 +608,42 @@ impl std::fmt::Debug for Chunk {
}
}
// Metrics describing the column storage of a single read buffer chunk.
//
// All three gauges are registered on the "read_buffer" domain of the
// `MetricRegistry` passed to `ColumnMetrics::new`.
struct ColumnMetrics {
    // This metric tracks the total number of columns in read buffer.
    columns_total: metrics::Gauge,
    // This metric tracks the total number of values stored in read buffer
    // column encodings further segmented by nullness.
    column_values_total: metrics::Gauge,
    // This metric tracks the total number of bytes used by read buffer column
    // encodings further segmented by nullness.
    column_bytes_total: metrics::Gauge,
}
impl ColumnMetrics {
    /// Creates the set of column gauges for a read buffer chunk, registering
    /// each of them under the "read_buffer" domain of `registry`.
    pub fn new(registry: &MetricRegistry) -> Self {
        // A single shared domain gives every gauge a common "read_buffer"
        // prefix when exported.
        let domain = registry.register_domain("read_buffer");

        // Tracks the total number of columns held by the read buffer.
        let columns_total = domain.register_gauge_metric(
            "column",
            Some("total"),
            "The number of columns within the Read Buffer",
        );

        // Tracks the total number of values stored across column encodings.
        let column_values_total = domain.register_gauge_metric(
            "column",
            Some("values"),
            "The number of values within columns in the Read Buffer",
        );

        // Tracks the total number of bytes consumed by column encodings.
        let column_bytes_total = domain.register_gauge_metric(
            "column",
            Some("bytes"),
            "The number of bytes used by all columns in the Read Buffer",
        );

        Self {
            columns_total,
            column_values_total,
            column_bytes_total,
        }
    }
}
#[cfg(test)]
mod test {
use std::sync::Arc;
@ -775,7 +822,7 @@ mod test {
#[test]
fn add_remove_tables() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -835,7 +882,7 @@ mod test {
#[test]
fn read_filter_table_schema() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -879,7 +926,7 @@ mod test {
#[test]
fn has_table() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -889,7 +936,7 @@ mod test {
#[test]
fn table_summaries() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("env")
@ -1003,7 +1050,7 @@ mod test {
#[test]
fn read_filter() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a bunch of row groups to a single table in a single chunk
for &i in &[100, 200, 300] {
@ -1102,7 +1149,7 @@ mod test {
#[test]
fn could_pass_predicate() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
// Add a new table to the chunk.
chunk.upsert_table("a_table", gen_recordbatch());
@ -1228,7 +1275,7 @@ mod test {
#[test]
fn column_names() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("region")
@ -1302,7 +1349,7 @@ mod test {
#[test]
fn column_values() {
let chunk = Chunk::new(22);
let chunk = Chunk::new(22, &metrics::MetricRegistry::new());
let schema = SchemaBuilder::new()
.non_null_tag("region")

View File

@ -297,6 +297,9 @@ pub struct Db {
/// All of the metrics for this Db.
metrics: DbMetrics,
// The metrics registry to inject into created components in the Db.
metrics_registry: Arc<metrics::MetricRegistry>,
/// Memory registries used for tracking memory usage by this Db
memory_registries: MemoryRegistries,
@ -474,6 +477,7 @@ impl Db {
write_buffer,
jobs,
metrics: db_metrics,
metrics_registry: metrics,
system_tables,
memory_registries,
sequence: AtomicU64::new(STARTING_SEQUENCE),
@ -672,8 +676,11 @@ impl Db {
let table_stats = mb_chunk.table_summaries();
// create a new read buffer chunk with memory tracking
let rb_chunk =
ReadBufferChunk::new_with_memory_tracker(chunk_id, &self.memory_registries.read_buffer);
let rb_chunk = ReadBufferChunk::new_with_registries(
chunk_id,
&self.memory_registries.read_buffer,
&self.metrics_registry,
);
// load tables into the new chunk one by one.
for stats in table_stats {
@ -1496,7 +1503,7 @@ mod tests {
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "closed", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moving", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0)
.await
@ -1515,8 +1522,8 @@ mod tests {
.eq(1.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap(); // now also in OS
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap(); // now also in OS
db.unload_read_buffer("1970-01-01T00", "cpu", 0)
.await
@ -1532,7 +1539,7 @@ mod tests {
.unwrap();
// verify chunk size not increased for OS (it was in OS before unload)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1913).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 2009).unwrap();
// verify chunk size for RB has decreased
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 0).unwrap();
}
@ -1674,7 +1681,7 @@ mod tests {
.unwrap();
// verify chunk size updated (chunk moved from moved to writing to written)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1318).unwrap();
// drop, the chunk from the read buffer
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
@ -1753,7 +1760,7 @@ mod tests {
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(4291.0)
.sample_sum_eq(4387.0)
.unwrap();
let rb = collect_read_filter(&rb_chunk, "cpu").await;
@ -1857,7 +1864,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(1913.0)
.sample_sum_eq(2009.0)
.unwrap();
// it should be the same chunk!
@ -1981,7 +1988,7 @@ mod tests {
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(1913.0)
.sample_sum_eq(2009.0)
.unwrap();
// Unload RB chunk but keep it in OS
@ -2380,7 +2387,7 @@ mod tests {
Arc::from("cpu"),
0,
ChunkStorage::ReadBufferAndObjectStore,
1904, // size of RB and OS chunks
2000, // size of RB and OS chunks
1,
),
ChunkSummary::new_without_timestamps(
@ -2416,7 +2423,7 @@ mod tests {
);
assert_eq!(db.memory_registries.mutable_buffer.bytes(), 100 + 129 + 131);
assert_eq!(db.memory_registries.read_buffer.bytes(), 1213);
assert_eq!(db.memory_registries.read_buffer.bytes(), 1309);
assert_eq!(db.memory_registries.parquet.bytes(), 89); // TODO: This 89 must be replaced with 675. Ticket #1311
}

View File

@ -740,9 +740,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![new_chunk(0, Some(0), Some(0))];
@ -784,9 +785,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![new_chunk(0, Some(0), Some(0))];
@ -838,9 +840,10 @@ mod tests {
..Default::default()
};
let rb = Arc::new(read_buffer::Chunk::new_with_memory_tracker(
let rb = Arc::new(read_buffer::Chunk::new_with_registries(
22,
&tracker::MemRegistry::new(),
&metrics::MetricRegistry::new(),
));
let chunks = vec![