refactor: move chunk metrics to module (#3548)
* refactor: move chunk metrics into module * refactor: use Metrics as internal name Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
ce568ab447
commit
5befa7922b
|
@ -1,12 +1,12 @@
|
|||
use crate::{
|
||||
column::Statistics,
|
||||
metrics::Metrics,
|
||||
row_group::{ColumnName, Predicate, RowGroup},
|
||||
schema::{AggregateType, ResultSchema},
|
||||
table::{self, Table},
|
||||
};
|
||||
use arrow::{error::ArrowError, record_batch::RecordBatch};
|
||||
use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
|
||||
use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection};
|
||||
|
||||
use observability_deps::tracing::debug;
|
||||
use schema::selection::Selection;
|
||||
use schema::{builder::Error as SchemaError, Schema};
|
||||
|
@ -54,7 +54,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
|||
/// A `Chunk` is a horizontal partition of data for a single table.
|
||||
pub struct Chunk {
|
||||
// All metrics for the chunk.
|
||||
metrics: ChunkMetrics,
|
||||
metrics: Metrics,
|
||||
|
||||
// The table associated with the chunk.
|
||||
pub(crate) table: Table,
|
||||
|
@ -62,11 +62,7 @@ pub struct Chunk {
|
|||
|
||||
impl Chunk {
|
||||
/// Start a new Chunk from the given record batch.
|
||||
pub fn new(
|
||||
table_name: impl Into<String>,
|
||||
table_data: RecordBatch,
|
||||
metrics: ChunkMetrics,
|
||||
) -> Self {
|
||||
pub fn new(table_name: impl Into<String>, table_data: RecordBatch, metrics: Metrics) -> Self {
|
||||
let table_name = table_name.into();
|
||||
let row_group = record_batch_to_row_group(&table_name, table_data);
|
||||
|
||||
|
@ -77,7 +73,7 @@ impl Chunk {
|
|||
pub(crate) fn new_from_row_group(
|
||||
table_name: impl Into<String>,
|
||||
row_group: RowGroup,
|
||||
mut metrics: ChunkMetrics,
|
||||
mut metrics: Metrics,
|
||||
) -> Self {
|
||||
let storage_statistics = row_group.column_storage_statistics();
|
||||
metrics.update_column_storage_statistics(&storage_statistics);
|
||||
|
@ -374,7 +370,7 @@ pub struct ChunkBuilder {
|
|||
record_batches: Vec<RecordBatch>,
|
||||
|
||||
row_group_min_rows: usize,
|
||||
chunk_metrics: Option<ChunkMetrics>,
|
||||
chunk_metrics: Option<Metrics>,
|
||||
row_groups: Vec<RowGroup>,
|
||||
}
|
||||
|
||||
|
@ -393,7 +389,7 @@ impl ChunkBuilder {
|
|||
|
||||
/// Provide metrics for the built `Chunk`. If no metrics are provided then
|
||||
/// no metrics will be tracked for the `Chunk`.
|
||||
pub fn with_metrics(mut self, metrics: ChunkMetrics) -> Self {
|
||||
pub fn with_metrics(mut self, metrics: Metrics) -> Self {
|
||||
self.chunk_metrics = Some(metrics);
|
||||
self
|
||||
}
|
||||
|
@ -475,7 +471,7 @@ impl ChunkBuilder {
|
|||
match self.chunk_metrics.take() {
|
||||
// avoid partial move of self
|
||||
Some(metrics) => metrics,
|
||||
None => ChunkMetrics::new_unregistered(),
|
||||
None => Metrics::new_unregistered(),
|
||||
},
|
||||
);
|
||||
|
||||
|
@ -513,139 +509,6 @@ impl ChunkBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
/// The collection of metrics exposed by the Read Buffer. Note: several of these
|
||||
/// be better represented as distributions, but the histogram story in IOx is not
|
||||
/// yet figured out.
|
||||
#[derive(Debug)]
|
||||
pub struct ChunkMetrics {
|
||||
/// The base attributes to use for all metrics
|
||||
base_attributes: Attributes,
|
||||
|
||||
/// The total number of row groups in the chunk.
|
||||
row_groups_total: CumulativeRecorder,
|
||||
|
||||
/// This metric tracks the total number of columns in read buffer.
|
||||
columns_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the total number of values stored in read buffer
|
||||
/// column encodings further segmented by nullness.
|
||||
column_values_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the total number of bytes used by read buffer columns
|
||||
/// including any allocated but unused buffers.
|
||||
column_allocated_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the minimal number of bytes required by read buffer
|
||||
/// columns but not including allocated but unused buffers. It's primarily
|
||||
/// of interest to the development of the Read Buffer.
|
||||
column_required_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks an estimated uncompressed data size for read buffer
|
||||
/// columns, further segmented by nullness. It is a building block for
|
||||
/// tracking a measure of overall compression.
|
||||
column_raw_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
}
|
||||
|
||||
impl ChunkMetrics {
|
||||
pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
|
||||
let db_name = db_name.into();
|
||||
let base_attributes = Attributes::from([("db_name", db_name.into())]);
|
||||
|
||||
Self {
|
||||
base_attributes: base_attributes.clone(),
|
||||
row_groups_total: registry.register_metric::<CumulativeGauge>(
|
||||
"read_buffer_row_group_total",
|
||||
"The number of row groups within the Read Buffer",
|
||||
).recorder(base_attributes),
|
||||
columns_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_total",
|
||||
"The number of columns within the Read Buffer",
|
||||
)),
|
||||
column_values_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_values",
|
||||
"The number of values within columns in the Read Buffer",
|
||||
)),
|
||||
column_allocated_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_allocated_bytes",
|
||||
"The number of bytes used by all data in the Read Buffer including allocated by unused buffers",
|
||||
)),
|
||||
column_required_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_required_bytes",
|
||||
"The number of bytes currently required to store data in the Read Buffer excluding allocated by unused buffers",
|
||||
)),
|
||||
column_raw_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_raw_bytes",
|
||||
"The number of bytes used by all columns if they were uncompressed in the Read Buffer",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an instance of ChunkMetrics that isn't registered with a central
|
||||
/// metric registry. Observations made to instruments on this ChunkMetrics instance
|
||||
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
|
||||
/// created on a metric registry
|
||||
pub fn new_unregistered() -> Self {
|
||||
Self {
|
||||
base_attributes: Attributes::from([]),
|
||||
row_groups_total: CumulativeRecorder::new_unregistered(),
|
||||
columns_total: RecorderCollection::new_unregistered(),
|
||||
column_values_total: RecorderCollection::new_unregistered(),
|
||||
column_allocated_bytes_total: RecorderCollection::new_unregistered(),
|
||||
column_required_bytes_total: RecorderCollection::new_unregistered(),
|
||||
column_raw_bytes_total: RecorderCollection::new_unregistered(),
|
||||
}
|
||||
}
|
||||
|
||||
// Updates column storage statistics for the Read Buffer.
|
||||
fn update_column_storage_statistics(&mut self, statistics: &[Statistics]) {
|
||||
// increase number of row groups in chunk.
|
||||
self.row_groups_total.inc(1);
|
||||
|
||||
for stat in statistics {
|
||||
let mut attributes = self.base_attributes.clone();
|
||||
attributes.insert("encoding", stat.enc_type.clone());
|
||||
attributes.insert("log_data_type", stat.log_data_type);
|
||||
|
||||
// update number of columns
|
||||
self.columns_total.recorder(attributes.clone()).inc(1);
|
||||
|
||||
// update bytes allocated associated with columns
|
||||
self.column_allocated_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.allocated_bytes as u64);
|
||||
|
||||
// update bytes in use but excluded unused
|
||||
self.column_required_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.required_bytes as u64);
|
||||
|
||||
attributes.insert("null", "true");
|
||||
|
||||
// update raw estimated bytes of NULL values
|
||||
self.column_raw_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc((stat.raw_bytes - stat.raw_bytes_no_null) as u64);
|
||||
|
||||
// update number of NULL values
|
||||
self.column_values_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.nulls as u64);
|
||||
|
||||
attributes.insert("null", "false");
|
||||
|
||||
// update raw estimated bytes of non-NULL values
|
||||
self.column_raw_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.raw_bytes_no_null as u64);
|
||||
|
||||
// update number of non-NULL values
|
||||
self.column_values_total
|
||||
.recorder(attributes)
|
||||
.inc((stat.values - stat.nulls) as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
|
@ -665,7 +528,7 @@ mod test {
|
|||
},
|
||||
};
|
||||
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
|
||||
use metric::{MetricKind, Observation, ObservationSet, RawReporter};
|
||||
use metric::{Attributes, MetricKind, Observation, ObservationSet, RawReporter};
|
||||
use schema::builder::SchemaBuilder;
|
||||
use std::iter::FromIterator;
|
||||
use std::{num::NonZeroU64, sync::Arc};
|
||||
|
@ -836,7 +699,7 @@ mod test {
|
|||
|
||||
let rb = gen_recordbatch();
|
||||
let mut chunk = ChunkBuilder::new("mydb", rb.schema())
|
||||
.with_metrics(ChunkMetrics::new(&registry, "mydb"))
|
||||
.with_metrics(Metrics::new(&registry, "mydb"))
|
||||
.with_record_batch(rb)
|
||||
.must_build();
|
||||
|
||||
|
@ -1386,7 +1249,7 @@ mod test {
|
|||
];
|
||||
let rg = RowGroup::new(6, columns);
|
||||
|
||||
let chunk = Chunk::new_from_row_group("table_1", rg, ChunkMetrics::new_unregistered());
|
||||
let chunk = Chunk::new_from_row_group("table_1", rg, Metrics::new_unregistered());
|
||||
|
||||
// No predicate so at least one row matches
|
||||
assert!(chunk.satisfies_predicate(&Predicate::default()));
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
#![allow(dead_code, clippy::too_many_arguments)]
|
||||
mod chunk;
|
||||
mod column;
|
||||
mod metrics;
|
||||
mod row_group;
|
||||
mod schema;
|
||||
mod table;
|
||||
|
@ -10,7 +11,8 @@ mod value;
|
|||
|
||||
// Identifiers that are exported as part of the public API.
|
||||
pub use self::schema::*;
|
||||
pub use chunk::{Chunk as RBChunk, ChunkBuilder as RBChunkBuilder, ChunkMetrics, Error};
|
||||
pub use chunk::{Chunk as RBChunk, ChunkBuilder as RBChunkBuilder, Error};
|
||||
pub use metrics::Metrics as ChunkMetrics;
|
||||
pub use row_group::{BinaryExpr, Predicate};
|
||||
pub use table::ReadFilterResults;
|
||||
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
use crate::column::Statistics;
|
||||
use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection};
|
||||
|
||||
/// The collection of metrics exposed by the Read Buffer. Note: several of these
|
||||
/// would be better represented as distributions, but the histogram story in IOx is not
|
||||
/// yet figured out.
|
||||
#[derive(Debug)]
|
||||
pub struct Metrics {
|
||||
/// The base attributes to use for all metrics
|
||||
base_attributes: Attributes,
|
||||
|
||||
/// The total number of row groups in the chunk.
|
||||
row_groups_total: CumulativeRecorder,
|
||||
|
||||
/// This metric tracks the total number of columns in read buffer.
|
||||
columns_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the total number of values stored in read buffer
|
||||
/// column encodings further segmented by nullness.
|
||||
column_values_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the total number of bytes used by read buffer columns
|
||||
/// including any allocated but unused buffers.
|
||||
column_allocated_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks the minimal number of bytes required by read buffer
|
||||
/// columns but not including allocated but unused buffers. It's primarily
|
||||
/// of interest to the development of the Read Buffer.
|
||||
column_required_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
|
||||
/// This metric tracks an estimated uncompressed data size for read buffer
|
||||
/// columns, further segmented by nullness. It is a building block for
|
||||
/// tracking a measure of overall compression.
|
||||
column_raw_bytes_total: RecorderCollection<CumulativeGauge>,
|
||||
}
|
||||
|
||||
impl Metrics {
|
||||
pub fn new(registry: &metric::Registry, db_name: impl Into<String>) -> Self {
|
||||
let db_name = db_name.into();
|
||||
let base_attributes = Attributes::from([("db_name", db_name.into())]);
|
||||
|
||||
Self {
|
||||
base_attributes: base_attributes.clone(),
|
||||
row_groups_total: registry.register_metric::<CumulativeGauge>(
|
||||
"read_buffer_row_group_total",
|
||||
"The number of row groups within the Read Buffer",
|
||||
).recorder(base_attributes),
|
||||
columns_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_total",
|
||||
"The number of columns within the Read Buffer",
|
||||
)),
|
||||
column_values_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_values",
|
||||
"The number of values within columns in the Read Buffer",
|
||||
)),
|
||||
column_allocated_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_allocated_bytes",
|
||||
"The number of bytes used by all data in the Read Buffer including allocated by unused buffers",
|
||||
)),
|
||||
column_required_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_required_bytes",
|
||||
"The number of bytes currently required to store data in the Read Buffer excluding allocated by unused buffers",
|
||||
)),
|
||||
column_raw_bytes_total: RecorderCollection::new(registry.register_metric(
|
||||
"read_buffer_column_raw_bytes",
|
||||
"The number of bytes used by all columns if they were uncompressed in the Read Buffer",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates an instance of ChunkMetrics that isn't registered with a central
|
||||
/// metric registry. Observations made to instruments on this ChunkMetrics instance
|
||||
/// will therefore not be visible to other ChunkMetrics instances or metric instruments
|
||||
/// created on a metric registry
|
||||
pub fn new_unregistered() -> Self {
|
||||
Self {
|
||||
base_attributes: Attributes::from([]),
|
||||
row_groups_total: CumulativeRecorder::new_unregistered(),
|
||||
columns_total: RecorderCollection::new_unregistered(),
|
||||
column_values_total: RecorderCollection::new_unregistered(),
|
||||
column_allocated_bytes_total: RecorderCollection::new_unregistered(),
|
||||
column_required_bytes_total: RecorderCollection::new_unregistered(),
|
||||
column_raw_bytes_total: RecorderCollection::new_unregistered(),
|
||||
}
|
||||
}
|
||||
|
||||
// Updates column storage statistics for the Read Buffer.
|
||||
pub(crate) fn update_column_storage_statistics(&mut self, statistics: &[Statistics]) {
|
||||
// increase number of row groups in chunk.
|
||||
self.row_groups_total.inc(1);
|
||||
|
||||
for stat in statistics {
|
||||
let mut attributes = self.base_attributes.clone();
|
||||
attributes.insert("encoding", stat.enc_type.clone());
|
||||
attributes.insert("log_data_type", stat.log_data_type);
|
||||
|
||||
// update number of columns
|
||||
self.columns_total.recorder(attributes.clone()).inc(1);
|
||||
|
||||
// update bytes allocated associated with columns
|
||||
self.column_allocated_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.allocated_bytes as u64);
|
||||
|
||||
// update bytes in use, excluding allocated but unused buffers
|
||||
self.column_required_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.required_bytes as u64);
|
||||
|
||||
attributes.insert("null", "true");
|
||||
|
||||
// update raw estimated bytes of NULL values
|
||||
self.column_raw_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc((stat.raw_bytes - stat.raw_bytes_no_null) as u64);
|
||||
|
||||
// update number of NULL values
|
||||
self.column_values_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.nulls as u64);
|
||||
|
||||
attributes.insert("null", "false");
|
||||
|
||||
// update raw estimated bytes of non-NULL values
|
||||
self.column_raw_bytes_total
|
||||
.recorder(attributes.clone())
|
||||
.inc(stat.raw_bytes_no_null as u64);
|
||||
|
||||
// update number of non-NULL values
|
||||
self.column_values_total
|
||||
.recorder(attributes)
|
||||
.inc((stat.values - stat.nulls) as u64);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue