Merge pull request #1410 from influxdata/er/feat/chunk_metrics

feat: metrics for total size of chunks across stages
kodiakhq[bot] 2021-05-04 17:47:53 +00:00 committed by GitHub
commit 9882087321
1 changed file with 221 additions and 35 deletions


@@ -336,13 +336,36 @@ struct DbMetrics {
// various immutable stages in IOx: closed/moving in the MUB, in the RB,
// and in Parquet.
catalog_immutable_chunk_bytes: metrics::Histogram,
// Tracks the current total size in bytes of all chunks in the catalog.
// Sizes are segmented by database and chunk location.
catalog_chunk_bytes: metrics::Gauge,
}
impl DbMetrics {
// updates the catalog_chunks metric with a new chunk state
fn update_chunk_state(&self, state: &ChunkState) {
self.catalog_chunks
.inc_with_labels(&[metrics::KeyValue::new("state", state.metric_label())]);
// updates the catalog_chunks metric for a chunk state transition, reducing
// the size metric for the previous state and increasing it for the new state.
fn update_chunk_state(
&self,
prev_state_size: Option<(&'static str, usize)>,
next_state_size: Option<(&'static str, usize)>,
) {
// Reduce the tracked-bytes metric for the previous state
if let Some((state, size)) = prev_state_size {
let labels = vec![metrics::KeyValue::new("state", state)];
self.catalog_chunk_bytes
.sub_with_labels(size as f64, labels.as_slice());
}
// Increase the tracked-bytes metric for the next chunk state
if let Some((state, size)) = next_state_size {
let labels = vec![metrics::KeyValue::new("state", state)];
self.catalog_chunk_bytes
.add_with_labels(size as f64, labels.as_slice());
// New chunk in new state
self.catalog_chunks.inc_with_labels(labels.as_slice());
}
}
}
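
Taken together, update_chunk_state moves bytes between "state" labels on the
catalog_chunk_bytes gauge: the previous state's label shrinks by the chunk's
old size, the next state's label grows by its new size, and None on either
side covers chunk creation or removal. A minimal sketch of the three call
shapes used throughout this diff (the db_metrics handle, label literals, and
size variables are illustrative, not part of the change):

// Transition (e.g. open -> closed): bytes leave one label and arrive at another.
db_metrics.update_chunk_state(
    Some(("open", old_size)),    // subtracted from state="open"
    Some(("closed", new_size)),  // added to state="closed"; chunk count bumped
);

// Creation: no previous state, so only the new label grows.
db_metrics.update_chunk_state(None, Some(("open", new_size)));

// Removal: no next state, so the previous label only shrinks.
db_metrics.update_chunk_state(Some(("moved", old_size)), None);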
@@ -367,15 +390,17 @@ impl Db {
let system_tables = Arc::new(system_tables);
let domain = metrics.register_domain("catalog");
let default_labels = vec![
metrics::KeyValue::new("db_name", db_name.to_string()),
metrics::KeyValue::new("svr_id", format!("{}", server_id)),
];
let db_metrics = DbMetrics {
catalog_chunks: domain.register_counter_metric_with_labels(
"chunks",
None,
"In-memory chunks created in various life-cycle stages",
vec![
metrics::KeyValue::new("db_name", db_name.to_string()),
metrics::KeyValue::new("svr_id", format!("{}", server_id)),
],
default_labels.clone(),
),
catalog_immutable_chunk_bytes: domain
.register_histogram_metric(
@@ -384,11 +409,14 @@ impl Db {
"bytes",
"The new size of an immutable chunk",
)
.with_labels(vec![
metrics::KeyValue::new("db_name", db_name.to_string()),
metrics::KeyValue::new("svr_id", format!("{}", server_id)),
])
.with_labels(default_labels.clone())
.init(),
catalog_chunk_bytes: domain.register_gauge_metric_with_labels(
"chunk_size",
Some("bytes"),
"The size in bytes of all chunks",
default_labels,
),
};
Self {
rules,
@@ -435,11 +463,17 @@ impl Db {
})?
{
let mut chunk = chunk.write();
let prev_chunk_state = Some((chunk.state().metric_label(), chunk.size()));
chunk.set_closed().context(RollingOverPartition {
partition_key,
table_name,
})?;
self.metrics.update_chunk_state(chunk.state());
// update metrics reflecting chunk moved to new state
self.metrics.update_chunk_state(
prev_chunk_state,
Some((chunk.state().metric_label(), chunk.size())),
);
Ok(Some(DbChunk::snapshot(&chunk)))
} else {
@@ -466,7 +500,7 @@ impl Db {
let chunk_state;
{
let prev_chunk_state = {
let chunk = partition
.chunk(table_name, chunk_id)
.context(DroppingChunk {
@@ -492,7 +526,8 @@ impl Db {
}
);
self.metrics.update_chunk_state(chunk.state());
// track previous state before it's dropped
(chunk.state().metric_label(), chunk.size())
};
debug!(%partition_key, %table_name, %chunk_id, %chunk_state, "dropping chunk");
@@ -504,6 +539,11 @@ impl Db {
table_name,
chunk_id,
})
.map(|_| {
// update metrics reflecting chunk has been dropped
self.metrics
.update_chunk_state(Some(prev_chunk_state), None);
})
}
/// Copies a chunk in the Closed state into the ReadBuffer from
@@ -542,6 +582,11 @@ impl Db {
})?
};
let prev_chunk_state = {
let chunk = chunk.read();
(chunk.state().metric_label(), chunk.size())
};
// update the catalog to say we are processing this chunk and
// then drop the lock while we do the work
let mb_chunk = {
@@ -553,7 +598,7 @@ impl Db {
chunk_id,
})?
};
//
// Track the size of the newly immutable closed MUB chunk.
self.metrics
.catalog_immutable_chunk_bytes
@@ -565,7 +610,15 @@ impl Db {
)],
);
self.metrics.update_chunk_state(chunk.read().state());
// chunk transitioned from open/closing to moving
{
let chunk = chunk.read();
self.metrics.update_chunk_state(
Some(prev_chunk_state),
Some((chunk.state().metric_label(), chunk.size())),
);
}
info!(%partition_key, %table_name, %chunk_id, "chunk marked MOVING, loading tables into read buffer");
let mut batches = Vec::new();
@@ -594,6 +647,8 @@ impl Db {
// Relock the chunk again (nothing else should have been able
// to modify the chunk state while we were moving it)
let mut chunk = chunk.write();
let prev_chunk_state = (chunk.state().metric_label(), chunk.size());
// update the catalog to say we are done processing
chunk.set_moved(Arc::new(rb_chunk)).context(LoadingChunk {
partition_key,
@@ -601,7 +656,7 @@ impl Db {
chunk_id,
})?;
// Track the size of the newly immutable closed MUB chunk.
// Track the size of the newly immutable RB chunk.
self.metrics
.catalog_immutable_chunk_bytes
.observe_with_labels(
@@ -612,7 +667,11 @@ impl Db {
)],
);
self.metrics.update_chunk_state(chunk.state());
// chunk transitioned from moving to moved
self.metrics.update_chunk_state(
Some(prev_chunk_state),
Some((chunk.state().metric_label(), chunk.size())),
);
debug!(%partition_key, %table_name, %chunk_id, "chunk marked MOVED. Loading complete");
Ok(DbChunk::snapshot(&chunk))
@@ -662,7 +721,13 @@ impl Db {
})?
};
self.metrics.update_chunk_state(chunk.read().state());
// chunk transitioned from moved to "writing", but the chunk remains in
// the read buffer so we don't want to decrease that metric.
{
let chunk = chunk.read();
self.metrics
.update_chunk_state(None, Some((chunk.state().metric_label(), chunk.size())));
}
debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITING, loading tables into object store");
// Get all tables in this chunk
@@ -738,6 +803,8 @@ impl Db {
// Relock the chunk again (nothing else should have been able
// to modify the chunk state while we were moving it)
let mut chunk = chunk.write();
let prev_chunk_state = (chunk.state().metric_label(), chunk.size());
// update the catalog to say we are done processing
let parquet_chunk = Arc::clone(&Arc::new(parquet_chunk));
chunk
@@ -759,8 +826,29 @@ impl Db {
)],
);
self.metrics.update_chunk_state(chunk.state());
debug!(%partition_key, %table_name, %chunk_id, "chunk marked MOVED. Persisting to object store complete");
// TODO(edd): metric updates here are brittle. Come up with a better
// solution.
// Reduce size of "written" chunk bytes
self.metrics.catalog_chunk_bytes.sub_with_labels(
prev_chunk_state.1 as f64,
&[metrics::KeyValue::new("state", prev_chunk_state.0)],
);
// Increase size of chunk in "os" state.
self.metrics.catalog_chunk_bytes.add_with_labels(
chunk.size() as f64,
&[metrics::KeyValue::new("state", "os")],
);
// Track a new chunk in "rub_and_os" state
self.metrics
.catalog_chunks
.inc_with_labels(&[metrics::KeyValue::new(
"state",
chunk.state().metric_label(),
)]);
debug!(%partition_key, %table_name, %chunk_id, "chunk marked WRITTEN. Persisting to object store complete");
// We know this chunk is ParquetFile type
Ok(DbChunk::parquet_file_snapshot(&chunk))
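
The manual gauge arithmetic above is what the TODO flags as brittle: the bytes
are tracked under state="os" while the chunk counter is incremented under the
chunk's actual label ("rub_and_os"), since the data now lives in both the read
buffer and object store. For contrast, a hypothetical version routed through
the helper, which this path cannot use precisely because those two labels
differ:

// Hypothetical only: this would add all bytes under "rub_and_os" and lose the
// separate "os" accounting that the explicit calls above preserve.
self.metrics.update_chunk_state(
    Some(prev_chunk_state),
    Some((chunk.state().metric_label(), chunk.size())),
);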
@@ -798,15 +886,32 @@ impl Db {
// update the catalog to no longer use read buffer chunk if any
let mut chunk = chunk.write();
chunk
.set_unload_from_read_buffer()
.context(UnloadingChunkFromReadBuffer {
partition_key,
table_name,
chunk_id,
})?;
let rb_chunk = {
chunk
.set_unload_from_read_buffer()
.context(UnloadingChunkFromReadBuffer {
partition_key,
table_name,
chunk_id,
})?
};
// TODO(edd): metric updates here are brittle. Come up with a better
// solution.
// Reduce size of "moved" chunk bytes (in read buffer)
self.metrics.catalog_chunk_bytes.sub_with_labels(
rb_chunk.size() as f64,
&[metrics::KeyValue::new("state", "moved")],
);
// Track a new chunk in "os"-only state
self.metrics
.catalog_chunks
.inc_with_labels(&[metrics::KeyValue::new(
"state",
chunk.state().metric_label(),
)]);
// Track the size of the chunk now that it is backed only by object store.
self.metrics
.catalog_immutable_chunk_bytes
.observe_with_labels(
@@ -816,7 +921,7 @@ impl Db {
chunk.state().metric_label(),
)],
);
self.metrics.update_chunk_state(chunk.state());
debug!(%partition_key, %table_name, %chunk_id, "chunk marked UNLOADED from read buffer");
Ok(DbChunk::snapshot(&chunk))
@@ -1029,6 +1134,15 @@ impl Db {
chunk_id,
})?;
// set new size of chunk
self.metrics.catalog_chunk_bytes.set_with_labels(
chunk.size() as f64,
&[metrics::KeyValue::new(
"state",
chunk.state().metric_label(),
)],
);
check_chunk_closed(chunk, mutable_size_threshold, &self.metrics);
}
None => {
@@ -1040,7 +1154,15 @@ impl Db {
self.memory_registries.mutable_buffer.as_ref(),
)
.context(OpenEntry { partition_key })?;
self.metrics.update_chunk_state(new_chunk.read().state()); // track new chunk
{
// track new open chunk
let chunk = new_chunk.read();
self.metrics.update_chunk_state(
None,
Some((chunk.state().metric_label(), chunk.size())),
);
}
check_chunk_closed(
new_chunk.write(),
@@ -1063,13 +1185,18 @@ fn check_chunk_closed(
mutable_size_threshold: Option<NonZeroUsize>,
metrics: &DbMetrics,
) {
let prev_chunk_state = (chunk.state().metric_label(), chunk.size());
if let Some(threshold) = mutable_size_threshold {
if let Ok(mb_chunk) = chunk.mutable_buffer() {
let size = mb_chunk.size();
if size > threshold.get() {
chunk.set_closed().expect("cannot close open chunk");
metrics.update_chunk_state(chunk.state());
metrics.update_chunk_state(
Some(prev_chunk_state),
Some((chunk.state().metric_label(), chunk.size())),
);
}
}
}
@@ -1217,6 +1344,21 @@ mod tests {
assert_batches_eq!(expected, &batches);
}
fn catalog_chunk_size_bytes_metric_eq(
reg: &metrics::TestMetricRegistry,
state: &'static str,
v: u64,
) -> Result<(), metrics::Error> {
reg.has_metric_family("catalog_chunk_size_bytes")
.with_labels(&[
("db_name", "placeholder"),
("state", state),
("svr_id", "1"),
])
.gauge()
.eq(v as f64)
}
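
The asserted family name follows from the registration earlier in this diff:
domain "catalog", metric name "chunk_size", and unit "bytes" combine into
"catalog_chunk_size_bytes" (the naming scheme is inferred from this diff, not
confirmed elsewhere). The tests below call the helper like so:

// e.g. assert that 72 bytes are currently tracked for "open" chunks
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "open", 72).unwrap();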
#[tokio::test]
async fn metrics_during_rollover() {
let test_db = make_db();
@@ -1237,9 +1379,15 @@ mod tests {
.eq(1.0)
.unwrap();
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "open", 72).unwrap();
// write into same chunk again.
write_lp(db.as_ref(), "cpu bar=2 10");
// verify chunk size updated
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "open", 88).unwrap();
// Still only one chunk open
test_db
.metric_registry
@@ -1268,6 +1416,10 @@ mod tests {
.eq(1.0)
.unwrap();
// verify chunk size updated (chunk moved from open to closed)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "open", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "closed", 88).unwrap();
db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0)
.await
.unwrap();
@@ -1285,6 +1437,11 @@ mod tests {
.eq(1.0)
.unwrap();
// verify chunk size updated (chunk moved from closing to moving to moved)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "closed", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moving", 0).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0)
.await
.unwrap();
@@ -1302,11 +1459,14 @@ mod tests {
.eq(1.0)
.unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1897).unwrap(); // now also in OS
db.unload_read_buffer("1970-01-01T00", "cpu", 0)
.await
.unwrap();
// A chunk is now no longer in read buffer
// A chunk is now in the "os-only" state.
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
@@ -1314,6 +1474,11 @@ mod tests {
.counter()
.eq(1.0)
.unwrap();
// verify chunk size not increased for OS (it was in OS before unload)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "os", 1897).unwrap();
// verify chunk size for RB has decreased
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 0).unwrap();
}
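
Read in sequence, the assertions in this test trace the gauge through the full
chunk lifecycle; a compact summary of the values asserted above ("moving" is
only ever observed at 0):

// state     1st write  2nd write  rollover  move to RB  persist  unload
// "open"    72         88         0         0           0        0
// "closed"  -          -          88        0           0        0
// "moved"   -          -          -         1222        1222     0
// "os"      -          -          -         -           1897     1897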
#[tokio::test]
@@ -1403,7 +1568,9 @@ mod tests {
#[tokio::test]
async fn read_from_read_buffer() {
// Test that data can be loaded into the ReadBuffer
let db = Arc::new(make_db().db);
let test_db = make_db();
let db = Arc::new(test_db.db);
write_lp(db.as_ref(), "cpu bar=1 10");
write_lp(db.as_ref(), "cpu bar=2 20");
@@ -1437,6 +1604,22 @@ mod tests {
let batches = run_query(Arc::clone(&db), "select * from cpu").await;
assert_batches_eq!(&expected, &batches);
// A chunk is now in the read buffer
test_db
.metric_registry
.has_metric_family("catalog_chunks_total")
.with_labels(&[
("db_name", "placeholder"),
("state", "moved"),
("svr_id", "1"),
])
.counter()
.eq(1.0)
.unwrap();
// verify chunk size updated (chunk now in the moved state)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 1222).unwrap();
// drop the chunk from the read buffer
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
assert_eq!(
@@ -1444,6 +1627,9 @@ mod tests {
vec![] as Vec<u32>
);
// verify chunk size updated (chunk dropped from moved state)
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "moved", 0).unwrap();
// Currently this doesn't work (as we need to teach the stores how to
// purge tables after data has been dropped println!("running
// query after all data dropped!"); let expected = vec![] as