feat: remove deprecated catalog metrics (#2489)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-09-10 11:12:04 +01:00 committed by GitHub
parent 8a1d759fb7
commit eed81e752d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 243 deletions

View File

@ -2075,7 +2075,7 @@ mod tests {
reg: &metrics::TestMetricRegistry,
location: &'static str,
v: u64,
) -> Result<(), metrics::Error> {
) {
reg.has_metric_family("catalog_chunks_mem_usage_bytes")
.with_attributes(&[
("db_name", "placeholder"),
@ -2084,6 +2084,7 @@ mod tests {
])
.gauge()
.eq(v as f64)
.unwrap()
}
#[tokio::test]
@ -2109,18 +2110,6 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
// A chunk has been opened
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[
("db_name", "placeholder"),
("state", "open"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
@ -2129,8 +2118,7 @@ mod tests {
assert_metric("catalog_loaded_rows", "object_store", 0.0);
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 700)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 700);
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 20").await;
@ -2139,22 +2127,9 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=5 50").await;
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 764)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 764);
// Still only one chunk open
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[
("db_name", "placeholder"),
("state", "open"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
@ -2165,18 +2140,6 @@ mod tests {
db.rollover_partition("cpu", "1970-01-01T00").await.unwrap();
// A chunk is now closed
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[
("db_name", "placeholder"),
("state", "closed"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
assert_metric("catalog_loaded_chunks", "mutable_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
@ -2184,26 +2147,13 @@ mod tests {
assert_metric("catalog_loaded_rows", "read_buffer", 0.0);
assert_metric("catalog_loaded_rows", "object_store", 0.0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1295)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1295);
db.move_chunk_to_read_buffer("cpu", "1970-01-01T00", 0)
.await
.unwrap();
// A chunk is now in the read buffer
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[
("db_name", "placeholder"),
("state", "moved"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
assert_metric("catalog_loaded_chunks", "object_store", 0.0);
@ -2212,14 +2162,13 @@ mod tests {
assert_metric("catalog_loaded_rows", "object_store", 0.0);
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0);
let expected_read_buffer_size = 1922;
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"read_buffer",
expected_read_buffer_size,
)
.unwrap();
);
db.persist_partition(
"cpu",
@ -2230,32 +2179,18 @@ mod tests {
.unwrap();
// A chunk is now in the object store and still in read buffer
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[
("db_name", "placeholder"),
("state", "rub_and_os"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
let expected_parquet_size = 1551;
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"read_buffer",
expected_read_buffer_size,
)
.unwrap();
);
// now also in OS
catalog_chunk_size_bytes_metric_eq(
&test_db.metric_registry,
"object_store",
expected_parquet_size,
)
.unwrap(); // TODO: #1311
);
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 1.0);
@ -2267,14 +2202,6 @@ mod tests {
db.unload_read_buffer("cpu", "1970-01-01T00", 1).unwrap();
// A chunk is now now in the "os-only" state.
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_attributes(&[("db_name", "placeholder"), ("state", "os"), ("svr_id", "1")])
.counter()
.eq(1.0)
.unwrap();
assert_metric("catalog_loaded_chunks", "mutable_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "read_buffer", 0.0);
assert_metric("catalog_loaded_chunks", "object_store", 1.0);
@ -2287,10 +2214,9 @@ mod tests {
&test_db.metric_registry,
"object_store",
expected_parquet_size,
)
.unwrap();
);
// verify chunk size for RB has decreased
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0);
}
#[tokio::test]
@ -2457,21 +2383,22 @@ mod tests {
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
assert_batches_eq!(&expected, &batches);
// A chunk is now in the object store
// A chunk is now in the read buffer
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.has_metric_family("catalog_loaded_chunks")
.with_attributes(&[
("db_name", "placeholder"),
("state", "moved"),
("location", "read_buffer"),
("svr_id", "1"),
("table", "cpu"),
])
.counter()
.gauge()
.eq(1.0)
.unwrap();
// verify chunk size updated (chunk moved from moved to writing to written)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916);
// drop, the chunk from the read buffer
db.drop_chunk("cpu", partition_key, mb_chunk.id())
@ -2483,11 +2410,11 @@ mod tests {
);
// verify size is not accounted even though a reference to the RubChunk still exists
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0);
std::mem::drop(rb_chunk);
// verify chunk size updated (chunk dropped from moved state)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0);
// Currently this doesn't work (as we need to teach the stores how to
// purge tables after data bas been dropped println!("running
@ -2592,6 +2519,9 @@ mod tests {
let mb = collect_read_filter(&mb_chunk).await;
// MUB chunk size
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 3607);
// With the above data, cardinality of tag2 is 2 and tag1 is 5. Hence, RUB is sorted on (tag2, tag1)
let rb_chunk = db
.move_chunk_to_read_buffer("cpu", partition_key, mb_chunk.id())
@ -2599,30 +2529,8 @@ mod tests {
.unwrap();
// MUB chunk size
test_db
.metric_registry
.has_metric_family("catalog_chunk_creation_size_bytes")
.with_attributes(&[
("db_name", "placeholder"),
("state", "closed"),
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(5085.0)
.unwrap();
// RB chunk size
test_db
.metric_registry
.has_metric_family("catalog_chunk_creation_size_bytes")
.with_attributes(&[
("db_name", "placeholder"),
("state", "moved"),
("svr_id", "1"),
])
.histogram()
.sample_sum_eq(3834.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 3834);
let rb = collect_read_filter(&rb_chunk).await;
@ -2677,16 +2585,12 @@ mod tests {
let object_store = Arc::new(ObjectStore::new_in_memory());
// Create a DB given a server id, an object store and a db name
let server_id = ServerId::try_from(10).unwrap();
let db_name = "parquet_test_db";
let test_db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build()
.await;
@ -2719,17 +2623,9 @@ mod tests {
.unwrap();
// Read buffer + Parquet chunk size
test_db
.metric_registry
.has_metric_family("catalog_chunk_creation_size_bytes")
.with_attributes(&[
("db_name", "parquet_test_db"),
("state", "rub_and_os"),
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(3467.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "object_store", 1551);
// while MB and RB chunk are identical, the PQ chunk is a new one (split off)
assert_eq!(mb_chunk.id(), rb_chunk.id());
@ -2783,16 +2679,12 @@ mod tests {
let object_store = Arc::new(ObjectStore::new_in_memory());
// Create a DB given a server id, an object store and a db name
let server_id = ServerId::try_from(10).unwrap();
let db_name = "unload_read_buffer_test_db";
let test_db = TestDb::builder()
.lifecycle_rules(LifecycleRules {
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.server_id(server_id)
.object_store(Arc::clone(&object_store))
.db_name(db_name)
.build()
.await;
@ -2838,17 +2730,9 @@ mod tests {
);
// Read buffer + Parquet chunk size
test_db
.metric_registry
.has_metric_family("catalog_chunk_creation_size_bytes")
.with_attributes(&[
("db_name", "unload_read_buffer_test_db"),
("state", "rub_and_os"),
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(3467.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1916);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "object_store", 1551);
// Unload RB chunk but keep it in OS
let pq_chunk = db
@ -2868,17 +2752,9 @@ mod tests {
);
// Parquet chunk size only
test_db
.metric_registry
.has_metric_family("catalog_chunk_creation_size_bytes")
.with_attributes(&[
("db_name", "unload_read_buffer_test_db"),
("state", "os"),
("svr_id", "10"),
])
.histogram()
.sample_sum_eq(1551.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 0);
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "object_store", 1551);
// Verify data written to the parquet file in object store
//

View File

@ -10,7 +10,6 @@ use data_types::{
write_summary::TimestampSummary,
};
use internal_types::{access::AccessRecorder, schema::Schema};
use metrics::{Counter, Histogram, KeyValue};
use mutable_buffer::chunk::{snapshot::ChunkSnapshot as MBChunkSnapshot, MBChunk};
use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
@ -230,8 +229,6 @@ macro_rules! unexpected_state {
#[derive(Debug)]
pub struct ChunkMetrics {
pub(super) state: Counter,
pub(super) immutable_chunk_size: Histogram,
/// Chunk storage metrics
pub(super) chunk_storage: StorageGauge,
@ -252,8 +249,6 @@ impl ChunkMetrics {
/// created on a metrics domain, and vice versa
pub fn new_unregistered() -> Self {
Self {
state: Counter::new_unregistered(),
immutable_chunk_size: Histogram::new_unregistered(),
chunk_storage: StorageGauge::new_unregistered(),
row_count: StorageGauge::new_unregistered(),
memory_metrics: StorageGauge::new_unregistered(),
@ -276,10 +271,6 @@ impl CatalogChunk {
let stage = ChunkStage::Open { mb_chunk: chunk };
metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "open")]);
let chunk = Self {
addr,
stage,
@ -314,10 +305,6 @@ impl CatalogChunk {
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "compacted")]);
let chunk = Self {
addr,
stage,
@ -664,14 +651,6 @@ impl CatalogChunk {
self.time_closed = Some(Utc::now());
let (s, _) = mb_chunk.snapshot();
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "closed")]);
self.metrics.immutable_chunk_size.observe_with_attributes(
mb_chunk.size() as f64,
&[KeyValue::new("state", "closed")],
);
// Cache table summary + schema
let metadata = ChunkMetadata {
@ -714,15 +693,6 @@ impl CatalogChunk {
let chunk = Arc::clone(repr);
self.set_lifecycle_action(ChunkLifecycleAction::Moving, registration)?;
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "moving")]);
self.metrics.immutable_chunk_size.observe_with_attributes(
chunk.size() as f64,
&[KeyValue::new("state", "moving")],
);
Ok(chunk)
}
ChunkStageFrozenRepr::ReadBuffer(_) => InternalChunkState {
@ -771,14 +741,6 @@ impl CatalogChunk {
match &representation {
ChunkStageFrozenRepr::MutableBufferSnapshot(_) => {
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "moved")]);
self.metrics.immutable_chunk_size.observe_with_attributes(
chunk.size() as f64,
&[KeyValue::new("state", "moved")],
);
*representation = ChunkStageFrozenRepr::ReadBuffer(chunk);
self.update_metrics();
self.finish_lifecycle_action(ChunkLifecycleAction::Moving)?;
@ -810,9 +772,6 @@ impl CatalogChunk {
match &self.stage {
ChunkStage::Frozen { .. } => {
self.set_lifecycle_action(ChunkLifecycleAction::Persisting, registration)?;
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "writing_os")]);
Ok(())
}
_ => {
@ -845,15 +804,6 @@ impl CatalogChunk {
let db = Arc::clone(repr);
self.finish_lifecycle_action(ChunkLifecycleAction::Persisting)?;
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "rub_and_os")]);
self.metrics.immutable_chunk_size.observe_with_attributes(
(chunk.size() + db.size()) as f64,
&[KeyValue::new("state", "rub_and_os")],
);
self.stage = ChunkStage::Persisted {
meta,
parquet: chunk,
@ -877,21 +827,8 @@ impl CatalogChunk {
pub fn set_unload_from_read_buffer(&mut self) -> Result<Arc<RBChunk>> {
match &mut self.stage {
ChunkStage::Persisted {
parquet,
read_buffer,
..
} => {
ChunkStage::Persisted { read_buffer, .. } => {
if let Some(rub_chunk) = read_buffer.take() {
self.metrics
.state
.inc_with_attributes(&[KeyValue::new("state", "os")]);
self.metrics.immutable_chunk_size.observe_with_attributes(
parquet.size() as f64,
&[KeyValue::new("state", "os")],
);
self.update_metrics();
Ok(rub_chunk)

View File

@ -5,7 +5,7 @@ use parking_lot::Mutex;
use data_types::write_summary::TimestampSummary;
use metric::{Attributes, DurationHistogram, DurationHistogramOptions};
use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue};
use metrics::{Gauge, GaugeValue, KeyValue};
use tracker::{LockMetrics, RwLock};
use crate::db::catalog::chunk::ChunkMetrics;
@ -155,21 +155,6 @@ impl TableMetrics {
chunk_storage: self.chunk_storage.clone_empty(),
row_count: self.row_count.clone_empty(),
memory_metrics: self.memory_metrics.clone_empty(),
chunk_state: self.metrics_domain.register_counter_metric_with_attributes(
"chunks",
None,
"In-memory chunks created in various life-cycle stages",
vec![],
),
immutable_chunk_size: self
.metrics_domain
.register_histogram_metric(
"chunk_creation",
"size",
"bytes",
"The new size of an immutable chunk",
)
.init(),
chunk_lock_metrics: Arc::clone(&self.chunk_lock_metrics),
timestamp_histogram: self.timestamp_histogram.clone(),
}
@ -187,10 +172,6 @@ pub struct PartitionMetrics {
/// Catalog memory metrics
memory_metrics: StorageGauge,
chunk_state: Counter,
immutable_chunk_size: Histogram,
/// Lock metrics for chunk-level locks
chunk_lock_metrics: Arc<LockMetrics>,
@ -206,8 +187,6 @@ impl PartitionMetrics {
pub(super) fn new_chunk_metrics(&self) -> ChunkMetrics {
ChunkMetrics {
timestamp_histogram: self.timestamp_histogram.clone(),
state: self.chunk_state.clone(),
immutable_chunk_size: self.immutable_chunk_size.clone(),
chunk_storage: self.chunk_storage.clone_empty(),
row_count: self.row_count.clone_empty(),
memory_metrics: self.memory_metrics.clone_empty(),