feat: add chunk storage metrics (#2069)
* feat: add chunk storage metrics * chore: review feedback Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
8c974beba0
commit
38e375d11a
|
@ -1698,13 +1698,13 @@ mod tests {
|
||||||
|
|
||||||
fn catalog_chunk_size_bytes_metric_eq(
|
fn catalog_chunk_size_bytes_metric_eq(
|
||||||
reg: &metrics::TestMetricRegistry,
|
reg: &metrics::TestMetricRegistry,
|
||||||
source: &'static str,
|
location: &'static str,
|
||||||
v: u64,
|
v: u64,
|
||||||
) -> Result<(), metrics::Error> {
|
) -> Result<(), metrics::Error> {
|
||||||
reg.has_metric_family("catalog_chunks_mem_usage_bytes")
|
reg.has_metric_family("catalog_chunks_mem_usage_bytes")
|
||||||
.with_labels(&[
|
.with_labels(&[
|
||||||
("db_name", "placeholder"),
|
("db_name", "placeholder"),
|
||||||
("source", source),
|
("location", location),
|
||||||
("svr_id", "1"),
|
("svr_id", "1"),
|
||||||
])
|
])
|
||||||
.gauge()
|
.gauge()
|
||||||
|
@ -1714,7 +1714,22 @@ mod tests {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn metrics_during_rollover() {
|
async fn metrics_during_rollover() {
|
||||||
let test_db = make_db().await;
|
let test_db = make_db().await;
|
||||||
let db = test_db.db;
|
let db = Arc::clone(&test_db.db);
|
||||||
|
|
||||||
|
let assert_metric = |name: &'static str, location: &'static str, value: f64| {
|
||||||
|
test_db
|
||||||
|
.metric_registry
|
||||||
|
.has_metric_family(name)
|
||||||
|
.with_labels(&[
|
||||||
|
("db_name", "placeholder"),
|
||||||
|
("location", location),
|
||||||
|
("svr_id", "1"),
|
||||||
|
("table", "cpu"),
|
||||||
|
])
|
||||||
|
.gauge()
|
||||||
|
.eq(value)
|
||||||
|
.unwrap();
|
||||||
|
};
|
||||||
|
|
||||||
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
write_lp(db.as_ref(), "cpu bar=1 10").await;
|
||||||
|
|
||||||
|
@ -1731,11 +1746,18 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 0.0);
|
||||||
|
|
||||||
// verify chunk size updated
|
// verify chunk size updated
|
||||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
|
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 44).unwrap();
|
||||||
|
|
||||||
// write into same chunk again.
|
// write into same chunk again.
|
||||||
write_lp(db.as_ref(), "cpu bar=2 10").await;
|
write_lp(db.as_ref(), "cpu bar=2 20").await;
|
||||||
|
|
||||||
// verify chunk size updated
|
// verify chunk size updated
|
||||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
|
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 60).unwrap();
|
||||||
|
@ -1753,6 +1775,13 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 0.0);
|
||||||
|
|
||||||
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
|
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
|
||||||
|
|
||||||
// A chunk is now closed
|
// A chunk is now closed
|
||||||
|
@ -1768,6 +1797,13 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 2.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 0.0);
|
||||||
|
|
||||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1239)
|
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1239)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -1788,9 +1824,16 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 0.0);
|
||||||
|
|
||||||
// verify chunk size updated (chunk moved from closing to moving to moved)
|
// verify chunk size updated (chunk moved from closing to moving to moved)
|
||||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
|
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
|
||||||
let expected_read_buffer_size = 1611;
|
let expected_read_buffer_size = 1613;
|
||||||
catalog_chunk_size_bytes_metric_eq(
|
catalog_chunk_size_bytes_metric_eq(
|
||||||
&test_db.metric_registry,
|
&test_db.metric_registry,
|
||||||
"read_buffer",
|
"read_buffer",
|
||||||
|
@ -1829,11 +1872,18 @@ mod tests {
|
||||||
// now also in OS
|
// now also in OS
|
||||||
catalog_chunk_size_bytes_metric_eq(
|
catalog_chunk_size_bytes_metric_eq(
|
||||||
&test_db.metric_registry,
|
&test_db.metric_registry,
|
||||||
"parquet",
|
"object_store",
|
||||||
expected_parquet_size,
|
expected_parquet_size,
|
||||||
)
|
)
|
||||||
.unwrap(); // TODO: #1311
|
.unwrap(); // TODO: #1311
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 1.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 2.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 2.0);
|
||||||
|
|
||||||
db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap();
|
db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap();
|
||||||
|
|
||||||
// A chunk is now now in the "os-only" state.
|
// A chunk is now now in the "os-only" state.
|
||||||
|
@ -1845,10 +1895,17 @@ mod tests {
|
||||||
.eq(1.0)
|
.eq(1.0)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_chunks", "object_store", 1.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "mutable_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
|
||||||
|
assert_metric("catalog_loaded_rows", "object_store", 2.0);
|
||||||
|
|
||||||
// verify chunk size not increased for OS (it was in OS before unload)
|
// verify chunk size not increased for OS (it was in OS before unload)
|
||||||
catalog_chunk_size_bytes_metric_eq(
|
catalog_chunk_size_bytes_metric_eq(
|
||||||
&test_db.metric_registry,
|
&test_db.metric_registry,
|
||||||
"parquet",
|
"object_store",
|
||||||
expected_parquet_size,
|
expected_parquet_size,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -2845,7 +2902,7 @@ mod tests {
|
||||||
|
|
||||||
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2398 + 87);
|
assert_eq!(db.catalog.metrics().memory().mutable_buffer(), 2398 + 87);
|
||||||
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
|
assert_eq!(db.catalog.metrics().memory().read_buffer(), 2410);
|
||||||
assert_eq!(db.catalog.metrics().memory().parquet(), 826);
|
assert_eq!(db.catalog.metrics().memory().object_store(), 826);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::db::catalog::metrics::MemoryMetrics;
|
use crate::db::catalog::metrics::StorageGauge;
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use data_types::instant::to_approximate_datetime;
|
use data_types::instant::to_approximate_datetime;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
|
@ -228,7 +228,14 @@ macro_rules! unexpected_state {
|
||||||
pub struct ChunkMetrics {
|
pub struct ChunkMetrics {
|
||||||
pub(super) state: Counter,
|
pub(super) state: Counter,
|
||||||
pub(super) immutable_chunk_size: Histogram,
|
pub(super) immutable_chunk_size: Histogram,
|
||||||
pub(super) memory_metrics: MemoryMetrics,
|
/// Chunk storage metrics
|
||||||
|
pub(super) chunk_storage: StorageGauge,
|
||||||
|
|
||||||
|
/// Chunk row count metrics
|
||||||
|
pub(super) row_count: StorageGauge,
|
||||||
|
|
||||||
|
/// Catalog memory metrics
|
||||||
|
pub(super) memory_metrics: StorageGauge,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChunkMetrics {
|
impl ChunkMetrics {
|
||||||
|
@ -240,7 +247,9 @@ impl ChunkMetrics {
|
||||||
Self {
|
Self {
|
||||||
state: Counter::new_unregistered(),
|
state: Counter::new_unregistered(),
|
||||||
immutable_chunk_size: Histogram::new_unregistered(),
|
immutable_chunk_size: Histogram::new_unregistered(),
|
||||||
memory_metrics: MemoryMetrics::new_unregistered(),
|
chunk_storage: StorageGauge::new_unregistered(),
|
||||||
|
row_count: StorageGauge::new_unregistered(),
|
||||||
|
memory_metrics: StorageGauge::new_unregistered(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -275,7 +284,7 @@ impl CatalogChunk {
|
||||||
time_of_last_write: Some(last_write),
|
time_of_last_write: Some(last_write),
|
||||||
time_closed: None,
|
time_closed: None,
|
||||||
};
|
};
|
||||||
chunk.update_memory_metrics();
|
chunk.update_metrics();
|
||||||
chunk
|
chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,7 +321,7 @@ impl CatalogChunk {
|
||||||
time_of_last_write: None,
|
time_of_last_write: None,
|
||||||
time_closed: None,
|
time_closed: None,
|
||||||
};
|
};
|
||||||
chunk.update_memory_metrics();
|
chunk.update_metrics();
|
||||||
chunk
|
chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,7 +356,7 @@ impl CatalogChunk {
|
||||||
time_of_last_write: None,
|
time_of_last_write: None,
|
||||||
time_closed: None,
|
time_closed: None,
|
||||||
};
|
};
|
||||||
chunk.update_memory_metrics();
|
chunk.update_metrics();
|
||||||
chunk
|
chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,42 +407,49 @@ impl CatalogChunk {
|
||||||
self.time_closed
|
self.time_closed
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Updates `self.memory_metrics` to match the contents of `self.stage`
|
/// Updates `self.metrics` to match the contents of `self.stage`
|
||||||
fn update_memory_metrics(&mut self) {
|
fn update_metrics(&mut self) {
|
||||||
match &self.stage {
|
match &self.stage {
|
||||||
ChunkStage::Open { mb_chunk } => {
|
ChunkStage::Open { mb_chunk } => {
|
||||||
self.metrics
|
self.metrics.memory_metrics.set_mub_only(mb_chunk.size());
|
||||||
.memory_metrics
|
self.metrics.row_count.set_mub_only(mb_chunk.rows());
|
||||||
.mutable_buffer
|
self.metrics.chunk_storage.set_mub_only(1);
|
||||||
.set(mb_chunk.size());
|
|
||||||
self.metrics.memory_metrics.read_buffer.set(0);
|
|
||||||
self.metrics.memory_metrics.parquet.set(0);
|
|
||||||
}
|
}
|
||||||
ChunkStage::Frozen { representation, .. } => match representation {
|
ChunkStage::Frozen { representation, .. } => match representation {
|
||||||
ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => {
|
ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => {
|
||||||
self.metrics
|
self.metrics.memory_metrics.set_mub_only(snapshot.size());
|
||||||
.memory_metrics
|
self.metrics.row_count.set_mub_only(snapshot.rows());
|
||||||
.mutable_buffer
|
self.metrics.chunk_storage.set_mub_only(1);
|
||||||
.set(snapshot.size());
|
|
||||||
self.metrics.memory_metrics.read_buffer.set(0);
|
|
||||||
self.metrics.memory_metrics.parquet.set(0);
|
|
||||||
}
|
}
|
||||||
ChunkStageFrozenRepr::ReadBuffer(rb_chunk) => {
|
ChunkStageFrozenRepr::ReadBuffer(rb_chunk) => {
|
||||||
self.metrics.memory_metrics.mutable_buffer.set(0);
|
self.metrics.memory_metrics.set_rub_only(rb_chunk.size());
|
||||||
self.metrics.memory_metrics.read_buffer.set(rb_chunk.size());
|
self.metrics
|
||||||
self.metrics.memory_metrics.parquet.set(0);
|
.row_count
|
||||||
|
.set_rub_only(rb_chunk.rows() as usize);
|
||||||
|
self.metrics.chunk_storage.set_rub_only(1);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ChunkStage::Persisted {
|
ChunkStage::Persisted {
|
||||||
parquet,
|
parquet,
|
||||||
read_buffer,
|
read_buffer: Some(read_buffer),
|
||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
let rub_size = read_buffer.as_ref().map(|x| x.size()).unwrap_or(0);
|
self.metrics
|
||||||
|
.memory_metrics
|
||||||
self.metrics.memory_metrics.mutable_buffer.set(0);
|
.set_rub_and_object_store_only(read_buffer.size(), parquet.size());
|
||||||
self.metrics.memory_metrics.read_buffer.set(rub_size);
|
self.metrics
|
||||||
self.metrics.memory_metrics.parquet.set(parquet.size());
|
.row_count
|
||||||
|
.set_rub_and_object_store_only(read_buffer.rows() as usize, parquet.rows());
|
||||||
|
self.metrics
|
||||||
|
.chunk_storage
|
||||||
|
.set_rub_and_object_store_only(1, 1);
|
||||||
|
}
|
||||||
|
ChunkStage::Persisted { parquet, .. } => {
|
||||||
|
self.metrics
|
||||||
|
.memory_metrics
|
||||||
|
.set_object_store_only(parquet.size());
|
||||||
|
self.metrics.row_count.set_object_store_only(parquet.rows());
|
||||||
|
self.metrics.chunk_storage.set_object_store_only(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -446,7 +462,7 @@ impl CatalogChunk {
|
||||||
self.time_of_first_write = Some(now);
|
self.time_of_first_write = Some(now);
|
||||||
}
|
}
|
||||||
self.time_of_last_write = Some(now);
|
self.time_of_last_write = Some(now);
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the storage and the number of rows
|
/// Returns the storage and the number of rows
|
||||||
|
@ -622,7 +638,7 @@ impl CatalogChunk {
|
||||||
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)),
|
representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)),
|
||||||
meta: Arc::new(metadata),
|
meta: Arc::new(metadata),
|
||||||
};
|
};
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -717,7 +733,7 @@ impl CatalogChunk {
|
||||||
&[KeyValue::new("state", "moved")],
|
&[KeyValue::new("state", "moved")],
|
||||||
);
|
);
|
||||||
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
|
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
|
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -796,7 +812,7 @@ impl CatalogChunk {
|
||||||
parquet: chunk,
|
parquet: chunk,
|
||||||
read_buffer: Some(db),
|
read_buffer: Some(db),
|
||||||
};
|
};
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -829,7 +845,7 @@ impl CatalogChunk {
|
||||||
&[KeyValue::new("state", "os")],
|
&[KeyValue::new("state", "os")],
|
||||||
);
|
);
|
||||||
|
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
|
|
||||||
Ok(rub_chunk)
|
Ok(rub_chunk)
|
||||||
} else {
|
} else {
|
||||||
|
@ -857,7 +873,7 @@ impl CatalogChunk {
|
||||||
// set memory metrics to 0 to stop accounting for this chunk within the catalog
|
// set memory metrics to 0 to stop accounting for this chunk within the catalog
|
||||||
self.metrics.memory_metrics.mutable_buffer.set(0);
|
self.metrics.memory_metrics.mutable_buffer.set(0);
|
||||||
self.metrics.memory_metrics.read_buffer.set(0);
|
self.metrics.memory_metrics.read_buffer.set(0);
|
||||||
self.metrics.memory_metrics.parquet.set(0);
|
self.metrics.memory_metrics.object_store.set(0);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -915,7 +931,7 @@ impl CatalogChunk {
|
||||||
// actions correctly. When clearing out that action, we need to restore the pre-action state. The easiest
|
// actions correctly. When clearing out that action, we need to restore the pre-action state. The easiest
|
||||||
// (and stateless) way to to do that is just to call the update method. Since clearing lifecycle actions
|
// (and stateless) way to to do that is just to call the update method. Since clearing lifecycle actions
|
||||||
// should be a rather rare event, the cost of this is negligible.
|
// should be a rather rare event, the cost of this is negligible.
|
||||||
self.update_memory_metrics();
|
self.update_metrics();
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::db::catalog::chunk::ChunkMetrics;
|
use crate::db::catalog::chunk::ChunkMetrics;
|
||||||
use metrics::{Counter, GaugeValue, Histogram, KeyValue};
|
use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracker::{LockTracker, RwLock};
|
use tracker::{LockTracker, RwLock};
|
||||||
|
|
||||||
|
@ -9,19 +9,25 @@ pub struct CatalogMetrics {
|
||||||
metrics_domain: Arc<metrics::Domain>,
|
metrics_domain: Arc<metrics::Domain>,
|
||||||
|
|
||||||
/// Catalog memory metrics
|
/// Catalog memory metrics
|
||||||
memory_metrics: MemoryMetrics,
|
memory_metrics: StorageGauge,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CatalogMetrics {
|
impl CatalogMetrics {
|
||||||
pub fn new(metrics_domain: metrics::Domain) -> Self {
|
pub fn new(metrics_domain: metrics::Domain) -> Self {
|
||||||
|
let chunks_mem_usage = metrics_domain.register_gauge_metric(
|
||||||
|
"chunks_mem_usage",
|
||||||
|
Some("bytes"),
|
||||||
|
"Memory usage by catalog chunks",
|
||||||
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
memory_metrics: MemoryMetrics::new(&metrics_domain),
|
memory_metrics: StorageGauge::new(&chunks_mem_usage),
|
||||||
metrics_domain: Arc::new(metrics_domain),
|
metrics_domain: Arc::new(metrics_domain),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the memory metrics for the catalog
|
/// Returns the memory metrics for the catalog
|
||||||
pub fn memory(&self) -> &MemoryMetrics {
|
pub fn memory(&self) -> &StorageGauge {
|
||||||
&self.memory_metrics
|
&self.memory_metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,8 +62,24 @@ impl CatalogMetrics {
|
||||||
&chunk_lock_tracker,
|
&chunk_lock_tracker,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let storage_gauge = self.metrics_domain.register_gauge_metric_with_labels(
|
||||||
|
"loaded",
|
||||||
|
Some("chunks"),
|
||||||
|
"The number of chunks loaded in a each chunk storage location",
|
||||||
|
&[KeyValue::new("table", table_name.to_string())],
|
||||||
|
);
|
||||||
|
|
||||||
|
let row_gauge = self.metrics_domain.register_gauge_metric_with_labels(
|
||||||
|
"loaded",
|
||||||
|
Some("rows"),
|
||||||
|
"The number of rows loaded in each chunk storage location",
|
||||||
|
&[KeyValue::new("table", table_name.to_string())],
|
||||||
|
);
|
||||||
|
|
||||||
TableMetrics {
|
TableMetrics {
|
||||||
metrics_domain: Arc::clone(&self.metrics_domain),
|
metrics_domain: Arc::clone(&self.metrics_domain),
|
||||||
|
chunk_storage: StorageGauge::new(&storage_gauge),
|
||||||
|
row_count: StorageGauge::new(&row_gauge),
|
||||||
memory_metrics: self.memory_metrics.clone_empty(),
|
memory_metrics: self.memory_metrics.clone_empty(),
|
||||||
table_lock_tracker,
|
table_lock_tracker,
|
||||||
partition_lock_tracker,
|
partition_lock_tracker,
|
||||||
|
@ -71,8 +93,14 @@ pub struct TableMetrics {
|
||||||
/// Metrics domain
|
/// Metrics domain
|
||||||
metrics_domain: Arc<metrics::Domain>,
|
metrics_domain: Arc<metrics::Domain>,
|
||||||
|
|
||||||
|
/// Chunk storage metrics
|
||||||
|
chunk_storage: StorageGauge,
|
||||||
|
|
||||||
|
/// Chunk row count metrics
|
||||||
|
row_count: StorageGauge,
|
||||||
|
|
||||||
/// Catalog memory metrics
|
/// Catalog memory metrics
|
||||||
memory_metrics: MemoryMetrics,
|
memory_metrics: StorageGauge,
|
||||||
|
|
||||||
/// Lock tracker for table-level locks
|
/// Lock tracker for table-level locks
|
||||||
table_lock_tracker: LockTracker,
|
table_lock_tracker: LockTracker,
|
||||||
|
@ -96,6 +124,8 @@ impl TableMetrics {
|
||||||
pub(super) fn new_partition_metrics(&self) -> PartitionMetrics {
|
pub(super) fn new_partition_metrics(&self) -> PartitionMetrics {
|
||||||
// Lock tracker for chunk-level locks
|
// Lock tracker for chunk-level locks
|
||||||
PartitionMetrics {
|
PartitionMetrics {
|
||||||
|
chunk_storage: self.chunk_storage.clone_empty(),
|
||||||
|
row_count: self.row_count.clone_empty(),
|
||||||
memory_metrics: self.memory_metrics.clone_empty(),
|
memory_metrics: self.memory_metrics.clone_empty(),
|
||||||
chunk_state: self.metrics_domain.register_counter_metric_with_labels(
|
chunk_state: self.metrics_domain.register_counter_metric_with_labels(
|
||||||
"chunks",
|
"chunks",
|
||||||
|
@ -119,8 +149,14 @@ impl TableMetrics {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PartitionMetrics {
|
pub struct PartitionMetrics {
|
||||||
|
/// Chunk storage metrics
|
||||||
|
chunk_storage: StorageGauge,
|
||||||
|
|
||||||
|
/// Chunk row count metrics
|
||||||
|
row_count: StorageGauge,
|
||||||
|
|
||||||
/// Catalog memory metrics
|
/// Catalog memory metrics
|
||||||
memory_metrics: MemoryMetrics,
|
memory_metrics: StorageGauge,
|
||||||
|
|
||||||
chunk_state: Counter,
|
chunk_state: Counter,
|
||||||
|
|
||||||
|
@ -139,65 +175,92 @@ impl PartitionMetrics {
|
||||||
ChunkMetrics {
|
ChunkMetrics {
|
||||||
state: self.chunk_state.clone(),
|
state: self.chunk_state.clone(),
|
||||||
immutable_chunk_size: self.immutable_chunk_size.clone(),
|
immutable_chunk_size: self.immutable_chunk_size.clone(),
|
||||||
|
chunk_storage: self.chunk_storage.clone_empty(),
|
||||||
|
row_count: self.row_count.clone_empty(),
|
||||||
memory_metrics: self.memory_metrics.clone_empty(),
|
memory_metrics: self.memory_metrics.clone_empty(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Created from a `metrics::Gauge` and extracts a `GaugeValue` for each chunk storage
|
||||||
|
///
|
||||||
|
/// This can then be used within each `CatalogChunk` to record its observations for
|
||||||
|
/// the different storages
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct MemoryMetrics {
|
pub struct StorageGauge {
|
||||||
pub(super) mutable_buffer: GaugeValue,
|
pub(super) mutable_buffer: GaugeValue,
|
||||||
pub(super) read_buffer: GaugeValue,
|
pub(super) read_buffer: GaugeValue,
|
||||||
pub(super) parquet: GaugeValue,
|
pub(super) object_store: GaugeValue,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MemoryMetrics {
|
impl StorageGauge {
|
||||||
pub fn new_unregistered() -> Self {
|
pub(super) fn new_unregistered() -> Self {
|
||||||
Self {
|
Self {
|
||||||
mutable_buffer: GaugeValue::new_unregistered(),
|
mutable_buffer: GaugeValue::new_unregistered(),
|
||||||
read_buffer: GaugeValue::new_unregistered(),
|
read_buffer: GaugeValue::new_unregistered(),
|
||||||
parquet: GaugeValue::new_unregistered(),
|
object_store: GaugeValue::new_unregistered(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(metrics_domain: &metrics::Domain) -> Self {
|
pub(super) fn new(gauge: &Gauge) -> Self {
|
||||||
let gauge = metrics_domain.register_gauge_metric(
|
|
||||||
"chunks_mem_usage",
|
|
||||||
Some("bytes"),
|
|
||||||
"Memory usage by catalog chunks",
|
|
||||||
);
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
mutable_buffer: gauge.gauge_value(&[KeyValue::new("source", "mutable_buffer")]),
|
mutable_buffer: gauge.gauge_value(&[KeyValue::new("location", "mutable_buffer")]),
|
||||||
read_buffer: gauge.gauge_value(&[KeyValue::new("source", "read_buffer")]),
|
read_buffer: gauge.gauge_value(&[KeyValue::new("location", "read_buffer")]),
|
||||||
parquet: gauge.gauge_value(&[KeyValue::new("source", "parquet")]),
|
object_store: gauge.gauge_value(&[KeyValue::new("location", "object_store")]),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_mub_only(&mut self, value: usize) {
|
||||||
|
self.mutable_buffer.set(value);
|
||||||
|
self.read_buffer.set(0);
|
||||||
|
self.object_store.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_rub_only(&mut self, value: usize) {
|
||||||
|
self.mutable_buffer.set(0);
|
||||||
|
self.read_buffer.set(value);
|
||||||
|
self.object_store.set(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_rub_and_object_store_only(&mut self, rub: usize, parquet: usize) {
|
||||||
|
self.mutable_buffer.set(0);
|
||||||
|
self.read_buffer.set(rub);
|
||||||
|
self.object_store.set(parquet);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn set_object_store_only(&mut self, value: usize) {
|
||||||
|
self.mutable_buffer.set(0);
|
||||||
|
self.read_buffer.set(0);
|
||||||
|
self.object_store.set(value);
|
||||||
|
}
|
||||||
|
|
||||||
fn clone_empty(&self) -> Self {
|
fn clone_empty(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
mutable_buffer: self.mutable_buffer.clone_empty(),
|
mutable_buffer: self.mutable_buffer.clone_empty(),
|
||||||
read_buffer: self.read_buffer.clone_empty(),
|
read_buffer: self.read_buffer.clone_empty(),
|
||||||
parquet: self.parquet.clone_empty(),
|
object_store: self.object_store.clone_empty(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Returns the size of the mutable buffer
|
|
||||||
|
/// Returns the total for the mutable buffer
|
||||||
pub fn mutable_buffer(&self) -> usize {
|
pub fn mutable_buffer(&self) -> usize {
|
||||||
self.mutable_buffer.get_total()
|
self.mutable_buffer.get_total()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the size of the mutable buffer
|
/// Returns the total for the read buffer
|
||||||
pub fn read_buffer(&self) -> usize {
|
pub fn read_buffer(&self) -> usize {
|
||||||
self.read_buffer.get_total()
|
self.read_buffer.get_total()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the amount of data in parquet
|
/// Returns the total for object storage
|
||||||
pub fn parquet(&self) -> usize {
|
pub fn object_store(&self) -> usize {
|
||||||
self.parquet.get_total()
|
self.object_store.get_total()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Total bytes over all registries.
|
/// Returns the total over all storages
|
||||||
pub fn total(&self) -> usize {
|
pub fn total(&self) -> usize {
|
||||||
self.mutable_buffer.get_total() + self.read_buffer.get_total() + self.parquet.get_total()
|
self.mutable_buffer.get_total()
|
||||||
|
+ self.read_buffer.get_total()
|
||||||
|
+ self.object_store.get_total()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue