Merge pull request #1369 from influxdata/entrysize
feat: Add ingest_entries_bytes_total counterpull/24376/head
commit
98be7e8b00
|
@ -253,11 +253,14 @@ pub struct ServerMetrics {
|
|||
/// This metric tracks all requests to the Server
|
||||
pub http_requests: metrics::RedMetric,
|
||||
|
||||
/// The number of LP points written
|
||||
/// The number of LP points ingested
|
||||
pub ingest_points_total: metrics::Counter,
|
||||
|
||||
/// The number of bytes written
|
||||
/// The number of LP bytes ingested
|
||||
pub ingest_points_bytes_total: metrics::Counter,
|
||||
|
||||
/// The number of Entry bytes ingested
|
||||
pub ingest_entries_bytes_total: metrics::Counter,
|
||||
}
|
||||
|
||||
impl ServerMetrics {
|
||||
|
@ -271,12 +274,17 @@ impl ServerMetrics {
|
|||
ingest_points_total: ingest_domain.register_counter_metric(
|
||||
"points",
|
||||
None,
|
||||
"total LP points written",
|
||||
"total LP points ingested",
|
||||
),
|
||||
ingest_points_bytes_total: ingest_domain.register_counter_metric(
|
||||
"points",
|
||||
Some("bytes"),
|
||||
"total LP points bytes written",
|
||||
"total LP points bytes ingested",
|
||||
),
|
||||
ingest_entries_bytes_total: ingest_domain.register_counter_metric(
|
||||
"entries",
|
||||
Some("bytes"),
|
||||
"total Entry bytes ingested",
|
||||
),
|
||||
}
|
||||
}
|
||||
|
@ -536,7 +544,10 @@ impl<M: ConnectionManager> Server<M> {
|
|||
self.write_entry_downstream(db_name, node_group, sharded_entry.entry)
|
||||
.await?
|
||||
}
|
||||
None => self.write_entry_local(db, sharded_entry.entry).await?,
|
||||
None => {
|
||||
self.write_entry_local(&db_name, db, sharded_entry.entry)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -586,17 +597,35 @@ impl<M: ConnectionManager> Server<M> {
|
|||
.context(DatabaseNotFound { db_name: &*db_name })?;
|
||||
|
||||
let entry = entry_bytes.try_into().context(DecodingEntry)?;
|
||||
self.write_entry_local(&db, entry).await
|
||||
self.write_entry_local(&db_name, &db, entry).await
|
||||
}
|
||||
|
||||
pub async fn write_entry_local(&self, db: &Db, entry: Entry) -> Result<()> {
|
||||
db.store_entry(entry).map_err(|e| match e {
|
||||
db::Error::HardLimitReached {} => Error::HardLimitReached {},
|
||||
_ => Error::UnknownDatabaseError {
|
||||
source: Box::new(e),
|
||||
},
|
||||
pub async fn write_entry_local(&self, db_name: &str, db: &Db, entry: Entry) -> Result<()> {
|
||||
let bytes = entry.data().len() as u64;
|
||||
db.store_entry(entry).map_err(|e| {
|
||||
self.metrics.ingest_entries_bytes_total.add_with_labels(
|
||||
bytes,
|
||||
&[
|
||||
metrics::KeyValue::new("status", "error"),
|
||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
||||
],
|
||||
);
|
||||
match e {
|
||||
db::Error::HardLimitReached {} => Error::HardLimitReached {},
|
||||
_ => Error::UnknownDatabaseError {
|
||||
source: Box::new(e),
|
||||
},
|
||||
}
|
||||
})?;
|
||||
|
||||
self.metrics.ingest_entries_bytes_total.add_with_labels(
|
||||
bytes,
|
||||
&[
|
||||
metrics::KeyValue::new("status", "ok"),
|
||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
||||
],
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -929,12 +958,21 @@ mod tests {
|
|||
use super::*;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
fn config() -> ServerConfig {
|
||||
ServerConfig::new(
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
Arc::new(MetricRegistry::new()), // new registry ensures test isolation of metrics
|
||||
fn config_with_metric_registry() -> (metrics::TestMetricRegistry, ServerConfig) {
|
||||
let registry = Arc::new(metrics::MetricRegistry::new());
|
||||
let test_registry = metrics::TestMetricRegistry::new(Arc::clone(®istry));
|
||||
(
|
||||
test_registry,
|
||||
ServerConfig::new(
|
||||
Arc::new(ObjectStore::new_in_memory(InMemory::new())),
|
||||
registry, // new registry ensures test isolation of metrics
|
||||
)
|
||||
.with_num_worker_threads(1),
|
||||
)
|
||||
.with_num_worker_threads(1)
|
||||
}
|
||||
|
||||
fn config() -> ServerConfig {
|
||||
config_with_metric_registry().1
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
@ -1108,8 +1146,9 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn write_entry_local() {
|
||||
let (metric_registry, config) = config_with_metric_registry();
|
||||
let manager = TestConnectionManager::new();
|
||||
let server = Server::new(manager, config());
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
|
||||
let name = DatabaseName::new("foo".to_string()).unwrap();
|
||||
|
@ -1147,6 +1186,13 @@ mod tests {
|
|||
"+-----+-------------------------------+",
|
||||
];
|
||||
assert_table_eq!(expected, &batches);
|
||||
|
||||
metric_registry
|
||||
.has_metric_family("ingest_entries_bytes_total")
|
||||
.with_labels(&[("status", "ok"), ("db_name", "foo")])
|
||||
.counter()
|
||||
.eq(240.0)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// This tests sets up a database with a sharding config which defines exactly one shard
|
||||
|
|
Loading…
Reference in New Issue