refactor: track memory metrics in catalog (#1995)

* refactor: track memory metrics in catalog

* chore: update comment
pull/24376/head
Raphael Taylor-Davies 2021-07-14 17:23:00 +01:00 committed by GitHub
parent cedd6269c7
commit 1d00fa2fd8
13 changed files with 142 additions and 114 deletions

View File

@@ -207,10 +207,10 @@ impl GaugeValue {
     /// Sets the local value for this GaugeValue
     pub fn set(&mut self, new: usize) {
-        if new > self.local {
-            self.inc(new - self.local)
-        } else {
-            self.decr(self.local - new)
+        match new.cmp(&self.local) {
+            std::cmp::Ordering::Less => self.decr(self.local - new),
+            std::cmp::Ordering::Equal => {}
+            std::cmp::Ordering::Greater => self.inc(new - self.local),
         }
     }
 }
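The rewritten `set` dispatches on `Ordering` instead of a two-way branch, so setting a gauge to its current value no longer emits a spurious `decr(0)`. A minimal sketch of the delta semantics this relies on, assuming (as the catalog code later in this diff does) that `clone_empty` children report their `inc`/`decr` deltas into a shared total readable via `get_total`:

```rust
use metrics::GaugeValue;

fn main() {
    let parent = GaugeValue::new_unregistered();
    let mut chunk_a = parent.clone_empty();
    let mut chunk_b = parent.clone_empty();

    chunk_a.set(100); // Ordering::Greater -> inc(100)
    chunk_b.set(50);  // Ordering::Greater -> inc(50)
    chunk_a.set(70);  // Ordering::Less    -> decr(30)
    chunk_a.set(70);  // Ordering::Equal   -> no-op (old code emitted decr(0))

    // The shared total reflects the sum of the local values.
    assert_eq!(parent.get_total(), 120);
}
```

This delta propagation is what lets the per-chunk gauges introduced below roll up into one catalog-wide figure.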

View File

@@ -11,7 +11,6 @@ use internal_types::{
     schema::{builder::SchemaBuilder, InfluxColumnType, Schema},
     selection::Selection,
 };
-use metrics::GaugeValue;
 use parking_lot::Mutex;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{collections::BTreeSet, sync::Arc};
@@ -48,9 +47,9 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[derive(Debug)]
+#[allow(missing_copy_implementations)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by chunk
-    memory_bytes: GaugeValue,
+    // Placeholder
 }
 
 impl ChunkMetrics {
@@ -59,13 +58,11 @@ impl ChunkMetrics {
     /// will therefore not be visible to other ChunkMetrics instances or metric instruments
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
-        Self {
-            memory_bytes: GaugeValue::new_unregistered(),
-        }
+        Self {}
     }
 
-    pub fn new(_metrics: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
-        Self { memory_bytes }
+    pub fn new(_metrics: &metrics::Domain) -> Self {
+        Self {}
     }
 }
@@ -122,8 +119,6 @@ impl MBChunk {
         let columns = batch.columns();
         chunk.write_columns(sequence, columns)?;
 
-        chunk.metrics.memory_bytes.set(chunk.size());
-
         Ok(chunk)
     }
@@ -151,7 +146,6 @@
             .try_lock()
             .expect("concurrent readers/writers to MBChunk") = None;
 
-        self.metrics.memory_bytes.set(self.size());
         self.time_of_last_write = Utc::now();
 
         Ok(())
@@ -165,10 +159,7 @@ impl MBChunk {
             return Arc::clone(snapshot);
         }
 
-        let snapshot = Arc::new(ChunkSnapshot::new(
-            self,
-            self.metrics.memory_bytes.clone_empty(),
-        ));
+        let snapshot = Arc::new(ChunkSnapshot::new(self));
         *guard = Some(Arc::clone(&snapshot));
         snapshot
     }
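With the gauge moved into the catalog, this crate's `ChunkMetrics` becomes an empty placeholder, which is why the new `#[allow(missing_copy_implementations)]` attribute appears: a fieldless public struct could be `Copy`, and the allow-by-default lint would flag it under a strict lint configuration. A library-crate sketch of that interaction (the lint name is real; the strict `deny` at the top is an assumption for illustration):

```rust
// lib.rs -- compiled as a library so public items are externally visible.
#![deny(missing_copy_implementations)]

/// Placeholder kept so constructor signatures stay stable; with the
/// `memory_bytes` gauge gone it has no fields, which is exactly what the
/// `missing_copy_implementations` lint keys on.
#[derive(Debug)]
#[allow(missing_copy_implementations)] // fields are expected to return later
pub struct ChunkMetrics {}

impl ChunkMetrics {
    pub fn new_unregistered() -> Self {
        Self {}
    }
}
```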

View File

@@ -35,11 +35,10 @@ pub struct ChunkSnapshot {
     batch: RecordBatch,
     table_name: Arc<str>,
     stats: Vec<ColumnSummary>,
-    memory: metrics::GaugeValue,
 }
 
 impl ChunkSnapshot {
-    pub(crate) fn new(chunk: &MBChunk, memory: metrics::GaugeValue) -> Self {
+    pub(crate) fn new(chunk: &MBChunk) -> Self {
         let schema = chunk
             .schema(Selection::All)
             .log_if_error("ChunkSnapshot getting table schema")
@@ -52,15 +51,12 @@ impl ChunkSnapshot {
         let summary = chunk.table_summary();
 
-        let mut s = Self {
+        Self {
             schema: Arc::new(schema),
             batch,
             table_name: Arc::clone(&chunk.table_name),
             stats: summary.columns,
-            memory,
-        };
-        s.memory.set(s.size());
-        s
+        }
     }
 
     /// Return Schema for all columns in this snapshot

View File

@@ -14,7 +14,6 @@ use internal_types::{
 use object_store::{path::Path, ObjectStore};
 use query::predicate::Predicate;
 
-use metrics::GaugeValue;
 use std::mem;
 
 #[derive(Debug, Snafu)]
@@ -61,9 +60,9 @@ pub enum Error {
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[derive(Debug)]
+#[allow(missing_copy_implementations)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by chunk
-    memory_bytes: GaugeValue,
+    // Placeholder
 }
 
 impl ChunkMetrics {
@@ -72,13 +71,11 @@ impl ChunkMetrics {
     /// will therefore not be visible to other ChunkMetrics instances or metric instruments
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
-        Self {
-            memory_bytes: GaugeValue::new_unregistered(),
-        }
+        Self {}
     }
 
-    pub fn new(_metrics: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
-        Self { memory_bytes }
+    pub fn new(_metrics: &metrics::Domain) -> Self {
+        Self {}
     }
 }
@@ -156,7 +153,7 @@ impl ParquetChunk {
     ) -> Self {
         let timestamp_range = extract_range(&table_summary);
 
-        let mut chunk = Self {
+        Self {
             partition_key,
             table_summary,
             schema,
@@ -165,10 +162,7 @@
             object_store_path: file_location,
             parquet_metadata,
             metrics,
-        };
-
-        chunk.metrics.memory_bytes.set(chunk.size());
-        chunk
+        }
     }
 
     /// Return the chunk's partition key

View File

@@ -3,7 +3,7 @@ use std::{
     convert::TryFrom,
 };
 
-use metrics::{Gauge, GaugeValue, KeyValue};
+use metrics::{Gauge, KeyValue};
 use snafu::{ResultExt, Snafu};
 
 use arrow::record_batch::RecordBatch;
@@ -100,10 +100,6 @@ impl Chunk {
         self.table.add_row_group(row_group);
 
-        // Get and set new size of chunk on memory tracker
-        let size = Self::base_size() + self.table.size();
-        self.metrics.memory_bytes.set(size);
-
         // update column metrics associated with column storage
         self.metrics
             .update_column_storage_statistics(&storage_statistics);
@@ -325,9 +321,6 @@ impl std::fmt::Debug for Chunk {
 #[derive(Debug)]
 pub struct ChunkMetrics {
-    /// keep track of memory used by table data in chunk
-    memory_bytes: GaugeValue,
-
     // This metric tracks the total number of columns in read buffer.
     columns_total: Gauge,
@@ -345,9 +338,8 @@ pub struct ChunkMetrics {
 }
 
 impl ChunkMetrics {
-    pub fn new(domain: &metrics::Domain, memory_bytes: GaugeValue) -> Self {
+    pub fn new(domain: &metrics::Domain) -> Self {
         Self {
-            memory_bytes,
             columns_total: domain.register_gauge_metric(
                 "column",
                 Some("total"),
@@ -377,7 +369,6 @@ impl ChunkMetrics {
     /// created on a metrics domain, and vice versa
     pub fn new_unregistered() -> Self {
         Self {
-            memory_bytes: GaugeValue::new_unregistered(),
             columns_total: Gauge::new_unregistered(),
             column_values_total: Gauge::new_unregistered(),
             column_bytes_total: Gauge::new_unregistered(),
@@ -633,10 +624,7 @@ mod test {
         let domain =
             registry.register_domain_with_labels("read_buffer", vec![KeyValue::new("db", "mydb")]);
 
-        let mut chunk = Chunk::new(
-            "a_table",
-            ChunkMetrics::new(&domain, GaugeValue::new_unregistered()),
-        );
+        let mut chunk = Chunk::new("a_table", ChunkMetrics::new(&domain));
 
         // Add a new table to the chunk.
         chunk.upsert_table(gen_recordbatch());

View File

@@ -775,10 +775,7 @@ impl Db {
             self.metric_labels.clone(),
         );
         let chunk_result = MBChunk::new(
-            MutableBufferChunkMetrics::new(
-                &metrics,
-                self.catalog.metrics().memory().mutable_buffer(),
-            ),
+            MutableBufferChunkMetrics::new(&metrics),
             sequence,
             table_batch,
         )
@@ -1345,7 +1342,7 @@
             .eq(1.0)
             .unwrap();
 
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1255)
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1239)
             .unwrap();
 
         db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
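Every expected size in the test updates below shrinks by 16 bytes per chunk (1255 to 1239 here, then 1484 to 1468, 655 to 639, and 32 bytes where two chunks are summed), consistent with each chunk struct losing its `memory_bytes: GaugeValue` field. A back-of-the-envelope check, assuming a hypothetical layout of a local count plus a shared handle:

```rust
use std::mem::size_of;
use std::sync::Arc;

// Hypothetical stand-in for the removed field's layout: a locally tracked
// count plus a shared handle for propagating deltas.
struct GaugeValueLayout {
    _local: usize,
    _shared: Arc<()>,
}

fn main() {
    // 8 + 8 bytes on a 64-bit target: matches the uniform 16-byte drop in
    // the single-chunk expectations (and 32 bytes where two chunks are
    // summed, e.g. 2141.0 -> 2109.0).
    assert_eq!(size_of::<GaugeValueLayout>(), 16);
}
```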
@@ -1367,7 +1364,7 @@
         // verify chunk size updated (chunk moved from closing to moving to moved)
         catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
 
-        let expected_read_buffer_size = 1484;
+        let expected_read_buffer_size = 1468;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -1392,7 +1389,7 @@
             .eq(1.0)
             .unwrap();
 
-        let expected_parquet_size = 655;
+        let expected_parquet_size = 639;
         catalog_chunk_size_bytes_metric_eq(
             &test_db.metric_registry,
             "read_buffer",
@@ -1566,7 +1563,7 @@
             .unwrap();
 
         // verify chunk size updated (chunk moved from moved to writing to written)
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1486).unwrap();
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1470).unwrap();
 
         // drop, the chunk from the read buffer
         db.drop_chunk("cpu", partition_key, mb_chunk.id()).unwrap();
@@ -1575,8 +1572,8 @@
             vec![] as Vec<u32>
         );
 
-        // verify size is reported until chunk dropped
-        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1486).unwrap();
+        // verify size is not accounted even though a reference to the RubChunk still exists
+        catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
         std::mem::drop(rb_chunk);
 
         // verify chunk size updated (chunk dropped from moved state)
@@ -1694,7 +1691,7 @@
                 ("svr_id", "1"),
             ])
             .histogram()
-            .sample_sum_eq(3042.0)
+            .sample_sum_eq(3026.0)
             .unwrap();
 
         let rb = collect_read_filter(&rb_chunk).await;
@@ -1796,7 +1793,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2141.0)
+            .sample_sum_eq(2109.0)
             .unwrap();
 
         // it should be the same chunk!
@@ -1904,7 +1901,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(2141.0)
+            .sample_sum_eq(2109.0)
             .unwrap();
 
         // Unload RB chunk but keep it in OS
@@ -1931,7 +1928,7 @@
                 ("svr_id", "10"),
             ])
             .histogram()
-            .sample_sum_eq(655.0)
+            .sample_sum_eq(639.0)
             .unwrap();
 
         // Verify data written to the parquet file in object store
@@ -2275,10 +2272,7 @@
             .map(|x| x.estimated_bytes)
             .sum();
 
-        assert_eq!(
-            db.catalog.metrics().memory().mutable_buffer().get_total(),
-            size
-        );
+        assert_eq!(db.catalog.metrics().memory().mutable_buffer(), size);
 
         assert_eq!(
             expected, chunk_summaries,
@@ -2376,7 +2370,7 @@
                 0,
                 ChunkStorage::ReadBufferAndObjectStore,
                 lifecycle_action,
-                2139, // size of RB and OS chunks
+                2107, // size of RB and OS chunks
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2394,7 +2388,7 @@
                 0,
                 ChunkStorage::ClosedMutableBuffer,
                 lifecycle_action,
-                2414,
+                2398,
                 1,
             ),
             ChunkSummary::new_without_timestamps(
@@ -2415,14 +2409,11 @@
         );
 
         assert_eq!(
-            db.catalog.metrics().memory().mutable_buffer().get_total(),
-            64 + 2414 + 87
+            db.catalog.metrics().memory().mutable_buffer(),
+            64 + 2398 + 87
         );
-        assert_eq!(
-            db.catalog.metrics().memory().read_buffer().get_total(),
-            1484
-        );
-        assert_eq!(db.catalog.metrics().memory().parquet().get_total(), 655);
+        assert_eq!(db.catalog.metrics().memory().read_buffer(), 1468);
+        assert_eq!(db.catalog.metrics().memory().parquet(), 639);
     }
 
     #[tokio::test]

View File

@@ -1,3 +1,4 @@
+use crate::db::catalog::metrics::MemoryMetrics;
 use chrono::{DateTime, Utc};
 use data_types::{
     chunk_metadata::{
@@ -220,6 +221,7 @@ macro_rules! unexpected_state {
 pub struct ChunkMetrics {
     pub(super) state: Counter,
     pub(super) immutable_chunk_size: Histogram,
+    pub(super) memory_metrics: MemoryMetrics,
 }
 
 impl ChunkMetrics {
@@ -231,6 +233,7 @@ impl ChunkMetrics {
         Self {
             state: Counter::new_unregistered(),
             immutable_chunk_size: Histogram::new_unregistered(),
+            memory_metrics: MemoryMetrics::new_unregistered(),
         }
     }
 }
@@ -255,7 +258,7 @@ impl CatalogChunk {
             .state
             .inc_with_labels(&[KeyValue::new("state", "open")]);
 
-        Self {
+        let mut chunk = Self {
             addr,
             stage,
             lifecycle_action: None,
@@ -263,7 +266,9 @@
             time_of_first_write: Some(first_write),
             time_of_last_write: Some(last_write),
             time_closed: None,
-        }
+        };
+        chunk.update_memory_metrics();
+        chunk
     }
 
     /// Creates a new RUB chunk from the provided RUB chunk and metadata
@@ -301,7 +306,7 @@ impl CatalogChunk {
             time_of_last_write: None,
             time_closed: None,
         };
-        chunk.record_write(); // The creation is considered the first and only "write"
+        chunk.update_memory_metrics();
         chunk
     }
@@ -326,7 +331,7 @@
             meta,
         };
 
-        Self {
+        let mut chunk = Self {
             addr,
             stage,
             lifecycle_action: None,
@@ -334,7 +339,9 @@
             time_of_first_write: None,
             time_of_last_write: None,
             time_closed: None,
-        }
+        };
+        chunk.update_memory_metrics();
+        chunk
     }
 
     pub fn addr(&self) -> &ChunkAddr {
@@ -379,13 +386,54 @@
         self.time_closed
     }
 
-    /// Update the write timestamps for this chunk
+    /// Updates `self.memory_metrics` to match the contents of `self.stage`
+    fn update_memory_metrics(&mut self) {
+        match &self.stage {
+            ChunkStage::Open { mb_chunk } => {
+                self.metrics
+                    .memory_metrics
+                    .mutable_buffer
+                    .set(mb_chunk.size());
+                self.metrics.memory_metrics.read_buffer.set(0);
+                self.metrics.memory_metrics.parquet.set(0);
+            }
+            ChunkStage::Frozen { representation, .. } => match representation {
+                ChunkStageFrozenRepr::MutableBufferSnapshot(snapshot) => {
+                    self.metrics
+                        .memory_metrics
+                        .mutable_buffer
+                        .set(snapshot.size());
+                    self.metrics.memory_metrics.read_buffer.set(0);
+                    self.metrics.memory_metrics.parquet.set(0);
+                }
+                ChunkStageFrozenRepr::ReadBuffer(rb_chunk) => {
+                    self.metrics.memory_metrics.mutable_buffer.set(0);
+                    self.metrics.memory_metrics.read_buffer.set(rb_chunk.size());
+                    self.metrics.memory_metrics.parquet.set(0);
+                }
+            },
+            ChunkStage::Persisted {
+                parquet,
+                read_buffer,
+                ..
+            } => {
+                let rub_size = read_buffer.as_ref().map(|x| x.size()).unwrap_or(0);
+
+                self.metrics.memory_metrics.mutable_buffer.set(0);
+                self.metrics.memory_metrics.read_buffer.set(rub_size);
+                self.metrics.memory_metrics.parquet.set(parquet.size());
+            }
+        }
+    }
+
+    /// Update the metrics for this chunk
     pub fn record_write(&mut self) {
         let now = Utc::now();
         if self.time_of_first_write.is_none() {
             self.time_of_first_write = Some(now);
         }
         self.time_of_last_write = Some(now);
+        self.update_memory_metrics();
     }
 
     /// Returns the storage and the number of rows
@@ -542,6 +590,8 @@ impl CatalogChunk {
                     representation: ChunkStageFrozenRepr::MutableBufferSnapshot(Arc::clone(&s)),
                     meta: Arc::new(metadata),
                 };
+                self.update_memory_metrics();
+
                 Ok(())
             }
             &ChunkStage::Frozen { .. } => {
@@ -635,6 +685,7 @@ impl CatalogChunk {
                     &[KeyValue::new("state", "moved")],
                 );
                 *representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
+                self.update_memory_metrics();
                 self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
                 Ok(())
             }
@@ -713,6 +764,7 @@ impl CatalogChunk {
                     parquet: chunk,
                     read_buffer: Some(db),
                 };
+                self.update_memory_metrics();
                 Ok(())
             }
         }
@@ -745,6 +797,8 @@ impl CatalogChunk {
                     &[KeyValue::new("state", "os")],
                 );
 
+                self.update_memory_metrics();
+
                 Ok(rub_chunk)
             } else {
                 // TODO: do we really need to error here or should unloading an unloaded chunk
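`update_memory_metrics` re-derives all three gauges from the current stage, so a lifecycle transition zeroes one storage tier and charges another in a single call. A sketch of the effect on the catalog totals, using the gauge API from this diff and the sizes the tests above assert (1239 bytes open, 1468 in the read buffer); the aggregation behaviour of `clone_empty` children is assumed as described earlier:

```rust
use metrics::GaugeValue;

fn main() {
    // Catalog-level totals for two of the storage tiers.
    let mub_total = GaugeValue::new_unregistered();
    let rub_total = GaugeValue::new_unregistered();

    // Per-chunk children, as handed out via clone_empty in metrics.rs.
    let mut chunk_mub = mub_total.clone_empty();
    let mut chunk_rub = rub_total.clone_empty();

    // ChunkStage::Open: all bytes accounted to the mutable buffer.
    chunk_mub.set(1239);
    chunk_rub.set(0);

    // After move_chunk_to_read_buffer, update_memory_metrics re-derives:
    chunk_mub.set(0);
    chunk_rub.set(1468);

    assert_eq!(mub_total.get_total(), 0);
    assert_eq!(rub_total.get_total(), 1468);
}
```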

View File

@@ -8,7 +8,7 @@ pub struct CatalogMetrics {
     /// Metrics domain
     metrics_domain: Arc<metrics::Domain>,
 
-    /// Memory registries
+    /// Catalog memory metrics
     memory_metrics: MemoryMetrics,
 }
@@ -58,6 +58,7 @@ impl CatalogMetrics {
         TableMetrics {
             metrics_domain: Arc::clone(&self.metrics_domain),
+            memory_metrics: self.memory_metrics.clone_empty(),
             table_lock_tracker,
             partition_lock_tracker,
             chunk_lock_tracker,
@@ -70,6 +71,9 @@ pub struct TableMetrics {
     /// Metrics domain
     metrics_domain: Arc<metrics::Domain>,
 
+    /// Catalog memory metrics
+    memory_metrics: MemoryMetrics,
+
     /// Lock tracker for table-level locks
     table_lock_tracker: LockTracker,
@@ -92,6 +96,7 @@ impl TableMetrics {
     pub(super) fn new_partition_metrics(&self) -> PartitionMetrics {
         // Lock tracker for chunk-level locks
         PartitionMetrics {
+            memory_metrics: self.memory_metrics.clone_empty(),
             chunk_state: self.metrics_domain.register_counter_metric_with_labels(
                 "chunks",
                 None,
@@ -114,6 +119,9 @@ impl TableMetrics {
 #[derive(Debug)]
 pub struct PartitionMetrics {
+    /// Catalog memory metrics
+    memory_metrics: MemoryMetrics,
+
     chunk_state: Counter,
 
     immutable_chunk_size: Histogram,
@@ -131,19 +139,28 @@ impl PartitionMetrics {
         ChunkMetrics {
             state: self.chunk_state.clone(),
             immutable_chunk_size: self.immutable_chunk_size.clone(),
+            memory_metrics: self.memory_metrics.clone_empty(),
         }
     }
 }
 
 #[derive(Debug)]
 pub struct MemoryMetrics {
-    mutable_buffer: GaugeValue,
-    read_buffer: GaugeValue,
-    parquet: GaugeValue,
+    pub(super) mutable_buffer: GaugeValue,
+    pub(super) read_buffer: GaugeValue,
+    pub(super) parquet: GaugeValue,
 }
 
 impl MemoryMetrics {
-    fn new(metrics_domain: &metrics::Domain) -> Self {
+    pub fn new_unregistered() -> Self {
+        Self {
+            mutable_buffer: GaugeValue::new_unregistered(),
+            read_buffer: GaugeValue::new_unregistered(),
+            parquet: GaugeValue::new_unregistered(),
+        }
+    }
+
+    pub fn new(metrics_domain: &metrics::Domain) -> Self {
         let gauge = metrics_domain.register_gauge_metric(
             "chunks_mem_usage",
             Some("bytes"),
@@ -157,19 +174,26 @@
         }
     }
 
+    fn clone_empty(&self) -> Self {
+        Self {
+            mutable_buffer: self.mutable_buffer.clone_empty(),
+            read_buffer: self.read_buffer.clone_empty(),
+            parquet: self.parquet.clone_empty(),
+        }
+    }
+
     /// Returns the size of the mutable buffer
-    pub fn mutable_buffer(&self) -> GaugeValue {
-        self.mutable_buffer.clone_empty()
+    pub fn mutable_buffer(&self) -> usize {
+        self.mutable_buffer.get_total()
     }
 
     /// Returns the size of the mutable buffer
-    pub fn read_buffer(&self) -> GaugeValue {
-        self.read_buffer.clone_empty()
+    pub fn read_buffer(&self) -> usize {
+        self.read_buffer.get_total()
     }
 
     /// Returns the amount of data in parquet
-    pub fn parquet(&self) -> GaugeValue {
-        self.parquet.clone_empty()
+    pub fn parquet(&self) -> usize {
+        self.parquet.get_total()
     }
 
     /// Total bytes over all registries.
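These helpers complete the fan-out: the catalog owns one `MemoryMetrics`, and each table, partition, and chunk receives a `clone_empty` copy, so a chunk-level `set` surfaces in the catalog-wide totals that the new `usize` accessors read back with `get_total`. A minimal sketch, assuming that aggregation behaviour:

```rust
use metrics::GaugeValue;

fn main() {
    let catalog = GaugeValue::new_unregistered(); // CatalogMetrics.memory_metrics.parquet
    let table = catalog.clone_empty();            // TableMetrics
    let partition = table.clone_empty();          // PartitionMetrics
    let mut chunk = partition.clone_empty();      // ChunkMetrics.memory_metrics

    // CatalogChunk::update_memory_metrics for a persisted chunk:
    chunk.set(639);

    // What the new MemoryMetrics::parquet() accessor returns catalog-wide.
    assert_eq!(catalog.get_total(), 639);
}
```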

View File

@@ -148,11 +148,8 @@ impl Partition {
             chunk_id,
         };
 
-        let chunk = Arc::new(self.metrics.new_chunk_lock(CatalogChunk::new_open(
-            addr,
-            chunk,
-            self.metrics.new_chunk_metrics(),
-        )));
+        let chunk = CatalogChunk::new_open(addr, chunk, self.metrics.new_chunk_metrics());
+        let chunk = Arc::new(self.metrics.new_chunk_lock(chunk));
 
         if self.chunks.insert(chunk_id, Arc::clone(&chunk)).is_some() {
             // A fundamental invariant has been violated - abort

View File

@@ -371,10 +371,7 @@ fn new_rub_chunk(db: &Db, table_name: &str) -> read_buffer::RBChunk {
         .metrics_registry
         .register_domain_with_labels("read_buffer", db.metric_labels.clone());
 
-    read_buffer::RBChunk::new(
-        table_name,
-        read_buffer::ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-    )
+    read_buffer::RBChunk::new(table_name, read_buffer::ChunkMetrics::new(&metrics))
 }
 
 /// Executes a plan and collects the results into a read buffer chunk

View File

@@ -66,10 +66,7 @@ pub(crate) fn compact_chunks(
         .metrics_registry
         .register_domain_with_labels("read_buffer", db.metric_labels.clone());
 
-    let mut rb_chunk = RBChunk::new(
-        &table_name,
-        ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
-    );
+    let mut rb_chunk = RBChunk::new(&table_name, ChunkMetrics::new(&metrics));
 
     let ctx = db.exec.new_context(ExecutorType::Reorg);

View File

@@ -123,8 +123,7 @@ pub fn write_chunk_to_object_store(
             .catalog
             .metrics_registry
             .register_domain_with_labels("parquet", db.catalog.metric_labels.clone());
-        let metrics =
-            ParquetChunkMetrics::new(&metrics, db.catalog.metrics().memory().parquet());
+        let metrics = ParquetChunkMetrics::new(&metrics);
 
         let parquet_chunk = Arc::new(
             ParquetChunk::new(
                 path.clone(),

View File

@@ -155,7 +155,7 @@ impl CatalogState for Catalog {
             .metrics_registry
             .register_domain_with_labels("parquet", self.metric_labels.clone());
 
-        let metrics = ParquetChunkMetrics::new(&metrics, self.metrics().memory().parquet());
+        let metrics = ParquetChunkMetrics::new(&metrics);
 
         let parquet_chunk = ParquetChunk::new(
             object_store.path_from_dirs_and_filename(info.path.clone()),
             object_store,